Files
janssen/IWRS/create_report_v1.0.py
2026-06-10 11:59:19 +02:00

679 lines
28 KiB
Python

"""
================================================================================
create_report_v1.0.py — IWRS report: pacienti + léky v jednom Excelu
Verze: 1.0
Datum: 2026-06-10
================================================================================
Sjednocuje bývalé Patients/create_subject_report.py a Drugs/create_report.py
(oba v Trash/). Čte výhradně MongoDB (db `studie`, kolekce iwrs_*) — žádné XLSX
zdroje, takže nezávisí na adresářích Patients/ a Drugs/.
Výstup: IWRS/Reports/YYYY-MM-DD {study} IWRS report.xlsx
(kolize ve stejný den → ' HHMM' před příponou)
Listy (v pořadí):
1. Přehled — stav pacientů (škrtnutí SF/DC, bold Randomized,
UCO3001 navíc flag sloupce Rescreened/ADT-IR/…)
2. Next Visits — očekávané visity aktivních subjektů (I-0 = screening+42d)
3. Patient Visits — proběhlé visity s medikací (group by visit)
4. CountryMedicationOverview — sklad per kit + Destroyed/Basket No.
5. Expired as of … — expirované nepřiřazené kity na centrech
6. Assigned not dispensed — přiřazené, nevydané
7. Not returned — nevrácené z minulých visit
8. Kits for destruction — vrácené/nevydané kity čekající na destrukci
9. Shipments — zásilky + položky (dvoubarevná hlavička)
10. Site Summary — počty kitů per centrum a status
Použití:
python create_report_v1.0.py # obě studie
python create_report_v1.0.py --study 42847922MDD3003
"""
import os
import sys
import argparse
import pandas as pd
from datetime import date
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if BASE_DIR not in sys.path:
sys.path.insert(0, BASE_DIR)
from common.mongo_writer import get_db
from common.paths import STUDIES, unique_path
REPORTS_DIR = os.path.join(BASE_DIR, "Reports")
DATE_COLUMNS = {
"Orig Exp Date", "Exp Date", "Rcv Date",
"Date Asgn", "Disp Date", "Date Ret", "Destroyed", "Max Visit Date",
"Visit Date", "Scheduled Date",
}
N_SHIP_COLS = 9 # počet shipment sloupců před detail sloupci
# ── Načítání dat z MongoDB ────────────────────────────────────────────────────
INVENTORY_COLS = [
("site", "Site"),
("medication_id", "Med ID"),
("packaged_lot_no", "Lot No."),
("original_expiration_date", "Orig Exp Date"),
("expiration_date", "Exp Date"),
("received_date", "Rcv Date"),
("receipt_user", "Rcpt User"),
("subject_identifier", "Subject ID"),
("quantity_assigned", "Qty Asgn"),
("irt_transaction", "IRT Tx"),
("date_assigned", "Date Asgn"),
("assignment_user", "Asgn User"),
("dispensation_status", "Disp Status"),
("dispensing_date", "Disp Date"),
("quantity_dispensed", "Qty Disp"),
("dispensing_user", "Disp User"),
("quantity_returned", "Qty Ret"),
("date_returned", "Date Ret"),
("return_user", "Ret User"),
]
def load_inventory(study):
db = get_db()
inv = list(db.iwrs_inventory.find({"study": study}))
destr = list(db.iwrs_destruction.find({"study": study}))
# map medication_id -> first basket+date
destr_map = {}
for d in destr:
mid = d.get("medication_id")
if mid and mid not in destr_map:
destr_map[mid] = (d.get("basket_id"), d.get("destruction_date"))
records = []
for doc in inv:
row = {label: doc.get(key) for key, label in INVENTORY_COLS}
b, dt = destr_map.get(doc.get("medication_id"), (None, None))
row["Destroyed"] = dt
row["Basket No."] = b
records.append(row)
df = pd.DataFrame(records)
if df.empty:
print(" Inventory: 0 kitu")
return df
df = df.sort_values(["Site", "Rcv Date", "Med ID"], na_position="last").reset_index(drop=True)
for col in DATE_COLUMNS:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors="coerce")
print(f" Inventory: {len(df)} kitu")
return df
SHIP_COLS = [
("shipment_id", "Shipment ID"),
("status", "IRT Shipment Status"),
("type", "Type"),
("ship_from", "Shipment From"),
("ship_to_site", "Ship To:"),
("request_date", "Request Date"),
("received_date", "Received Date"),
("received_by", "Received by"),
("expected_arrival", "Expected Arrival"),
]
ITEM_COLS = [
("investigator", "Investigator"),
("medication_description", "Medication Description"),
("medication_id", "Medication ID"),
("packaged_lot_no", "Packaged Lot number"),
("expiration_date", "Expiration Date"),
("item_status", "Status"),
]
def load_shipments(study):
db = get_db()
ships = list(db.iwrs_shipments.find({"study": study}))
items = list(db.iwrs_shipment_items.find({"study": study}))
# index items by shipment_id
items_by_ship = {}
for it in items:
items_by_ship.setdefault(it.get("shipment_id"), []).append(it)
records = []
for s in ships:
base = {label: s.get(key) for key, label in SHIP_COLS}
for it in items_by_ship.get(s.get("shipment_id"), []):
row = dict(base)
for key, label in ITEM_COLS:
row[label] = it.get(key)
records.append(row)
df = pd.DataFrame(records)
if df.empty:
print(" Shipments: 0 zásilek, 0 kitu")
return df
df = df.sort_values(["Ship To:", "Shipment ID", "Medication ID"], na_position="last").reset_index(drop=True)
for col in ("Request Date", "Received Date", "Expiration Date", "Expected Arrival"):
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors="coerce")
n_ship = df["Shipment ID"].nunique()
print(f" Shipments: {n_ship} zásilek, {len(df)} kitu")
return df
def load_visits(study):
db = get_db()
cur = db.iwrs_visits.find({
"study": study,
"visit_type": "Past",
"irt_transaction_no": {"$ne": None},
})
rows = []
for v in cur:
rows.append({
"Subject": v.get("subject"),
"Visit Date": v.get("actual_date") or v.get("scheduled_date"),
"Scheduled Date": v.get("scheduled_date"),
"IRT Tx No": v.get("irt_transaction_no"),
"Visit": v.get("irt_transaction_description"),
"Medication": v.get("medication_assignment"),
"medication_id": v.get("medication_id"),
"quantity_assigned": v.get("quantity_assigned"),
})
df = pd.DataFrame(rows)
if df.empty:
print(" Visits: 0 radku")
return df
# GROUP BY subject/actual/scheduled/irt_no/desc/medication
grouped = (
df.groupby(["Subject", "Visit Date", "Scheduled Date", "IRT Tx No", "Visit", "Medication"],
dropna=False, as_index=False)
.agg(**{
"Med IDs": ("medication_id", lambda s: ", ".join(sorted([str(x) for x in s if pd.notna(x)]))),
"Qty": ("quantity_assigned", "sum"),
})
)
grouped = grouped.sort_values(["Subject", "Visit Date"]).reset_index(drop=True)
for col in ("Visit Date", "Scheduled Date"):
if col in grouped.columns:
grouped[col] = pd.to_datetime(grouped[col], errors="coerce")
if study == "77242113UCO3001":
grouped["Visit"] = grouped["Visit"].replace("Subject Number Creation", "Screening")
print(f" Visits: {len(grouped)} řádků")
return grouped
# ── Odvozené sheety ───────────────────────────────────────────────────────────
def build_site_summary(shipments_df):
STATUS_COLS = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
pivot = shipments_df.groupby("Ship To:")["Status"].value_counts().unstack(fill_value=0)
for s in STATUS_COLS:
if s not in pivot.columns:
pivot[s] = 0
pivot = (
pivot[STATUS_COLS]
.reset_index()
.rename(columns={"Ship To:": "Site", "Returned by Subject": "Returned"})
.sort_values("Site")
.reset_index(drop=True)
)
pivot["Total"] = pivot[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
print(f" Site Summary: {len(pivot)} center")
return pivot
def build_expired(df):
today = date.today()
mask = (
df["Basket No."].isna() &
df["Subject ID"].isna() &
(df["Exp Date"] < pd.Timestamp(today))
)
filtered = df[mask].copy().reset_index(drop=True)
sheet_name = f"Expired as of {today.strftime('%d-%b-%Y')}"
print(f" Expired: {len(filtered)}")
return filtered, sheet_name
def build_assigned_not_dispensed(df):
mask = df["Subject ID"].notna() & df["Disp Date"].isna()
filtered = df[mask].copy().reset_index(drop=True)
print(f" Assigned not dispensed: {len(filtered)}")
return filtered
def build_not_returned(df):
no_ret = df[
df["Date Ret"].isna() &
df["Subject ID"].notna() &
(df["Disp Status"].fillna("").str.upper() != "NOT DISPENSED")
].copy()
max_asgn = df.groupby("Subject ID")["Date Asgn"].max().rename("Max Visit Date")
no_ret = no_ret.join(max_asgn, on="Subject ID")
filtered = no_ret[no_ret["Date Asgn"] < no_ret["Max Visit Date"]].copy()
filtered = filtered.drop(columns=["Qty Ret", "Date Ret", "Ret User", "Destroyed", "Basket No."])
filtered = filtered.reset_index(drop=True)
print(f" Not returned: {len(filtered)}")
return filtered
def build_kits_for_destruction(df):
mask = (
df["Basket No."].isna() &
(df["Date Ret"].notna() | (df["Disp Status"].fillna("").str.upper() == "NOT DISPENSED"))
)
filtered = (
df[mask]
.copy()
.sort_values(["Site", "Date Ret"], ascending=[True, True])
.drop(columns=["Destroyed", "Basket No."])
.reset_index(drop=True)
)
print(f" Kits for destruction: {len(filtered)}")
return filtered
# ── Formátování ───────────────────────────────────────────────────────────────
STRIPE_GRAY = PatternFill("solid", start_color="F2F2F2")
STRIPE_WHITE = PatternFill("solid", start_color="FFFFFF")
# pacienti — styly zachovány z create_subject_report.py
_PAT_HEADER_FILL = PatternFill("solid", start_color="1F4E79")
_PAT_HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
_PAT_NORMAL_FONT = Font(name="Arial", size=10)
_PAT_BOLD_FONT = Font(name="Arial", bold=True, size=10)
_PAT_STRIKE_FONT = Font(name="Arial", size=10, strike=True, color="999999")
_PAT_ADOLESC_FONT = Font(name="Arial", bold=True, size=10)
_PAT_THIN = Side(style="thin", color="CCCCCC")
_PAT_BORDER = Border(left=_PAT_THIN, right=_PAT_THIN, top=_PAT_THIN, bottom=_PAT_THIN)
_PAT_EVEN_FILL = PatternFill("solid", start_color="EBF3FB")
_PAT_ODD_FILL = PatternFill("solid", start_color="FFFFFF")
_PAT_CENTER = Alignment(horizontal="center", vertical="center")
_PAT_LEFT = Alignment(horizontal="left", vertical="center")
def _autofit(ws):
for col_cells in ws.columns:
max_len = 0
col_letter = get_column_letter(col_cells[0].column)
for cell in col_cells:
if cell.value is None:
continue
# datum se zobrazí jako DD-MMM-YYYY = 11 znaků
if hasattr(cell.value, "strftime") or cell.number_format == "DD-MMM-YYYY":
length = 11
else:
length = len(str(cell.value))
if length > max_len:
max_len = length
ws.column_dimensions[col_letter].width = min(max_len + 3, 50)
def format_sheet(ws, header_color, highlight_col=None, highlight_color=None):
thin = Side(style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
header_fill = PatternFill("solid", start_color=header_color)
header_font = Font(bold=True, color="FFFFFF", name="Arial", size=10)
row_font = Font(name="Arial", size=10)
hi_fill = PatternFill("solid", start_color=highlight_color) if highlight_color else None
headers = [cell.value for cell in ws[1]]
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=False)
cell.border = border
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
stripe = STRIPE_GRAY if row[0].row % 2 == 0 else STRIPE_WHITE
for cell in row:
col_name = headers[cell.column - 1] if cell.column <= len(headers) else None
cell.font = row_font
cell.border = border
cell.alignment = Alignment(horizontal="center")
if col_name in DATE_COLUMNS:
cell.number_format = "DD-MMM-YYYY"
if hi_fill and col_name == highlight_col:
cell.fill = hi_fill
else:
cell.fill = stripe
_autofit(ws)
ws.auto_filter.ref = ws.dimensions
ws.freeze_panes = "A2"
def format_shipment_sheet(ws, header_color_ship, header_color_detail, n_ship_cols):
thin = Side(style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
hfont = Font(bold=True, color="FFFFFF", name="Arial", size=10)
dfont = Font(name="Arial", size=10)
fill_ship = PatternFill("solid", start_color=header_color_ship)
fill_detail = PatternFill("solid", start_color=header_color_detail)
for cell in ws[1]:
cell.fill = fill_ship if cell.column <= n_ship_cols else fill_detail
cell.font = hfont
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
cell.border = border
ws.row_dimensions[1].height = 30
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
stripe = STRIPE_GRAY if row[0].row % 2 == 0 else STRIPE_WHITE
for cell in row:
cell.font = dfont
cell.border = border
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.fill = stripe
if cell.value.__class__.__name__ in ("datetime", "date", "Timestamp"):
cell.number_format = "DD-MMM-YYYY"
_autofit(ws)
ws.auto_filter.ref = ws.dimensions
ws.freeze_panes = "A2"
# ── Pacienti ─────────────────────────────────────────────────────────────────
def load_patients(study):
db = get_db()
docs = list(db.iwrs_subject_summary.find({"study": study}))
if not docs:
raise RuntimeError(f"Žádná data v Mongo pro pacienty {study}")
base_cols = [
("subject", "Subject"),
("investigator", "Investigator"),
("age", "Subject's age collection"),
("cohort_per_irt", "Cohort per IRT"),
("irt_subject_status", "IRT Subject Status"),
("last_irt_transaction", "Last Recorded IRT Transaction"),
("next_irt_transaction", "Next Expected IRT Transaction"),
("next_irt_transaction_date_local", "Next Expected IRT Transaction Date [Local]"),
]
uco_extra = [
("rescreened_subject", "Rescreened Subject"),
("adt_ir", "ADT-IR"),
("three_or_more_advanced_therapies", "3+ Adv. Therapies"),
("only_oral_5asa_compounds", "Only 5-ASA"),
("ustekinumab", "Ustekinumab"),
("isolated_proctitis", "Isolated Proctitis"),
]
cols = list(base_cols)
if study == "77242113UCO3001":
cols += uco_extra
rows = [{label: d.get(key) for key, label in cols} for d in docs]
df = pd.DataFrame(rows).sort_values("Subject").reset_index(drop=True)
if "Next Expected IRT Transaction Date [Local]" in df.columns:
df["Next Expected IRT Transaction Date [Local]"] = pd.to_datetime(
df["Next Expected IRT Transaction Date [Local]"], errors="coerce"
)
print(f" Pacienti: {len(df)} subjektů")
return df
def _simplify_cohort(val):
if pd.isna(val):
return ""
val = str(val)
if "dolescent" in val:
return "Adolescent"
if val.startswith("Adult"):
return "Adult"
return val
def _fmt_date(val):
if pd.isna(val):
return ""
if hasattr(val, "strftime"):
return val.strftime("%Y-%m-%d")
return str(val)[:10]
def _write_prehled(wb, df_raw, study):
ws = wb.create_sheet("Přehled", 0)
ws.sheet_view.showGridLines = False
is_uco = (study == "77242113UCO3001")
if is_uco:
display_headers = ["Subject", "Investigator", "Věk", "Cohort",
"Rescreened", "ADT-IR", "≥3 Adv.Th.", "5-ASA only",
"Uste.", "Isol.Proct.",
"Status", "Last IRT", "Next Visit", "Next Date"]
col_widths = [14, 22, 6, 12, 11, 8, 11, 10, 8, 12, 14, 12, 12, 13]
status_col = 11
flag_cols = set(range(5, 11)) # 1-indexed sloupce s Yes/No hodnotami
else:
display_headers = ["Subject", "Investigator", "Věk", "Cohort", "Status", "Last IRT", "Next Visit", "Next Date"]
col_widths = [14, 22, 6, 12, 14, 12, 12, 13]
status_col = 5
flag_cols = set()
last_col = get_column_letter(len(display_headers))
ws.merge_cells(f"A1:{last_col}1")
title = ws["A1"]
title.value = f"Subject Summary — {study} ({date.today().strftime('%d-%b-%Y')})"
title.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
title.alignment = Alignment(horizontal="left", vertical="center")
ws.row_dimensions[1].height = 22
for c, (h, w) in enumerate(zip(display_headers, col_widths), 1):
cell = ws.cell(row=2, column=c, value=h)
cell.font = _PAT_HEADER_FONT
cell.fill = _PAT_HEADER_FILL
cell.alignment = _PAT_CENTER
cell.border = _PAT_BORDER
ws.column_dimensions[get_column_letter(c)].width = w
ws.row_dimensions[2].height = 18
base = {
"Subject": df_raw["Subject"].fillna(""),
"Investigator": df_raw["Investigator"].fillna(""),
"Věk": df_raw["Subject's age collection"].apply(lambda v: "" if pd.isna(v) else int(v)),
"Cohort": df_raw["Cohort per IRT"].apply(_simplify_cohort),
}
if is_uco:
base.update({
"Rescreened": df_raw["Rescreened Subject"].fillna(""),
"ADT-IR": df_raw["ADT-IR"].fillna(""),
"≥3 Adv.Th.": df_raw["3+ Adv. Therapies"].fillna(""),
"5-ASA only": df_raw["Only 5-ASA"].fillna(""),
"Uste.": df_raw["Ustekinumab"].fillna(""),
"Isol.Proct.": df_raw["Isolated Proctitis"].fillna(""),
})
base.update({
"Status": df_raw["IRT Subject Status"].fillna(""),
"Last IRT": df_raw["Last Recorded IRT Transaction"].fillna(""),
"Next Visit": df_raw["Next Expected IRT Transaction"].fillna(""),
"Next Date": df_raw["Next Expected IRT Transaction Date [Local]"].apply(_fmt_date),
})
display = pd.DataFrame(base).sort_values("Subject").reset_index(drop=True)
for r_idx, row in display.iterrows():
excel_row = r_idx + 3
status = str(row["Status"])
is_failed = "Screen Failed" in status or "Discontinued" in status
is_randomized = "Randomized" in status
is_adolescent = row["Cohort"] == "Adolescent"
fill = _PAT_EVEN_FILL if r_idx % 2 == 0 else _PAT_ODD_FILL
for c_idx, val in enumerate(row, 1):
cell = ws.cell(row=excel_row, column=c_idx, value=val if val != "" else None)
cell.fill = fill
cell.border = _PAT_BORDER
cell.alignment = _PAT_CENTER if (c_idx == 3 or c_idx in flag_cols) else _PAT_LEFT
if is_failed:
cell.font = _PAT_STRIKE_FONT
elif c_idx == status_col and is_randomized:
cell.font = _PAT_BOLD_FONT
elif c_idx == 4 and is_adolescent:
cell.font = _PAT_ADOLESC_FONT
else:
cell.font = _PAT_NORMAL_FONT
ws.row_dimensions[excel_row].height = 16
ws.freeze_panes = "A3"
ws.auto_filter.ref = f"A2:{last_col}{len(display) + 2}"
def _write_next_visits(wb, df_raw, study, visits_df=None):
ws = wb.create_sheet("Next Visits", 1)
ws.sheet_view.showGridLines = False
ws.merge_cells("A1:D1")
title = ws["A1"]
title.value = f"Next Expected Visits — {study} ({date.today().strftime('%d-%b-%Y')})"
title.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
title.alignment = Alignment(horizontal="left", vertical="center")
ws.row_dimensions[1].height = 22
nv_headers = ["Subject", "Investigator", "Next Visit", "Datum"]
nv_widths = [14, 22, 26, 13]
for c, (h, w) in enumerate(zip(nv_headers, nv_widths), 1):
cell = ws.cell(row=2, column=c, value=h)
cell.font = _PAT_HEADER_FONT
cell.fill = _PAT_HEADER_FILL
cell.alignment = _PAT_CENTER
cell.border = _PAT_BORDER
ws.column_dimensions[get_column_letter(c)].width = w
ws.row_dimensions[2].height = 18
df = pd.DataFrame({
"Subject": df_raw["Subject"].fillna(""),
"Investigator": df_raw["Investigator"].fillna(""),
"Next Visit": df_raw["Next Expected IRT Transaction"].fillna(""),
"Datum": df_raw["Next Expected IRT Transaction Date [Local]"],
"Status": df_raw["IRT Subject Status"].fillna(""),
})
# I-0: datum = screening date + 42 dní
if visits_df is not None and not visits_df.empty:
screen = (
visits_df[visits_df["Visit"].str.contains("Screen", case=False, na=False)]
.groupby("Subject")["Visit Date"].min()
.rename("Screening Date")
)
df = df.join(screen, on="Subject")
mask_i0 = df["Next Visit"].str.contains("I-0", na=False)
df.loc[mask_i0, "Datum"] = df.loc[mask_i0, "Screening Date"] + pd.Timedelta(days=42)
df = df.drop(columns=["Screening Date"])
df = df[df["Datum"].notna()]
df = df[~df["Status"].str.contains("Screen Failed|Discontinued", na=False)]
df = df.sort_values("Datum").reset_index(drop=True)
for r_idx, row in df.iterrows():
excel_row = r_idx + 3
fill = _PAT_EVEN_FILL if r_idx % 2 == 0 else _PAT_ODD_FILL
datum_val = row["Datum"]
datum_str = datum_val.strftime("%Y-%m-%d") if hasattr(datum_val, "strftime") else str(datum_val)[:10]
for c_idx, val in enumerate([row["Subject"], row["Investigator"], row["Next Visit"], datum_str], 1):
cell = ws.cell(row=excel_row, column=c_idx, value=val if val != "" else None)
cell.fill = fill
cell.border = _PAT_BORDER
cell.font = _PAT_NORMAL_FONT
cell.alignment = _PAT_LEFT
ws.row_dimensions[excel_row].height = 16
ws.freeze_panes = "A3"
ws.auto_filter.ref = f"A2:D{len(df) + 2}"
# ── Jeden report pro jednu studii ─────────────────────────────────────────────
def create_study_report(study):
today = date.today()
output_file = unique_path(REPORTS_DIR, f"{today} {study} IWRS report")
print(f"\n[{study}] Nacitam z MongoDB...")
df = load_inventory(study)
shipments_df = load_shipments(study)
df_patients = load_patients(study)
visits_df = load_visits(study)
expired_df, expired_sheet = build_expired(df)
assigned_df = build_assigned_not_dispensed(df)
not_returned_df = build_not_returned(df)
destruction_df = build_kits_for_destruction(df)
site_summary_df = build_site_summary(shipments_df)
with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
df.to_excel( writer, index=False, sheet_name="CountryMedicationOverview")
expired_df.to_excel( writer, index=False, sheet_name=expired_sheet)
assigned_df.to_excel( writer, index=False, sheet_name="Assigned not dispensed")
not_returned_df.to_excel( writer, index=False, sheet_name="Not returned")
destruction_df.to_excel( writer, index=False, sheet_name="Kits for destruction")
shipments_df.to_excel( writer, index=False, sheet_name="Shipments")
site_summary_df.to_excel( writer, index=False, sheet_name="Site Summary")
visits_df.to_excel( writer, index=False, sheet_name="Patient Visits")
wb = load_workbook(output_file)
ws_main = wb["CountryMedicationOverview"]
format_sheet(ws_main, header_color="1F4E79")
green_fill = PatternFill("solid", start_color="E2EFDA")
headers_main = [c.value for c in ws_main[1]]
for row in ws_main.iter_rows(min_row=2, max_row=ws_main.max_row):
for cell in row:
col_name = headers_main[cell.column - 1] if cell.column <= len(headers_main) else None
if col_name in ("Destroyed", "Basket No."):
cell.fill = green_fill
format_sheet(wb[expired_sheet], header_color="C00000", highlight_col="Exp Date", highlight_color="FFE0E0")
format_sheet(wb["Assigned not dispensed"], header_color="833C00", highlight_col="Subject ID", highlight_color="FFF2CC")
format_sheet(wb["Not returned"], header_color="375623", highlight_col="Max Visit Date", highlight_color="E2EFDA")
format_sheet(wb["Kits for destruction"], header_color="595959")
format_shipment_sheet(wb["Shipments"], "1F4E79", "375623", N_SHIP_COLS)
format_sheet(wb["Site Summary"], header_color="1F4E79")
format_sheet(wb["Patient Visits"], header_color="1F4E79")
# ── pacienti (Přehled + Next Visits) na začátek ──────────────────────────
_write_prehled(wb, df_patients, study)
_write_next_visits(wb, df_patients, study, visits_df)
# ── pořadí listů: Přehled → Next Visits → Patient Visits → léky ──────────
first = ["Přehled", "Next Visits", "Patient Visits"]
rest = [s for s in wb.sheetnames if s not in first]
wb._sheets = [wb[s] for s in first] + [wb[s] for s in rest]
wb.save(output_file)
print(f" Uloženo: {os.path.basename(output_file)} ({len(df)} řádků skladu)")
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
ap = argparse.ArgumentParser(description="IWRS report: pacienti + léky v jednom Excelu (z MongoDB)")
ap.add_argument("--study", choices=STUDIES, help="jen jedna studie")
args = ap.parse_args()
os.makedirs(REPORTS_DIR, exist_ok=True)
for study in ([args.study] if args.study else STUDIES):
try:
create_study_report(study)
except Exception as e:
import traceback
print(f"\n[{study}] CHYBA: {e}")
traceback.print_exc()
print("\nHotovo.")
if __name__ == "__main__":
main()