Files

224 lines
8.8 KiB
Python

"""
create_report.py
Streamlit report kontaktů z MySQL tabulky CTMS_contacts.
Spuštění: streamlit run create_report.py
"""
import json
import os
from pathlib import Path

import mysql.connector
import pandas as pd
import pyperclip
import streamlit as st
# ── Configuration ──────────────────────────────────────────────────────────────
# Connection settings passed to mysql.connector.connect(). Every value except
# the charset can be overridden with a CTMS_DB_* environment variable so that
# credentials do not have to live in the source file.
# NOTE(review): the literal fallbacks (including the root password) are kept
# only so existing deployments keep working — move them to the environment and
# rotate the password.
DB_CONFIG = {
    "host": os.environ.get("CTMS_DB_HOST", "192.168.1.76"),
    "port": int(os.environ.get("CTMS_DB_PORT", "3306")),
    "user": os.environ.get("CTMS_DB_USER", "root"),
    "password": os.environ.get("CTMS_DB_PASSWORD", "Vlado9674+"),
    "database": os.environ.get("CTMS_DB_NAME", "studie"),
    "charset": "utf8mb4",
}

# Table the report reads from.
TABLE = "CTMS_contacts"

# JSON file next to this script where the sidebar filter selections are kept.
STATE_FILE = Path(__file__).parent / "filter_state.json"

# Protocol ID -> site IDs that the status filter treats as active.
ACTIVE_SITES = {
    "77242113UCO3001": {
        "DD5-CZ10001", "DD5-CZ10003", "DD5-CZ10006", "DD5-CZ10009",
        "DD5-CZ10010", "DD5-CZ10012", "DD5-CZ10013", "DD5-CZ10015",
        "DD5-CZ10016", "DD5-CZ10020", "DD5-CZ10021", "DD5-CZ10022",
    },
    "42847922MDD3003": {
        "S10-CZ10004", "S10-CZ10008", "S10-CZ10011", "S10-CZ10012",
    },
}

# Database column -> header shown in the table. The keys are also the columns
# selected from the database, in this order.
DISPLAY_COLS = {
    "site_id": "Site ID",
    "institution_name": "Institution",
    "pi_full_name": "PI",
    "contact_title": "Title",
    "last_name": "Last Name",
    "first_name": "First Name",
    "contact_role": "Role",
    "primary_indicator": "Primary",
    "phone": "Phone",
    "phone_mobile": "Mobile",
    "email": "Email",
    "contact_start_date": "Start Date",
    "contact_end_date": "End Date",
}

# Choices of the status radio button and the one used when nothing is saved.
STATUS_OPTIONS = ["Aktivní", "Neaktivní", "Všechna"]
DEFAULT_STATUS = "Aktivní"
# ── Filter persistence ─────────────────────────────────────────────────────────
def load_filter_state() -> dict:
    """Read the persisted sidebar filter selections from ``STATE_FILE``.

    Loading is best-effort: a missing, unreadable or malformed state file
    never aborts the app.

    Returns:
        The decoded JSON object, or an empty dict when the file is absent,
        cannot be read or parsed, or does not contain a JSON object.
    """
    try:
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file.
        # ValueError: invalid JSON or bytes that are not valid UTF-8.
        return {}
    # Callers use ``.get`` on the result, so anything but a dict is unusable.
    return state if isinstance(state, dict) else {}
def save_filter_state():
    """Write the current sidebar filter selections to ``STATE_FILE`` as JSON.

    Values are read from ``st.session_state``; a key that is not set yet is
    stored with its fallback (default status, "Všechny" protocol, empty role
    and site lists).
    """
    fallbacks = {
        "sel_status": DEFAULT_STATUS,
        "sel_proto": "Všechny",
        "sel_role": [],
        "sel_site": [],
    }
    snapshot = {
        key: st.session_state.get(key, fallback)
        for key, fallback in fallbacks.items()
    }
    payload = json.dumps(snapshot, ensure_ascii=False, indent=2)
    STATE_FILE.write_text(payload, encoding="utf-8")
# ── Data ───────────────────────────────────────────────────────────────────────
@st.cache_data(ttl=300)
def load_data() -> pd.DataFrame:
    """Fetch every contact row from the MySQL table named by ``TABLE``.

    The decorator caches the result with ``ttl=300``.

    Returns:
        A DataFrame with ``protocol_id``, ``file_date`` and every column named
        in ``DISPLAY_COLS``, ordered by protocol, site, role and name. The
        columns are present even when the table has no rows.

    Any exception raised by the connector while connecting or querying
    propagates to the caller.
    """
    columns = ["protocol_id", "file_date", *DISPLAY_COLS]
    select_list = ", ".join(columns)
    # Identifiers come from module constants, never from user input, so plain
    # string formatting is acceptable here.
    sql = (
        f"SELECT {select_list} "
        f"FROM {TABLE} "
        f"ORDER BY protocol_id, site_id, contact_role, last_name, first_name"
    )
    conn = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    # Passing the column list keeps the schema stable for an empty result set.
    return pd.DataFrame(rows, columns=columns)
# ── Application ────────────────────────────────────────────────────────────────
st.set_page_config(page_title="CTMS Contacts", page_icon="🏥", layout="wide")
st.title("🏥 CTMS Contacts — Czechia")

try:
    df = load_data()
except Exception as e:
    # Top-level boundary: show the failure on the page instead of a traceback.
    st.error(f"Chyba připojení k MySQL: {e}")
    st.stop()

# An empty result may come back without any columns, which would make the
# column lookups below raise KeyError — stop with a message instead.
if df.empty:
    st.warning(f"Tabulka {TABLE} neobsahuje žádné záznamy.")
    st.stop()

# dropna() keeps sorted() from comparing a missing value with str, matching
# how the role and site option lists are built.
protocols = ["Všechny"] + sorted(df["protocol_id"].dropna().unique().tolist())

# Restore the saved filter state once per session.
if "filters_initialized" not in st.session_state:
    saved = load_filter_state()
    if not isinstance(saved, dict):
        saved = {}
    saved_status = saved.get("sel_status")
    saved_proto = saved.get("sel_proto")
    saved_roles = saved.get("sel_role")
    saved_sites = saved.get("sel_site")
    # Fall back to the defaults when the saved choice is no longer offered.
    st.session_state["sel_status"] = (
        saved_status if saved_status in STATUS_OPTIONS else DEFAULT_STATUS
    )
    st.session_state["sel_proto"] = (
        saved_proto if saved_proto in protocols else "Všechny"
    )
    # The state file is plain JSON on disk; accept only real lists so the
    # multiselect values can be iterated safely.
    st.session_state["sel_role"] = saved_roles if isinstance(saved_roles, list) else []
    st.session_state["sel_site"] = saved_sites if isinstance(saved_sites, list) else []
    st.session_state["filters_initialized"] = True
# Roles and sites offered in the sidebar depend on the chosen protocol and on
# the active/inactive status selection.
all_active = {site for site_ids in ACTIVE_SITES.values() for site in site_ids}

chosen_proto = st.session_state["sel_proto"]
chosen_status = st.session_state["sel_status"]

# Build one row mask instead of slicing the frame step by step.
in_scope = pd.Series(True, index=df.index)
if chosen_proto != "Všechny":
    in_scope &= df["protocol_id"] == chosen_proto
if chosen_status == "Aktivní":
    # Active: the site is listed in ACTIVE_SITES and the contact has no end date.
    in_scope &= df["site_id"].isin(all_active) & df["contact_end_date"].isna()
elif chosen_status == "Neaktivní":
    # Inactive: the site is not listed in ACTIVE_SITES.
    in_scope &= ~df["site_id"].isin(all_active)
df_opts = df[in_scope]

roles = sorted(set(df_opts["contact_role"].dropna()))
sites = sorted(set(df_opts["site_id"].dropna()))

# Drop stored selections that are no longer among the offered options.
for state_key, allowed in (("sel_role", roles), ("sel_site", sites)):
    st.session_state[state_key] = [
        value for value in st.session_state[state_key] if value in allowed
    ]
# ── Sidebar filters ────────────────────────────────────────────────────────────
# Every keyed widget stores its value in st.session_state under its key and
# persists the whole filter state through save_filter_state when it changes.
sidebar = st.sidebar
sidebar.header("Filtry")
sidebar.radio(
    "Střediska",
    STATUS_OPTIONS,
    horizontal=True,
    key="sel_status",
    on_change=save_filter_state,
)
sidebar.selectbox(
    "Protokol",
    protocols,
    key="sel_proto",
    on_change=save_filter_state,
)
for widget_label, widget_options, widget_key in (
    ("Role", roles, "sel_role"),
    ("Site", sites, "sel_site"),
):
    sidebar.multiselect(
        widget_label,
        widget_options,
        key=widget_key,
        on_change=save_filter_state,
    )
# Free-text search is not keyed and is not written to the state file.
search = sidebar.text_input("Hledat (jméno, email…)")
sidebar.divider()
if sidebar.button("🔄 Obnovit data"):
    # Clear the cached query result and run the script again.
    st.cache_data.clear()
    st.rerun()
# NOTE(review): this is the time of the current script run, not necessarily the
# time the cached data was fetched — confirm the label matches the intent.
rendered_at = pd.Timestamp.now().strftime("%H:%M:%S")
sidebar.caption(f"Naposledy načteno: {rendered_at}")
# ── Filtering ──────────────────────────────────────────────────────────────────
# Apply the sidebar selections to the full data set, one criterion at a time.
filtered = df.copy()
if st.session_state["sel_proto"] != "Všechny":
    filtered = filtered[filtered["protocol_id"] == st.session_state["sel_proto"]]
if st.session_state["sel_status"] == "Aktivní":
    # Active: the site is in all_active and the contact has no end date.
    filtered = filtered[
        filtered["site_id"].isin(all_active) & filtered["contact_end_date"].isna()
    ]
elif st.session_state["sel_status"] == "Neaktivní":
    # Inactive: the site is not in all_active.
    filtered = filtered[~filtered["site_id"].isin(all_active)]
# An empty role or site selection means "no restriction".
if st.session_state["sel_role"]:
    filtered = filtered[filtered["contact_role"].isin(st.session_state["sel_role"])]
if st.session_state["sel_site"]:
    filtered = filtered[filtered["site_id"].isin(st.session_state["sel_site"])]
if search:
    # Match the typed text literally (regex=False): input such as "+420" or
    # "(" is not a valid regular expression and would otherwise raise.
    # Working column by column also keeps the mask a Series when no rows
    # are left.
    mask = pd.Series(False, index=filtered.index)
    for column in filtered.columns:
        cells = filtered[column].astype(str)
        mask |= cells.str.contains(search, case=False, regex=False, na=False)
    filtered = filtered[mask]
# ── Metrics ────────────────────────────────────────────────────────────────────
# One tile per column: the row count, then the number of distinct protocols,
# sites and roles in the filtered data.
col1, col2, col3, col4 = st.columns(4)
summary = [
    ("Kontaktů celkem", len(filtered)),
    ("Protokolů", filtered["protocol_id"].nunique()),
    ("Středisek", filtered["site_id"].nunique()),
    ("Rolí", filtered["contact_role"].nunique()),
]
for tile, (label, value) in zip((col1, col2, col3, col4), summary):
    tile.metric(label, value)
st.divider()
# ── Table ──────────────────────────────────────────────────────────────────────
# Pick the columns to show and swap database names for display headers.
source_columns = ["protocol_id", "file_date", *DISPLAY_COLS]
header_labels = {"protocol_id": "Protocol", "file_date": "File Date"}
header_labels.update(DISPLAY_COLS)
display = filtered[source_columns].rename(columns=header_labels)

date_format = "DD-MMM-YYYY"
st.dataframe(
    display,
    width="stretch",
    hide_index=True,
    column_config={
        # NOTE(review): the e-mail cells are rendered as links — confirm the
        # stored values produce the intended link target.
        "Email": st.column_config.LinkColumn("Email", display_text=".*"),
        "Start Date": st.column_config.DateColumn("Start Date", format=date_format),
        "End Date": st.column_config.DateColumn("End Date", format=date_format),
    },
)
shown_count = len(filtered)
total_count = len(df)
st.caption(f"Zobrazeno {shown_count} z {total_count} záznamů")
st.divider()
# ── Copy e-mail addresses ──────────────────────────────────────────────────────
def _clean_text(value) -> str:
    """Return *value* as stripped text, or an empty string when it is missing."""
    return "" if pd.isna(value) else str(value).strip()


# Build "First Last <address>" entries for every filtered row with an address.
email_rows = filtered[["first_name", "last_name", "email"]]
entries = []
for row in email_rows.itertuples(index=False):
    address = _clean_text(row.email)
    if not address:
        continue
    # Skip missing name parts so they do not show up as the text "None".
    name_parts = (_clean_text(row.first_name), _clean_text(row.last_name))
    full_name = " ".join(part for part in name_parts if part)
    entries.append(f"{full_name} <{address}>" if full_name else address)
email_str = "; ".join(entries)

if st.button(f"📋 Kopírovat emaily do clipboardu ({len(entries)} adres)"):
    if entries:
        # NOTE(review): pyperclip writes to the clipboard of the machine that
        # runs this script, so the button only helps when the app runs locally.
        try:
            pyperclip.copy(email_str)
        except pyperclip.PyperclipException as exc:
            # No clipboard backend available: show the text so it can still
            # be copied by hand instead of crashing the page.
            st.error(f"Kopírování do schránky selhalo: {exc}")
            st.code(email_str)
        else:
            st.success(f"✅ Zkopírováno {len(entries)} adres — vlož přímo do pole Komu.")
    else:
        st.warning("Žádné emailové adresy ke zkopírování.")