This commit is contained in:
2026-05-12 15:41:55 +02:00
parent b3ba440269
commit 2054432757
12 changed files with 930 additions and 132 deletions
+117
View File
@@ -0,0 +1,117 @@
# PanoramaContacts — CLAUDE.md
## Účel adresáře
Import kontaktů středisek (site contacts) z exportů systému PANORAMA (CTMS) do MySQL a jejich zobrazení ve Streamlit web reportu.
Filtruje pouze záznamy pro **Czechia**. Aktuálně pokryté protokoly:
| Protocol ID | TA |
|---|---|
| `77242113UCO3001` | Immunology |
| `42847922MDD3003` | Neuroscience |
---
## Soubory
| Soubor | Účel |
|---|---|
| `import_CZ_contacts.py` | Import xlsx → MySQL |
| `webreport.py` | Streamlit web report |
| `run_webreport.py` | PyCharm launcher (`streamlit run webreport.py`) |
| `sql/create_CTMS_contacts.sql` | DDL tabulky `CTMS_contacts` |
| `SourceData/*.xlsx` | PANORAMA Dashboard exporty (zdrojová data) |
| `filter_state.json` | Automaticky ukládaný stav filtrů (generuje app) |
---
## MySQL
- **Host:** 192.168.1.76:3306 · **DB:** `studie` · **Tabulka:** `CTMS_contacts`
- **Sheet v xlsx:** `Site Contacts`, header na řádku 6 (0-based index 5)
### Klíčové sloupce tabulky
| Sloupec | Typ | Poznámka |
|---|---|---|
| `file_date` | DATE | Z `dcterms:created` v docProps/core.xml xlsx |
| `imported_at` | DATETIME | Auto timestamp importu |
| `protocol_id` | VARCHAR(20) | Identifikátor studie |
| `site_id` | VARCHAR(15) | Středisko (např. `DD5-CZ10006`) |
| `contact_role` | VARCHAR(50) | Role kontaktu (PI, Study Coordinator, …) |
| `contact_start_date` | DATE | Začátek platnosti kontaktu |
| `contact_end_date` | DATE | Konec platnosti — NULL = stále aktivní |
| `email` | VARCHAR(100) | Hlavní e-mail |
---
## import_CZ_contacts.py
- Zpracuje všechny `*.xlsx` v `SourceData/`
- Přeskočí soubory, jejichž `file_date` ≠ dnešní datum (UTC)
- Přepis: DELETE + INSERT podle `(file_date, protocol_id, country_name)`
- `clean_value()` převede NaN / NaT / Timestamp na typy přijatelné MySQL driverem
---
## webreport.py — Streamlit app
### Filtry (sidebar)
| Filtr | Widget | Logika options |
|---|---|---|
| **Střediska** | radio | Aktivní / Neaktivní / Všechna |
| **Protokol** | selectbox | Z celé DB |
| **Role** | multiselect | Filtrováno dle protokolu + aktivní/neaktivní |
| **Site** | multiselect | Filtrováno dle protokolu + aktivní/neaktivní |
| **Hledání** | text_input | Fulltext přes všechny sloupce řádku |
### Logika filtru Střediska
| Hodnota | Site podmínka | End Date podmínka |
|---|---|---|
| **Aktivní** | `site_id` v `ACTIVE_SITES` | `contact_end_date IS NULL` |
| **Neaktivní** | `site_id` NOT v `ACTIVE_SITES` | bez omezení |
| **Všechna** | bez omezení | bez omezení |
### Aktivní střediska (ACTIVE_SITES)
```python
ACTIVE_SITES = {
    "77242113UCO3001": {
        "DD5-CZ10001", "DD5-CZ10003", "DD5-CZ10006", "DD5-CZ10009",
        "DD5-CZ10010", "DD5-CZ10012", "DD5-CZ10013", "DD5-CZ10015",
        "DD5-CZ10016", "DD5-CZ10020", "DD5-CZ10021", "DD5-CZ10022",
    },
    "42847922MDD3003": {
        "S10-CZ10004", "S10-CZ10008", "S10-CZ10011", "S10-CZ10012",
    },
}
```
### Perzistence filtrů
- Stav se ukládá do `filter_state.json` při každé změně filtru (`on_change=save_filter_state`)
- Načítá se jednou za session přes flag `filters_initialized` v `st.session_state`
- Při načítání se hodnoty validují vůči aktuálním options (ochrana před zastaralými daty)
### Clipboard tlačítko
- Knihovna `pyperclip` — kopíruje přímo do Windows clipboardu ze serverové strany (tj. do schránky počítače, na kterém běží Streamlit, nikoli do schránky vzdáleného prohlížeče)
- Formát: `Jméno Příjmení <email@domain.cz>; …`
- Reaguje na aktuálně zobrazené (filtrované) záznamy
### Cache
- `@st.cache_data(ttl=300)` — data se drží 5 minut
- Tlačítko 🔄 Obnovit data volá `st.cache_data.clear()` + `st.rerun()`
---
## Závislosti (venv)
```
mysql-connector-python
pandas
openpyxl
streamlit
pyperclip
```
-132
View File
@@ -1,132 +0,0 @@
"""
create_report.py
Streamlit report kontaktů z MySQL tabulky CTMS_contacts.
Spuštění: streamlit run create_report.py
"""
from datetime import date
import mysql.connector
import pandas as pd
import streamlit as st
# ── Konfigurace ────────────────────────────────────────────────────────────────
# Connection settings passed as keyword arguments to mysql.connector.connect().
# NOTE(review): the root account and its plaintext password are hard-coded
# here — consider moving them to environment variables or a secrets store.
DB_CONFIG = {
    "host": "192.168.1.76",
    "port": 3306,
    "user": "root",
    "password": "Vlado9674+",
    "database": "studie",
    "charset": "utf8mb4",
}
# Table read by load_data().
TABLE = "CTMS_contacts"
# Maps DB column name -> label shown in the report table. The key order is
# used both for the SELECT list and for the displayed column order.
DISPLAY_COLS = {
    "site_id": "Site ID",
    "institution_name": "Institution",
    "pi_full_name": "PI",
    "contact_title": "Title",
    "last_name": "Last Name",
    "first_name": "First Name",
    "contact_role": "Role",
    "primary_indicator": "Primary",
    "phone": "Phone",
    "phone_mobile": "Mobile",
    "email": "Email",
    "contact_start_date": "Start Date",
    "contact_end_date": "End Date",
}
# ── Data ───────────────────────────────────────────────────────────────────────
@st.cache_data(ttl=300)
def load_data() -> pd.DataFrame:
    """Load every contact row from TABLE; the result is cached for 300 seconds.

    Returns:
        DataFrame with ``protocol_id``, ``file_date`` and every key of
        DISPLAY_COLS as columns, ordered by protocol, site, role and name.
        The columns are present even when the table is empty, so callers can
        index them unconditionally.

    Raises:
        mysql.connector.Error: when connecting or querying fails.
    """
    selected = ["protocol_id", "file_date", *DISPLAY_COLS]
    sql = (
        f"SELECT {', '.join(selected)} "
        f"FROM {TABLE} "
        f"ORDER BY protocol_id, site_id, contact_role, last_name, first_name"
    )
    conn = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query raises.
            cursor.close()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    # Passing the column list keeps the schema when `rows` is empty.
    return pd.DataFrame(rows, columns=selected)
# ── Aplikace ───────────────────────────────────────────────────────────────────
st.set_page_config(page_title="CTMS Contacts", page_icon="🏥", layout="wide")
st.title("🏥 CTMS Contacts — Czechia")

try:
    df = load_data()
except Exception as e:
    # Top-level boundary: report the failure in the page and halt the run.
    st.error(f"Chyba připojení k MySQL: {e}")
    st.stop()

# ── Sidebar filters ────────────────────────────────────────────────────────────
with st.sidebar:
    st.header("Filtry")

    protocols = ["Všechny"] + sorted(df["protocol_id"].unique().tolist())
    sel_proto = st.selectbox("Protokol", protocols)

    roles = ["Všechny"] + sorted(df["contact_role"].dropna().unique().tolist())
    sel_role = st.selectbox("Role", roles)

    sites = ["Všechny"] + sorted(df["site_id"].dropna().unique().tolist())
    sel_site = st.selectbox("Site", sites)

    search = st.text_input("Hledat (jméno, email…)")

    st.divider()
    if st.button("🔄 Obnovit data"):
        # Drop the cached query result and rerun so load_data() hits the DB.
        st.cache_data.clear()
        st.rerun()
    st.caption(f"Naposledy načteno: {pd.Timestamp.now().strftime('%H:%M:%S')}")

# ── Filtering ──────────────────────────────────────────────────────────────────
filtered = df.copy()
if sel_proto != "Všechny":
    filtered = filtered[filtered["protocol_id"] == sel_proto]
if sel_role != "Všechny":
    filtered = filtered[filtered["contact_role"] == sel_role]
if sel_site != "Všechny":
    filtered = filtered[filtered["site_id"] == sel_site]
if search:
    # regex=False: the search box holds plain text, so characters such as
    # "(", "+" or "." must match literally instead of being parsed as a
    # regular expression (which would raise or mis-match).
    mask = filtered.apply(
        lambda row: row.astype(str)
        .str.contains(search, case=False, na=False, regex=False)
        .any(),
        axis=1,
    )
    filtered = filtered[mask]

# ── Metrics ────────────────────────────────────────────────────────────────────
col1, col2, col3, col4 = st.columns(4)
col1.metric("Kontaktů celkem", len(filtered))
col2.metric("Protokolů", filtered["protocol_id"].nunique())
col3.metric("Středisek", filtered["site_id"].nunique())
col4.metric("Rolí", filtered["contact_role"].nunique())
st.divider()

# ── Table ──────────────────────────────────────────────────────────────────────
display = filtered[["protocol_id", "file_date"] + list(DISPLAY_COLS.keys())].copy()
display = display.rename(
    columns={"protocol_id": "Protocol", "file_date": "File Date", **DISPLAY_COLS}
)
st.dataframe(
    display,
    width="stretch",
    hide_index=True,
    column_config={
        "Email": st.column_config.LinkColumn("Email", display_text=".*"),
        "Start Date": st.column_config.DateColumn("Start Date", format="DD-MMM-YYYY"),
        "End Date": st.column_config.DateColumn("End Date", format="DD-MMM-YYYY"),
    },
)
st.caption(f"Zobrazeno {len(filtered)} z {len(df)} záznamů")
+22
View File
@@ -0,0 +1,22 @@
{
"sel_status": "Všechna",
"sel_proto": "77242113UCO3001",
"sel_role": [
"Principal Investigator",
"Sub-Investigator",
"Study Coordinator"
],
"sel_site": [
"DD5-CZ10001",
"DD5-CZ10003",
"DD5-CZ10006",
"DD5-CZ10009",
"DD5-CZ10010",
"DD5-CZ10012",
"DD5-CZ10013",
"DD5-CZ10015",
"DD5-CZ10016",
"DD5-CZ10020",
"DD5-CZ10021"
]
}
+6
View File
@@ -0,0 +1,6 @@
import subprocess
import sys
from pathlib import Path
# Streamlit app started by this launcher; it lives next to this file.
app = Path(__file__).parent / "webreport.py"

if __name__ == "__main__":
    # Run only when executed as a script (not on import) and hand Streamlit's
    # exit status back to the caller instead of always exiting with 0.
    sys.exit(
        subprocess.run([sys.executable, "-m", "streamlit", "run", str(app)]).returncode
    )
+223
View File
@@ -0,0 +1,223 @@
"""
webreport.py
Streamlit report kontaktů z MySQL tabulky CTMS_contacts.
Spuštění: streamlit run webreport.py
"""
import json
from pathlib import Path
import mysql.connector
import pandas as pd
import pyperclip
import streamlit as st
# ── Konfigurace ────────────────────────────────────────────────────────────────
# Connection settings passed as keyword arguments to mysql.connector.connect().
# NOTE(review): the root account and its plaintext password are hard-coded
# here — consider moving them to environment variables or a secrets store.
DB_CONFIG = {
    "host": "192.168.1.76",
    "port": 3306,
    "user": "root",
    "password": "Vlado9674+",
    "database": "studie",
    "charset": "utf8mb4",
}
# Table read by load_data().
TABLE = "CTMS_contacts"
# JSON file next to this script in which the sidebar filter state is persisted.
STATE_FILE = Path(__file__).parent / "filter_state.json"
# Site IDs treated as active, grouped by protocol ID. The app unions all of
# these sets when applying the "Aktivní" / "Neaktivní" status filter.
ACTIVE_SITES = {
    "77242113UCO3001": {
        "DD5-CZ10001", "DD5-CZ10003", "DD5-CZ10006", "DD5-CZ10009",
        "DD5-CZ10010", "DD5-CZ10012", "DD5-CZ10013", "DD5-CZ10015",
        "DD5-CZ10016", "DD5-CZ10020", "DD5-CZ10021", "DD5-CZ10022",
    },
    "42847922MDD3003": {
        "S10-CZ10004", "S10-CZ10008", "S10-CZ10011", "S10-CZ10012",
    },
}
# Maps DB column name -> label shown in the report table. The key order is
# used both for the SELECT list and for the displayed column order.
DISPLAY_COLS = {
    "site_id": "Site ID",
    "institution_name": "Institution",
    "pi_full_name": "PI",
    "contact_title": "Title",
    "last_name": "Last Name",
    "first_name": "First Name",
    "contact_role": "Role",
    "primary_indicator": "Primary",
    "phone": "Phone",
    "phone_mobile": "Mobile",
    "email": "Email",
    "contact_start_date": "Start Date",
    "contact_end_date": "End Date",
}
# Choices of the site-status radio (active / inactive / all) and the value
# used when no valid saved state exists.
STATUS_OPTIONS = ["Aktivní", "Neaktivní", "Všechna"]
DEFAULT_STATUS = "Aktivní"
# ── Perzistence filtrů ─────────────────────────────────────────────────────────
def load_filter_state() -> dict:
    """Return the filter state stored in STATE_FILE, or ``{}`` if unavailable.

    Loading is best-effort: a missing or unreadable file, malformed JSON, or
    JSON whose top level is not an object all yield an empty dict, so the
    caller can always use ``.get()`` on the result.
    """
    try:
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return state if isinstance(state, dict) else {}
def save_filter_state():
    """Write the current sidebar selections to STATE_FILE as indented UTF-8 JSON.

    Keys missing from ``st.session_state`` are stored with their defaults.
    """
    fallbacks = (
        ("sel_status", DEFAULT_STATUS),
        ("sel_proto", "Všechny"),
        ("sel_role", []),
        ("sel_site", []),
    )
    snapshot = {key: st.session_state.get(key, default) for key, default in fallbacks}
    payload = json.dumps(snapshot, ensure_ascii=False, indent=2)
    STATE_FILE.write_text(payload, encoding="utf-8")
# ── Data ───────────────────────────────────────────────────────────────────────
@st.cache_data(ttl=300)
def load_data() -> pd.DataFrame:
    """Load every contact row from TABLE; the result is cached for 300 seconds.

    Returns:
        DataFrame with ``protocol_id``, ``file_date`` and every key of
        DISPLAY_COLS as columns, ordered by protocol, site, role and name.
        The columns are present even when the table is empty, so callers can
        index them unconditionally.

    Raises:
        mysql.connector.Error: when connecting or querying fails.
    """
    selected = ["protocol_id", "file_date", *DISPLAY_COLS]
    sql = (
        f"SELECT {', '.join(selected)} "
        f"FROM {TABLE} "
        f"ORDER BY protocol_id, site_id, contact_role, last_name, first_name"
    )
    conn = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query raises.
            cursor.close()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    # Passing the column list keeps the schema when `rows` is empty.
    return pd.DataFrame(rows, columns=selected)
# ── Aplikace ───────────────────────────────────────────────────────────────────
st.set_page_config(page_title="CTMS Contacts", page_icon="🏥", layout="wide")
st.title("🏥 CTMS Contacts — Czechia")

try:
    df = load_data()
except Exception as e:
    # Top-level boundary: report the failure in the page and halt the run.
    st.error(f"Chyba připojení k MySQL: {e}")
    st.stop()

protocols = ["Všechny"] + sorted(df["protocol_id"].unique().tolist())

# Restore the saved filter state once per session. This happens before the
# sidebar widgets with the same keys are created further below.
if "filters_initialized" not in st.session_state:
    saved = load_filter_state()
    if not isinstance(saved, dict):
        saved = {}

    saved_status = saved.get("sel_status")
    saved_proto = saved.get("sel_proto")
    st.session_state["sel_status"] = (
        saved_status if saved_status in STATUS_OPTIONS else DEFAULT_STATUS
    )
    st.session_state["sel_proto"] = saved_proto if saved_proto in protocols else "Všechny"

    # The state file may be hand-edited or stale: accept only real lists.
    # A JSON null would raise on iteration and a string would be split into
    # single characters by the pruning step below.
    for key in ("sel_role", "sel_site"):
        value = saved.get(key)
        st.session_state[key] = list(value) if isinstance(value, list) else []

    st.session_state["filters_initialized"] = True

# Role and site options depend on the chosen protocol and the active/inactive
# status. Active sites are the union of all per-protocol sets.
all_active = set().union(*ACTIVE_SITES.values())
df_opts = df.copy()
if st.session_state["sel_proto"] != "Všechny":
    df_opts = df_opts[df_opts["protocol_id"] == st.session_state["sel_proto"]]
if st.session_state["sel_status"] == "Aktivní":
    df_opts = df_opts[df_opts["site_id"].isin(all_active) & df_opts["contact_end_date"].isna()]
elif st.session_state["sel_status"] == "Neaktivní":
    df_opts = df_opts[~df_opts["site_id"].isin(all_active)]

roles = sorted(df_opts["contact_role"].dropna().unique().tolist())
sites = sorted(df_opts["site_id"].dropna().unique().tolist())

# Drop selections that are no longer among the available options (e.g. after
# the protocol or status filter changed).
st.session_state["sel_role"] = [r for r in st.session_state["sel_role"] if r in roles]
st.session_state["sel_site"] = [s for s in st.session_state["sel_site"] if s in sites]
# ── Sidebar filtry ─────────────────────────────────────────────────────────────
with st.sidebar:
    st.header("Filtry")

    # Every persisted widget saves the whole filter state when it changes.
    persist = {"on_change": save_filter_state}

    st.radio("Střediska", STATUS_OPTIONS, key="sel_status", horizontal=True, **persist)
    st.selectbox("Protokol", protocols, key="sel_proto", **persist)
    for label, options, state_key in (
        ("Role", roles, "sel_role"),
        ("Site", sites, "sel_site"),
    ):
        st.multiselect(label, options, key=state_key, **persist)

    # The free-text search is intentionally not persisted.
    search = st.text_input("Hledat (jméno, email…)")

    st.divider()
    refresh_clicked = st.button("🔄 Obnovit data")
    if refresh_clicked:
        # Drop the cached query result and rerun so load_data() hits the DB.
        st.cache_data.clear()
        st.rerun()
    loaded_at = pd.Timestamp.now().strftime('%H:%M:%S')
    st.caption(f"Naposledy načteno: {loaded_at}")
# ── Filtrování ─────────────────────────────────────────────────────────────────
filtered = df.copy()

if st.session_state["sel_proto"] != "Všechny":
    filtered = filtered[filtered["protocol_id"] == st.session_state["sel_proto"]]

# "Aktivní": active site AND contact without an end date.
# "Neaktivní": any contact at a site that is not in the active set.
if st.session_state["sel_status"] == "Aktivní":
    filtered = filtered[filtered["site_id"].isin(all_active) & filtered["contact_end_date"].isna()]
elif st.session_state["sel_status"] == "Neaktivní":
    filtered = filtered[~filtered["site_id"].isin(all_active)]

# An empty multiselect means "no restriction".
if st.session_state["sel_role"]:
    filtered = filtered[filtered["contact_role"].isin(st.session_state["sel_role"])]
if st.session_state["sel_site"]:
    filtered = filtered[filtered["site_id"].isin(st.session_state["sel_site"])]

if search:
    # regex=False: the search box holds plain text, so characters such as
    # "(", "+" or "." must match literally instead of being parsed as a
    # regular expression (which would raise or mis-match).
    mask = filtered.apply(
        lambda row: row.astype(str)
        .str.contains(search, case=False, na=False, regex=False)
        .any(),
        axis=1,
    )
    filtered = filtered[mask]
# ── Metriky ────────────────────────────────────────────────────────────────────
# Summary metrics for the currently filtered rows, one per layout column.
col1, col2, col3, col4 = st.columns(4)
metric_values = (
    (col1, "Kontaktů celkem", len(filtered)),
    (col2, "Protokolů", filtered["protocol_id"].nunique()),
    (col3, "Středisek", filtered["site_id"].nunique()),
    (col4, "Rolí", filtered["contact_role"].nunique()),
)
for slot, label, value in metric_values:
    slot.metric(label, value)

st.divider()

# Table: select the report columns and swap DB names for display labels.
source_columns = ["protocol_id", "file_date", *DISPLAY_COLS.keys()]
labels = {"protocol_id": "Protocol", "file_date": "File Date"}
labels.update(DISPLAY_COLS)
display = filtered[source_columns].copy()
display = display.rename(columns=labels)

date_format = "DD-MMM-YYYY"
st.dataframe(
    display,
    width="stretch",
    hide_index=True,
    column_config={
        "Email": st.column_config.LinkColumn("Email", display_text=".*"),
        "Start Date": st.column_config.DateColumn("Start Date", format=date_format),
        "End Date": st.column_config.DateColumn("End Date", format=date_format),
    },
)

shown, total = len(filtered), len(df)
st.caption(f"Zobrazeno {shown} z {total} záznamů")
st.divider()


def _format_recipient(first_name, last_name, email) -> str:
    """Return ``First Last <email>``; missing name parts are left out."""
    name = " ".join(
        part.strip()
        for part in (first_name, last_name)
        if isinstance(part, str) and part.strip()
    )
    address = str(email).strip()
    # Without any name, emit the bare address rather than "None None <...>".
    return f"{name} <{address}>" if name else address


# Recipients are built from the rows currently shown (after all filters).
email_rows = filtered[["first_name", "last_name", "email"]].dropna(subset=["email"])
email_rows = email_rows[email_rows["email"].str.strip() != ""]
entries = [
    _format_recipient(row.first_name, row.last_name, row.email)
    for row in email_rows.itertuples()
]
email_str = "; ".join(entries)

if st.button(f"📋 Kopírovat emaily do clipboardu ({len(entries)} adres)"):
    if entries:
        try:
            # pyperclip runs in the Streamlit process, so this fills the
            # clipboard of the machine hosting the app.
            pyperclip.copy(email_str)
        except pyperclip.PyperclipException as exc:
            # No clipboard mechanism available (e.g. headless server).
            st.error(f"Kopírování do clipboardu selhalo: {exc}")
        else:
            st.success(f"✅ Zkopírováno {len(entries)} adres — vlož přímo do pole Komu.")
    else:
        st.warning("Žádné adresy ke zkopírování.")
+562
View File
@@ -0,0 +1,562 @@
import os
import mysql.connector
import pandas as pd
from datetime import date
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
import db_config
# Protocol IDs for which main() generates a report.
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
# Directory containing this script; reports are written to its "output" folder.
BASE_DIR = Path(os.path.dirname(os.path.abspath(__file__)))
OUTPUT_DIR = BASE_DIR / "output"
# Column headers that load_inventory() converts to datetime and that
# format_sheet() renders with the DD-MMM-YYYY number format.
DATE_COLUMNS = {
    "Orig Exp Date", "Exp Date", "Rcv Date",
    "Date Asgn", "Disp Date", "Date Ret", "Destroyed", "Max Visit Date",
}
N_SHIP_COLS = 9  # number of shipment-level columns preceding the detail columns
# ── DB ────────────────────────────────────────────────────────────────────────
def get_conn():
    """Open and return a new MySQL connection configured from ``db_config``."""
    settings = {
        "host": db_config.DB_HOST,
        "port": db_config.DB_PORT,
        "user": db_config.DB_USER,
        "password": db_config.DB_PASSWORD,
        "database": db_config.DB_NAME,
    }
    return mysql.connector.connect(**settings)
def get_latest_import_id(cursor, study):
    """Return the highest ``import_id`` of 'drugs' imports for *study*.

    Args:
        cursor: Cursor whose ``fetchone()`` returns a mapping keyed by column
            alias (the row is read as ``row["mid"]``).
        study: Protocol ID to look up in ``iwrs_import``.

    Raises:
        RuntimeError: when no matching import exists (MAX() yields NULL).
    """
    query = "SELECT MAX(import_id) AS mid FROM iwrs_import WHERE study=%s AND report_type='drugs'"
    cursor.execute(query, (study,))
    latest = cursor.fetchone()["mid"]
    if latest is not None:
        return latest
    raise RuntimeError(f"Žádná data v MySQL pro studii {study}")
# ── Načítání dat ──────────────────────────────────────────────────────────────
def load_inventory(cursor, study, import_id):
    """Load the kit inventory of one import, joined with destruction data.

    Args:
        cursor: DB-API cursor; the caller creates it with ``dictionary=True``.
        study: Protocol ID; filters both the inventory and destruction rows.
        import_id: Import batch to read from ``iwrs_inventory``.

    Returns:
        DataFrame with one row per inventory record. Columns named in
        DATE_COLUMNS are converted to datetime (unparseable values become
        NaT). When the query returns no rows the frame is empty but still
        carries the result columns, so the ``build_*`` helpers can index them.
    """
    sql = """
SELECT
i.site                     AS Site,
i.medication_id            AS `Med ID`,
i.packaged_lot_no          AS `Lot No.`,
i.original_expiration_date AS `Orig Exp Date`,
i.expiration_date          AS `Exp Date`,
i.received_date            AS `Rcv Date`,
i.receipt_user             AS `Rcpt User`,
i.subject_identifier       AS `Subject ID`,
i.quantity_assigned        AS `Qty Asgn`,
i.irt_transaction          AS `IRT Tx`,
i.date_assigned            AS `Date Asgn`,
i.assignment_user          AS `Asgn User`,
i.dispensation_status      AS `Disp Status`,
i.dispensing_date          AS `Disp Date`,
i.quantity_dispensed       AS `Qty Disp`,
i.dispensing_user          AS `Disp User`,
i.quantity_returned        AS `Qty Ret`,
i.date_returned            AS `Date Ret`,
i.return_user              AS `Ret User`,
d.destruction_date         AS Destroyed,
d.basket_id                AS `Basket No.`
FROM iwrs_inventory i
LEFT JOIN (
SELECT medication_id,
ANY_VALUE(basket_id)        AS basket_id,
ANY_VALUE(destruction_date) AS destruction_date
FROM iwrs_destruction
WHERE study = %s
GROUP BY medication_id
) d ON d.medication_id = i.medication_id
WHERE i.import_id = %s
AND i.study     = %s
ORDER BY i.site, i.received_date, i.medication_id
"""
    cursor.execute(sql, (study, import_id, study))
    rows = cursor.fetchall()

    # DB-API cursors expose the result column names via `description` even
    # when no rows come back; use them so an empty result keeps its schema.
    description = getattr(cursor, "description", None) or ()
    columns = [col[0] for col in description] or None
    df = pd.DataFrame(rows, columns=columns)

    for col in DATE_COLUMNS:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    print(f" Inventory: {len(df)} kitu")
    return df
def load_shipments(cursor, study, import_id):
    """Load shipments of one import together with their kit-level items.

    Args:
        cursor: DB-API cursor; the caller creates it with ``dictionary=True``.
        study: Protocol ID to filter ``iwrs_shipments`` by.
        import_id: Import batch; applied to both shipments and shipment items.

    Returns:
        DataFrame with one row per shipment item; the four date columns are
        converted to datetime (unparseable values become NaT).
    """
    sql = """
SELECT
s.shipment_id            AS `Shipment ID`,
s.status                 AS `IRT Shipment Status`,
s.type                   AS Type,
s.ship_from              AS `Shipment From`,
s.ship_to_site           AS `Ship To:`,
s.request_date           AS `Request Date`,
s.received_date          AS `Received Date`,
s.received_by            AS `Received by`,
s.expected_arrival       AS `Expected Arrival`,
i.investigator           AS Investigator,
i.medication_description AS `Medication Description`,
i.medication_id          AS `Medication ID`,
i.packaged_lot_no        AS `Packaged Lot number`,
i.expiration_date        AS `Expiration Date`,
i.item_status            AS Status
FROM iwrs_shipments s
JOIN iwrs_shipment_items i
ON  i.study       = s.study
AND i.shipment_id = s.shipment_id
AND i.import_id   = %s
WHERE s.import_id = %s
AND s.study     = %s
ORDER BY s.ship_to_site, s.shipment_id, i.medication_id
"""
    params = (import_id, import_id, study)
    cursor.execute(sql, params)
    shipments = pd.DataFrame(cursor.fetchall())

    date_columns = ("Request Date", "Received Date", "Expiration Date", "Expected Arrival")
    for name in date_columns:
        if name not in shipments.columns:
            continue
        shipments[name] = pd.to_datetime(shipments[name], errors="coerce")

    # An empty result has no columns at all, so guard the column access.
    if len(shipments):
        n_ship = shipments["Shipment ID"].nunique()
    else:
        n_ship = 0
    print(f" Shipments: {n_ship} zásilek, {len(shipments)} kitu")
    return shipments
# ── Odvozené sheety ───────────────────────────────────────────────────────────
def build_site_summary(shipments_df):
    """Count kits per site and item status.

    Args:
        shipments_df: Shipment items with at least the columns ``Ship To:``
            and ``Status``.

    Returns:
        DataFrame sorted by ``Site`` with the columns Site, Available,
        Assigned, Dispensed, Returned and Total. Statuses outside these four
        are not counted. An empty or column-less input yields an empty frame
        with the same columns instead of raising KeyError.
    """
    status_cols = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
    count_cols = ["Available", "Assigned", "Dispensed", "Returned"]

    if shipments_df.empty or not {"Ship To:", "Status"}.issubset(shipments_df.columns):
        print(" Site Summary: 0 center")
        return pd.DataFrame(columns=["Site", *count_cols, "Total"])

    counts = (
        shipments_df.groupby("Ship To:")["Status"]
        .value_counts()
        .unstack(fill_value=0)
        # Guarantees all four status columns (zero-filled) in a fixed order.
        .reindex(columns=status_cols, fill_value=0)
    )
    summary = (
        counts.reset_index()
        .rename(columns={"Ship To:": "Site", "Returned by Subject": "Returned"})
        .sort_values("Site")
        .reset_index(drop=True)
    )
    summary["Total"] = summary[count_cols].sum(axis=1)
    print(f" Site Summary: {len(summary)} center")
    return summary
def build_expired(df):
    """Select kits that expired before today and are neither assigned nor in a basket.

    Returns:
        Tuple ``(rows, sheet_name)``: the matching rows with a fresh 0-based
        index, and a sheet title containing today's date.
    """
    today = date.today()
    cutoff = pd.Timestamp(today)

    no_basket = df["Basket No."].isna()
    no_subject = df["Subject ID"].isna()
    past_expiry = df["Exp Date"] < cutoff

    expired = df.loc[no_basket & no_subject & past_expiry].copy()
    expired = expired.reset_index(drop=True)

    sheet_name = f"Expired as of {today.strftime('%d-%b-%Y')}"
    print(f" Expired: {len(expired)}")
    return expired, sheet_name
def build_assigned_not_dispensed(df):
    """Return kits that have a subject assigned but no dispensing date (index reset)."""
    has_subject = df["Subject ID"].notna()
    not_dispensed = df["Disp Date"].isna()
    pending = df.loc[has_subject & not_dispensed].copy()
    pending = pending.reset_index(drop=True)
    print(f" Assigned not dispensed: {len(pending)}")
    return pending
def build_not_returned(df):
    """Return assigned kits not yet returned although the subject had a later assignment.

    A kit qualifies when it has a subject, no return date, a dispensation
    status other than "NOT DISPENSED" (case-insensitive), and its assignment
    date is earlier than the subject's latest assignment date, which is
    added as the column ``Max Visit Date``. Return/destruction columns are
    dropped from the result.
    """
    was_dispensed = df["Disp Status"].fillna("").str.upper() != "NOT DISPENSED"
    outstanding = df.loc[df["Date Ret"].isna() & df["Subject ID"].notna() & was_dispensed].copy()

    # Latest assignment date per subject, computed over ALL rows of the subject.
    latest_by_subject = df.groupby("Subject ID")["Date Asgn"].max()
    latest_by_subject = latest_by_subject.rename("Max Visit Date")
    outstanding = outstanding.join(latest_by_subject, on="Subject ID")

    overdue = outstanding["Date Asgn"] < outstanding["Max Visit Date"]
    result = outstanding.loc[overdue].copy()
    unused = ["Qty Ret", "Date Ret", "Ret User", "Destroyed", "Basket No."]
    result = result.drop(columns=unused).reset_index(drop=True)
    print(f" Not returned: {len(result)}")
    return result
def build_kits_for_destruction(df):
    """Return kits ready for destruction that are not yet in a basket.

    A kit qualifies when it has no basket number and was either returned or
    has the status "NOT DISPENSED" (case-insensitive). Rows are sorted by
    site and return date; the destruction columns are dropped.
    """
    returned = df["Date Ret"].notna()
    never_dispensed = df["Disp Status"].fillna("").str.upper() == "NOT DISPENSED"
    candidates = df.loc[df["Basket No."].isna() & (returned | never_dispensed)].copy()

    ordered = candidates.sort_values(["Site", "Date Ret"], ascending=[True, True])
    result = ordered.drop(columns=["Destroyed", "Basket No."]).reset_index(drop=True)
    print(f" Kits for destruction: {len(result)}")
    return result
# ── Formátování ───────────────────────────────────────────────────────────────
# Alternating row fills used by format_sheet() and format_shipment_sheet().
STRIPE_GRAY = PatternFill("solid", start_color="F2F2F2")
STRIPE_WHITE = PatternFill("solid", start_color="FFFFFF")
# Patient sheets — styles carried over from create_subject_report.py.
_PAT_HEADER_FILL = PatternFill("solid", start_color="1F4E79")
_PAT_HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
_PAT_NORMAL_FONT = Font(name="Arial", size=10)
_PAT_BOLD_FONT = Font(name="Arial", bold=True, size=10)
# Grey strike-through font for screen-failed / discontinued subjects.
_PAT_STRIKE_FONT = Font(name="Arial", size=10, strike=True, color="999999")
_PAT_ADOLESC_FONT = Font(name="Arial", bold=True, size=10)
_PAT_THIN = Side(style="thin", color="CCCCCC")
_PAT_BORDER = Border(left=_PAT_THIN, right=_PAT_THIN, top=_PAT_THIN, bottom=_PAT_THIN)
# Alternating row fills of the patient sheets.
_PAT_EVEN_FILL = PatternFill("solid", start_color="EBF3FB")
_PAT_ODD_FILL = PatternFill("solid", start_color="FFFFFF")
_PAT_CENTER = Alignment(horizontal="center", vertical="center")
_PAT_LEFT = Alignment(horizontal="left", vertical="center")
def _autofit(ws):
    """Size every column of *ws* to its longest value plus 3, capped at 50."""
    for column_cells in ws.columns:
        letter = get_column_letter(column_cells[0].column)
        widest = 0
        for cell in column_cells:
            value = cell.value
            if value is None:
                continue
            # Dates are displayed as DD-MMM-YYYY, i.e. 11 characters.
            is_date = hasattr(value, "strftime") or cell.number_format == "DD-MMM-YYYY"
            rendered = 11 if is_date else len(str(value))
            widest = max(widest, rendered)
        ws.column_dimensions[letter].width = min(widest + 3, 50)
def format_sheet(ws, header_color, highlight_col=None, highlight_color=None):
    """Apply the standard table formatting to a worksheet.

    Args:
        ws: Worksheet whose first row holds the column headers.
        header_color: Hex fill colour of the header row.
        highlight_col: Header of a column to fill with *highlight_color*.
        highlight_color: Hex fill colour for the highlighted column; without
            it no column is highlighted.

    The header becomes bold white on *header_color*; data rows get borders,
    centred Arial 10 and alternating stripes; columns named in DATE_COLUMNS
    get the DD-MMM-YYYY number format. Finally the columns are auto-sized, an
    auto-filter is set over the used range and the header row is frozen.
    """
    edge = Side(style="thin", color="000000")
    box = Border(left=edge, right=edge, top=edge, bottom=edge)
    body_font = Font(name="Arial", size=10)
    body_alignment = Alignment(horizontal="center")
    hi_fill = PatternFill("solid", start_color=highlight_color) if highlight_color else None

    header_cells = ws[1]
    headers = [cell.value for cell in header_cells]
    header_fill = PatternFill("solid", start_color=header_color)
    header_font = Font(bold=True, color="FFFFFF", name="Arial", size=10)
    header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=False)
    for cell in header_cells:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment
        cell.border = box

    # 1-based column number -> header text; unknown columns map to None.
    name_by_column = dict(enumerate(headers, start=1))
    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        stripe = STRIPE_WHITE if row[0].row % 2 else STRIPE_GRAY
        for cell in row:
            col_name = name_by_column.get(cell.column)
            cell.font = body_font
            cell.border = box
            cell.alignment = body_alignment
            if col_name in DATE_COLUMNS:
                cell.number_format = "DD-MMM-YYYY"
            if hi_fill and col_name == highlight_col:
                cell.fill = hi_fill
            else:
                cell.fill = stripe

    _autofit(ws)
    ws.auto_filter.ref = ws.dimensions
    ws.freeze_panes = "A2"
def format_shipment_sheet(ws, header_color_ship, header_color_detail, n_ship_cols):
    """Format the Shipments sheet with a two-colour header.

    Args:
        ws: Worksheet whose first row holds the column headers.
        header_color_ship: Hex fill for the first *n_ship_cols* header cells.
        header_color_detail: Hex fill for the remaining header cells.
        n_ship_cols: Number of leading shipment-level columns.

    Data rows get borders, centred Arial 10 and alternating stripes; cells
    holding a date/datetime value get the DD-MMM-YYYY number format. Finally
    the columns are auto-sized, an auto-filter is set and the header frozen.
    """
    thin = Side(style="thin", color="000000")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    hfont = Font(bold=True, color="FFFFFF", name="Arial", size=10)
    dfont = Font(name="Arial", size=10)
    fill_ship = PatternFill("solid", start_color=header_color_ship)
    fill_detail = PatternFill("solid", start_color=header_color_detail)
    header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    body_alignment = Alignment(horizontal="center", vertical="center")

    for cell in ws[1]:
        cell.fill = fill_ship if cell.column <= n_ship_cols else fill_detail
        cell.font = hfont
        cell.alignment = header_alignment
        cell.border = border
    ws.row_dimensions[1].height = 30

    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        stripe = STRIPE_GRAY if row[0].row % 2 == 0 else STRIPE_WHITE
        for cell in row:
            cell.font = dfont
            cell.border = border
            cell.alignment = body_alignment
            cell.fill = stripe
            # datetime and pandas Timestamp are subclasses of date, so one
            # isinstance check replaces the comparison of class-name strings.
            if isinstance(cell.value, date):
                cell.number_format = "DD-MMM-YYYY"

    _autofit(ws)
    ws.auto_filter.ref = ws.dimensions
    ws.freeze_panes = "A2"
# ── Pacienti ─────────────────────────────────────────────────────────────────
# Protocol ID -> MySQL table holding that study's subject summary.
# load_patients() interpolates the table name into its SQL, so the values
# must stay trusted constants.
PATIENT_TABLE = {
    "77242113UCO3001": "iwrs_uco3001_subject_summary",
    "42847922MDD3003": "iwrs_mdd3003_subject_summary",
}
def load_patients(cursor, study):
    """Load the newest subject summary for *study*.

    Args:
        cursor: DB-API cursor; rows are read as mappings keyed by column alias.
        study: Protocol ID; must be a key of PATIENT_TABLE.

    Returns:
        DataFrame of the rows with the highest ``import_id`` in the study's
        summary table, ordered by subject. The next-transaction date column
        is converted to datetime (unparseable values become NaT).

    Raises:
        KeyError: when *study* has no entry in PATIENT_TABLE.
        RuntimeError: when the summary table holds no import.
    """
    table = PATIENT_TABLE[study]

    cursor.execute(f"SELECT MAX(import_id) AS mid FROM {table}")
    mid = cursor.fetchone()["mid"]
    if mid is None:
        raise RuntimeError(f"Žádná data v MySQL pro pacienty {study}")

    sql = f"""
SELECT
subject                         AS `Subject`,
investigator                    AS `Investigator`,
age                             AS `Subject's age collection`,
cohort_per_irt                  AS `Cohort per IRT`,
irt_subject_status              AS `IRT Subject Status`,
last_irt_transaction            AS `Last Recorded IRT Transaction`,
next_irt_transaction            AS `Next Expected IRT Transaction`,
next_irt_transaction_date_local AS `Next Expected IRT Transaction Date [Local]`
FROM {table}
WHERE import_id = %s
ORDER BY subject
"""
    cursor.execute(sql, (mid,))
    patients = pd.DataFrame(cursor.fetchall())

    date_col = "Next Expected IRT Transaction Date [Local]"
    if date_col in patients.columns:
        patients[date_col] = pd.to_datetime(patients[date_col], errors="coerce")

    print(f" Pacienti: {len(patients)} subjektů (import_id={mid})")
    return patients
def _simplify_cohort(val):
if pd.isna(val):
return ""
val = str(val)
if "dolescent" in val:
return "Adolescent"
if val.startswith("Adult"):
return "Adult"
return val
def _fmt_date(val):
if pd.isna(val):
return ""
if hasattr(val, "strftime"):
return val.strftime("%Y-%m-%d")
return str(val)[:10]
def _write_prehled(wb, df_raw, study):
    """Insert the "Přehled" (overview) sheet as the first sheet of *wb*.

    Args:
        wb: Workbook to add the sheet to.
        df_raw: Subject summary with the column names produced by load_patients().
        study: Protocol ID shown in the title row.

    Row 1 holds a merged title, row 2 the headers and the data starts in row
    3, sorted by subject. Rows of screen-failed or discontinued subjects are
    struck through; a "Randomized" status and the "Adolescent" cohort are
    shown in bold.
    """
    ws = wb.create_sheet("Přehled", 0)
    ws.sheet_view.showGridLines = False

    # Title banner across the eight table columns.
    ws.merge_cells("A1:H1")
    banner = ws["A1"]
    banner.value = f"Subject Summary — {study} ({date.today().strftime('%d-%b-%Y')})"
    banner.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
    banner.alignment = Alignment(horizontal="left", vertical="center")
    ws.row_dimensions[1].height = 22

    # (header text, column width) in display order.
    layout = (
        ("Subject", 14), ("Investigator", 22), ("Věk", 6), ("Cohort", 12),
        ("Status", 14), ("Last IRT", 12), ("Next Visit", 12), ("Next Date", 13),
    )
    for col_no, (heading, width) in enumerate(layout, start=1):
        head = ws.cell(row=2, column=col_no, value=heading)
        head.font = _PAT_HEADER_FONT
        head.fill = _PAT_HEADER_FILL
        head.alignment = _PAT_CENTER
        head.border = _PAT_BORDER
        ws.column_dimensions[get_column_letter(col_no)].width = width
    ws.row_dimensions[2].height = 18

    table = pd.DataFrame({
        "Subject": df_raw["Subject"].fillna(""),
        "Investigator": df_raw["Investigator"].fillna(""),
        "Věk": df_raw["Subject's age collection"].apply(lambda v: "" if pd.isna(v) else int(v)),
        "Cohort": df_raw["Cohort per IRT"].apply(_simplify_cohort),
        "Status": df_raw["IRT Subject Status"].fillna(""),
        "Last IRT": df_raw["Last Recorded IRT Transaction"].fillna(""),
        "Next Visit": df_raw["Next Expected IRT Transaction"].fillna(""),
        "Next Date": df_raw["Next Expected IRT Transaction Date [Local]"].apply(_fmt_date),
    })
    table = table.sort_values("Subject").reset_index(drop=True)
    column_order = [heading for heading, _ in layout]

    for idx, record in table.iterrows():
        sheet_row = idx + 3  # data starts below the title and header rows
        status_text = str(record["Status"])
        struck = "Screen Failed" in status_text or "Discontinued" in status_text
        randomized = "Randomized" in status_text
        adolescent = record["Cohort"] == "Adolescent"
        band = _PAT_ODD_FILL if idx % 2 else _PAT_EVEN_FILL

        for col_no, heading in enumerate(column_order, start=1):
            value = record[heading]
            # Empty strings are written as truly empty cells.
            cell = ws.cell(row=sheet_row, column=col_no, value=value if value != "" else None)
            cell.fill = band
            cell.border = _PAT_BORDER
            cell.alignment = _PAT_CENTER if col_no == 3 else _PAT_LEFT
            # Strike-through wins over the per-column bold highlights.
            if struck:
                font = _PAT_STRIKE_FONT
            elif col_no == 5 and randomized:
                font = _PAT_BOLD_FONT
            elif col_no == 4 and adolescent:
                font = _PAT_ADOLESC_FONT
            else:
                font = _PAT_NORMAL_FONT
            cell.font = font
        ws.row_dimensions[sheet_row].height = 16

    ws.freeze_panes = "A3"
    ws.auto_filter.ref = f"A2:H{len(table) + 2}"
def _write_next_visits(wb, df_raw, study):
    """Insert the "Next Visits" sheet at position 1 of *wb*.

    Args:
        wb: Workbook to add the sheet to.
        df_raw: Subject summary with the column names produced by load_patients().
        study: Protocol ID shown in the title row.

    Lists subjects that have a next expected transaction date and are neither
    screen-failed nor discontinued, sorted by that date. Row 1 holds a merged
    title, row 2 the headers, data starts in row 3.
    """
    ws = wb.create_sheet("Next Visits", 1)
    ws.sheet_view.showGridLines = False

    # Title banner across the four table columns.
    ws.merge_cells("A1:D1")
    banner = ws["A1"]
    banner.value = f"Next Expected Visits — {study} ({date.today().strftime('%d-%b-%Y')})"
    banner.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
    banner.alignment = Alignment(horizontal="left", vertical="center")
    ws.row_dimensions[1].height = 22

    # (header text, column width) in display order.
    layout = (("Subject", 14), ("Investigator", 22), ("Next Visit", 26), ("Datum", 13))
    for col_no, (heading, width) in enumerate(layout, start=1):
        head = ws.cell(row=2, column=col_no, value=heading)
        head.font = _PAT_HEADER_FONT
        head.fill = _PAT_HEADER_FILL
        head.alignment = _PAT_CENTER
        head.border = _PAT_BORDER
        ws.column_dimensions[get_column_letter(col_no)].width = width
    ws.row_dimensions[2].height = 18

    visits = pd.DataFrame({
        "Subject": df_raw["Subject"].fillna(""),
        "Investigator": df_raw["Investigator"].fillna(""),
        "Next Visit": df_raw["Next Expected IRT Transaction"].fillna(""),
        "Datum": df_raw["Next Expected IRT Transaction Date [Local]"],
        "Status": df_raw["IRT Subject Status"].fillna(""),
    })
    visits = visits[visits["Datum"].notna()]
    dropped_out = visits["Status"].str.contains("Screen Failed|Discontinued", na=False)
    visits = visits[~dropped_out]
    visits = visits.sort_values("Datum").reset_index(drop=True)

    for idx, record in visits.iterrows():
        sheet_row = idx + 3  # data starts below the title and header rows
        band = _PAT_ODD_FILL if idx % 2 else _PAT_EVEN_FILL
        when = record["Datum"]
        if hasattr(when, "strftime"):
            when_text = when.strftime("%Y-%m-%d")
        else:
            when_text = str(when)[:10]
        row_values = [record["Subject"], record["Investigator"], record["Next Visit"], when_text]
        for col_no, value in enumerate(row_values, start=1):
            # Empty strings are written as truly empty cells.
            cell = ws.cell(row=sheet_row, column=col_no, value=value if value != "" else None)
            cell.fill = band
            cell.border = _PAT_BORDER
            cell.font = _PAT_NORMAL_FONT
            cell.alignment = _PAT_LEFT
        ws.row_dimensions[sheet_row].height = 16

    ws.freeze_panes = "A3"
    ws.auto_filter.ref = f"A2:D{len(visits) + 2}"
# ── Jeden report pro jednu studii ─────────────────────────────────────────────
def _next_report_version(today, study):
    """Return 1 + the highest version among today's existing reports for *study*."""
    versions = []
    for path in OUTPUT_DIR.glob(f"{today} {study} CZ IWRS overview v*.xlsx"):
        # Stem looks like "2026-05-12 42847922MDD3003 CZ IWRS overview v3".
        suffix = path.stem.rsplit("v", 1)[-1]
        # Ignore files whose suffix is not a plain number (e.g. manual copies).
        if suffix.isdigit():
            versions.append(int(suffix))
    # Compare numerically: a sort of file names would rank "v10" before "v2".
    return max(versions, default=0) + 1


def create_study_report(study):
    """Build the versioned IWRS overview workbook for one study.

    Loads inventory, shipments and patients from MySQL, derives the filtered
    sheets, writes everything to
    ``OUTPUT_DIR/<today> <study> CZ IWRS overview v<N>.xlsx`` (N is one above
    the highest version already present for today) and applies formatting.

    Args:
        study: Protocol ID; must be known to the database and PATIENT_TABLE.

    Raises:
        RuntimeError: when MySQL holds no import for the study.
    """
    today = date.today()
    version = _next_report_version(today, study)
    output_file = OUTPUT_DIR / f"{today} {study} CZ IWRS overview v{version}.xlsx"

    print(f"\n[{study}] Načítám z MySQL...")
    conn = get_conn()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            import_id = get_latest_import_id(cursor, study)
            print(f" import_id = {import_id}")
            df = load_inventory(cursor, study, import_id)
            shipments_df = load_shipments(cursor, study, import_id)
            df_patients = load_patients(cursor, study)
        finally:
            # Release the cursor even when a query raises.
            cursor.close()
    finally:
        # Release the connection even when a query raises.
        conn.close()

    expired_df, expired_sheet = build_expired(df)
    assigned_df = build_assigned_not_dispensed(df)
    not_returned_df = build_not_returned(df)
    destruction_df = build_kits_for_destruction(df)
    site_summary_df = build_site_summary(shipments_df)

    with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="CountryMedicationOverview")
        expired_df.to_excel(writer, index=False, sheet_name=expired_sheet)
        assigned_df.to_excel(writer, index=False, sheet_name="Assigned not dispensed")
        not_returned_df.to_excel(writer, index=False, sheet_name="Not returned")
        destruction_df.to_excel(writer, index=False, sheet_name="Kits for destruction")
        shipments_df.to_excel(writer, index=False, sheet_name="Shipments")
        site_summary_df.to_excel(writer, index=False, sheet_name="Site Summary")

    # Reopen the written file to apply styling.
    wb = load_workbook(output_file)

    ws_main = wb["CountryMedicationOverview"]
    format_sheet(ws_main, header_color="1F4E79")
    # Tint the destruction columns of the main sheet green.
    green_fill = PatternFill("solid", start_color="E2EFDA")
    headers_main = [c.value for c in ws_main[1]]
    for row in ws_main.iter_rows(min_row=2, max_row=ws_main.max_row):
        for cell in row:
            col_name = headers_main[cell.column - 1] if cell.column <= len(headers_main) else None
            if col_name in ("Destroyed", "Basket No."):
                cell.fill = green_fill

    format_sheet(wb[expired_sheet], header_color="C00000", highlight_col="Exp Date", highlight_color="FFE0E0")
    format_sheet(wb["Assigned not dispensed"], header_color="833C00", highlight_col="Subject ID", highlight_color="FFF2CC")
    format_sheet(wb["Not returned"], header_color="375623", highlight_col="Max Visit Date", highlight_color="E2EFDA")
    format_sheet(wb["Kits for destruction"], header_color="595959")
    format_shipment_sheet(wb["Shipments"], "1F4E79", "375623", N_SHIP_COLS)
    format_sheet(wb["Site Summary"], header_color="1F4E79")

    # Patient sheets (Přehled + Next Visits) go to the front of the workbook.
    _write_prehled(wb, df_patients, study)
    _write_next_visits(wb, df_patients, study)

    wb.save(output_file)
    print(f" Uloženo: {output_file.name} ({len(df)} řádků)")
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Generate the report for every study in STUDIES.

    A failure in one study is printed with its traceback and does not stop
    the remaining studies.
    """
    import traceback

    # parents=True so a missing intermediate directory does not abort the run.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    for study in STUDIES:
        try:
            create_study_report(study)
        except Exception as e:
            # Per-study boundary: report the error and continue with the next.
            print(f"\n[{study}] CHYBA: {e}")
            traceback.print_exc()
    print("\nHotovo.")


# Run only when executed as a script, so importing this module (e.g. to reuse
# the build_* helpers) does not hit the database or write files.
if __name__ == "__main__":
    main()