640 lines
27 KiB
Python
640 lines
27 KiB
Python
import os
|
|
import mysql.connector
|
|
import pandas as pd
|
|
from datetime import date
|
|
from pathlib import Path
|
|
from openpyxl import load_workbook
|
|
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
|
|
from openpyxl.utils import get_column_letter
|
|
|
|
import db_config
|
|
|
|
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
|
|
|
|
BASE_DIR = Path(os.path.dirname(os.path.abspath(__file__)))
|
|
OUTPUT_DIR = BASE_DIR / "output"
|
|
|
|
DATE_COLUMNS = {
|
|
"Orig Exp Date", "Exp Date", "Rcv Date",
|
|
"Date Asgn", "Disp Date", "Date Ret", "Destroyed", "Max Visit Date",
|
|
"Visit Date", "Scheduled Date",
|
|
}
|
|
|
|
N_SHIP_COLS = 9 # počet shipment sloupců před detail sloupci
|
|
|
|
|
|
# ── DB ────────────────────────────────────────────────────────────────────────
|
|
|
|
def get_conn():
|
|
return mysql.connector.connect(
|
|
host=db_config.DB_HOST, port=db_config.DB_PORT,
|
|
user=db_config.DB_USER, password=db_config.DB_PASSWORD,
|
|
database=db_config.DB_NAME,
|
|
)
|
|
|
|
|
|
def get_latest_import_id(cursor, study):
|
|
cursor.execute(
|
|
"SELECT MAX(import_id) AS mid FROM iwrs_import WHERE study=%s AND report_type='drugs'",
|
|
(study,),
|
|
)
|
|
row = cursor.fetchone()
|
|
mid = row["mid"]
|
|
if mid is None:
|
|
raise RuntimeError(f"Žádná data v MySQL pro studii {study}")
|
|
return mid
|
|
|
|
|
|
# ── Načítání dat ──────────────────────────────────────────────────────────────
|
|
|
|
def load_inventory(cursor, study, import_id):
|
|
sql = """
|
|
SELECT
|
|
i.site AS Site,
|
|
i.medication_id AS `Med ID`,
|
|
i.packaged_lot_no AS `Lot No.`,
|
|
i.original_expiration_date AS `Orig Exp Date`,
|
|
i.expiration_date AS `Exp Date`,
|
|
i.received_date AS `Rcv Date`,
|
|
i.receipt_user AS `Rcpt User`,
|
|
i.subject_identifier AS `Subject ID`,
|
|
i.quantity_assigned AS `Qty Asgn`,
|
|
i.irt_transaction AS `IRT Tx`,
|
|
i.date_assigned AS `Date Asgn`,
|
|
i.assignment_user AS `Asgn User`,
|
|
i.dispensation_status AS `Disp Status`,
|
|
i.dispensing_date AS `Disp Date`,
|
|
i.quantity_dispensed AS `Qty Disp`,
|
|
i.dispensing_user AS `Disp User`,
|
|
i.quantity_returned AS `Qty Ret`,
|
|
i.date_returned AS `Date Ret`,
|
|
i.return_user AS `Ret User`,
|
|
d.destruction_date AS Destroyed,
|
|
d.basket_id AS `Basket No.`
|
|
FROM iwrs_inventory i
|
|
LEFT JOIN (
|
|
SELECT medication_id,
|
|
ANY_VALUE(basket_id) AS basket_id,
|
|
ANY_VALUE(destruction_date) AS destruction_date
|
|
FROM iwrs_destruction
|
|
WHERE study = %s
|
|
GROUP BY medication_id
|
|
) d ON d.medication_id = i.medication_id
|
|
WHERE i.import_id = %s
|
|
AND i.study = %s
|
|
ORDER BY i.site, i.received_date, i.medication_id
|
|
"""
|
|
cursor.execute(sql, (study, import_id, study))
|
|
rows = cursor.fetchall()
|
|
df = pd.DataFrame(rows)
|
|
for col in DATE_COLUMNS:
|
|
if col in df.columns:
|
|
df[col] = pd.to_datetime(df[col], errors="coerce")
|
|
print(f" Inventory: {len(df)} kitu")
|
|
return df
|
|
|
|
|
|
def load_shipments(cursor, study, import_id):
|
|
sql = """
|
|
SELECT
|
|
s.shipment_id AS `Shipment ID`,
|
|
s.status AS `IRT Shipment Status`,
|
|
s.type AS Type,
|
|
s.ship_from AS `Shipment From`,
|
|
s.ship_to_site AS `Ship To:`,
|
|
s.request_date AS `Request Date`,
|
|
s.received_date AS `Received Date`,
|
|
s.received_by AS `Received by`,
|
|
s.expected_arrival AS `Expected Arrival`,
|
|
i.investigator AS Investigator,
|
|
i.medication_description AS `Medication Description`,
|
|
i.medication_id AS `Medication ID`,
|
|
i.packaged_lot_no AS `Packaged Lot number`,
|
|
i.expiration_date AS `Expiration Date`,
|
|
i.item_status AS Status
|
|
FROM iwrs_shipments s
|
|
JOIN iwrs_shipment_items i
|
|
ON i.study = s.study
|
|
AND i.shipment_id = s.shipment_id
|
|
AND i.import_id = %s
|
|
WHERE s.import_id = %s
|
|
AND s.study = %s
|
|
ORDER BY s.ship_to_site, s.shipment_id, i.medication_id
|
|
"""
|
|
cursor.execute(sql, (import_id, import_id, study))
|
|
rows = cursor.fetchall()
|
|
df = pd.DataFrame(rows)
|
|
for col in ("Request Date", "Received Date", "Expiration Date", "Expected Arrival"):
|
|
if col in df.columns:
|
|
df[col] = pd.to_datetime(df[col], errors="coerce")
|
|
n_ship = df["Shipment ID"].nunique() if len(df) else 0
|
|
print(f" Shipments: {n_ship} zásilek, {len(df)} kitu")
|
|
return df
|
|
|
|
|
|
def load_visits(cursor, study, import_id):
|
|
cursor.execute(
|
|
"SELECT MAX(import_id) AS mid FROM iwrs_import WHERE study=%s AND report_type='patients'",
|
|
(study,),
|
|
)
|
|
patients_import_id = cursor.fetchone()["mid"] or import_id
|
|
import_id = patients_import_id
|
|
sql = """
|
|
SELECT
|
|
v.subject AS Subject,
|
|
COALESCE(v.actual_date, v.scheduled_date) AS `Visit Date`,
|
|
v.scheduled_date AS `Scheduled Date`,
|
|
v.irt_transaction_no AS `IRT Tx No`,
|
|
v.irt_transaction_description AS `Visit`,
|
|
v.medication_assignment AS `Medication`,
|
|
GROUP_CONCAT(v.medication_id ORDER BY v.medication_id SEPARATOR ', ') AS `Med IDs`,
|
|
SUM(v.quantity_assigned) AS `Qty`
|
|
FROM iwrs_subject_visits v
|
|
WHERE v.import_id = %s AND v.study = %s AND v.visit_type = 'Past'
|
|
AND v.irt_transaction_no IS NOT NULL
|
|
GROUP BY v.subject, v.actual_date, v.scheduled_date,
|
|
v.irt_transaction_no, v.irt_transaction_description, v.medication_assignment
|
|
ORDER BY v.subject, COALESCE(v.actual_date, v.scheduled_date)
|
|
"""
|
|
cursor.execute(sql, (import_id, study))
|
|
rows = cursor.fetchall()
|
|
df = pd.DataFrame(rows)
|
|
for col in ("Visit Date", "Scheduled Date"):
|
|
if col in df.columns:
|
|
df[col] = pd.to_datetime(df[col], errors="coerce")
|
|
if study == "77242113UCO3001" and "Visit" in df.columns:
|
|
df["Visit"] = df["Visit"].replace("Subject Number Creation", "Screening")
|
|
print(f" Visits: {len(df)} řádků")
|
|
return df
|
|
|
|
|
|
# ── Odvozené sheety ───────────────────────────────────────────────────────────
|
|
|
|
def build_site_summary(shipments_df):
|
|
STATUS_COLS = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
|
|
pivot = shipments_df.groupby("Ship To:")["Status"].value_counts().unstack(fill_value=0)
|
|
for s in STATUS_COLS:
|
|
if s not in pivot.columns:
|
|
pivot[s] = 0
|
|
pivot = (
|
|
pivot[STATUS_COLS]
|
|
.reset_index()
|
|
.rename(columns={"Ship To:": "Site", "Returned by Subject": "Returned"})
|
|
.sort_values("Site")
|
|
.reset_index(drop=True)
|
|
)
|
|
pivot["Total"] = pivot[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
|
|
print(f" Site Summary: {len(pivot)} center")
|
|
return pivot
|
|
|
|
|
|
def build_expired(df):
|
|
today = date.today()
|
|
mask = (
|
|
df["Basket No."].isna() &
|
|
df["Subject ID"].isna() &
|
|
(df["Exp Date"] < pd.Timestamp(today))
|
|
)
|
|
filtered = df[mask].copy().reset_index(drop=True)
|
|
sheet_name = f"Expired as of {today.strftime('%d-%b-%Y')}"
|
|
print(f" Expired: {len(filtered)}")
|
|
return filtered, sheet_name
|
|
|
|
|
|
def build_assigned_not_dispensed(df):
|
|
mask = df["Subject ID"].notna() & df["Disp Date"].isna()
|
|
filtered = df[mask].copy().reset_index(drop=True)
|
|
print(f" Assigned not dispensed: {len(filtered)}")
|
|
return filtered
|
|
|
|
|
|
def build_not_returned(df):
|
|
no_ret = df[
|
|
df["Date Ret"].isna() &
|
|
df["Subject ID"].notna() &
|
|
(df["Disp Status"].fillna("").str.upper() != "NOT DISPENSED")
|
|
].copy()
|
|
max_asgn = df.groupby("Subject ID")["Date Asgn"].max().rename("Max Visit Date")
|
|
no_ret = no_ret.join(max_asgn, on="Subject ID")
|
|
filtered = no_ret[no_ret["Date Asgn"] < no_ret["Max Visit Date"]].copy()
|
|
filtered = filtered.drop(columns=["Qty Ret", "Date Ret", "Ret User", "Destroyed", "Basket No."])
|
|
filtered = filtered.reset_index(drop=True)
|
|
print(f" Not returned: {len(filtered)}")
|
|
return filtered
|
|
|
|
|
|
def build_kits_for_destruction(df):
|
|
mask = (
|
|
df["Basket No."].isna() &
|
|
(df["Date Ret"].notna() | (df["Disp Status"].fillna("").str.upper() == "NOT DISPENSED"))
|
|
)
|
|
filtered = (
|
|
df[mask]
|
|
.copy()
|
|
.sort_values(["Site", "Date Ret"], ascending=[True, True])
|
|
.drop(columns=["Destroyed", "Basket No."])
|
|
.reset_index(drop=True)
|
|
)
|
|
print(f" Kits for destruction: {len(filtered)}")
|
|
return filtered
|
|
|
|
|
|
# ── Formátování ───────────────────────────────────────────────────────────────
|
|
|
|
STRIPE_GRAY = PatternFill("solid", start_color="F2F2F2")
|
|
STRIPE_WHITE = PatternFill("solid", start_color="FFFFFF")
|
|
|
|
# pacienti — styly zachovány z create_subject_report.py
|
|
_PAT_HEADER_FILL = PatternFill("solid", start_color="1F4E79")
|
|
_PAT_HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
|
|
_PAT_NORMAL_FONT = Font(name="Arial", size=10)
|
|
_PAT_BOLD_FONT = Font(name="Arial", bold=True, size=10)
|
|
_PAT_STRIKE_FONT = Font(name="Arial", size=10, strike=True, color="999999")
|
|
_PAT_ADOLESC_FONT = Font(name="Arial", bold=True, size=10)
|
|
_PAT_THIN = Side(style="thin", color="CCCCCC")
|
|
_PAT_BORDER = Border(left=_PAT_THIN, right=_PAT_THIN, top=_PAT_THIN, bottom=_PAT_THIN)
|
|
_PAT_EVEN_FILL = PatternFill("solid", start_color="EBF3FB")
|
|
_PAT_ODD_FILL = PatternFill("solid", start_color="FFFFFF")
|
|
_PAT_CENTER = Alignment(horizontal="center", vertical="center")
|
|
_PAT_LEFT = Alignment(horizontal="left", vertical="center")
|
|
|
|
|
|
def _autofit(ws):
|
|
for col_cells in ws.columns:
|
|
max_len = 0
|
|
col_letter = get_column_letter(col_cells[0].column)
|
|
for cell in col_cells:
|
|
if cell.value is None:
|
|
continue
|
|
# datum se zobrazí jako DD-MMM-YYYY = 11 znaků
|
|
if hasattr(cell.value, "strftime") or cell.number_format == "DD-MMM-YYYY":
|
|
length = 11
|
|
else:
|
|
length = len(str(cell.value))
|
|
if length > max_len:
|
|
max_len = length
|
|
ws.column_dimensions[col_letter].width = min(max_len + 3, 50)
|
|
|
|
|
|
def format_sheet(ws, header_color, highlight_col=None, highlight_color=None):
|
|
thin = Side(style="thin", color="000000")
|
|
border = Border(left=thin, right=thin, top=thin, bottom=thin)
|
|
header_fill = PatternFill("solid", start_color=header_color)
|
|
header_font = Font(bold=True, color="FFFFFF", name="Arial", size=10)
|
|
row_font = Font(name="Arial", size=10)
|
|
hi_fill = PatternFill("solid", start_color=highlight_color) if highlight_color else None
|
|
|
|
headers = [cell.value for cell in ws[1]]
|
|
|
|
for cell in ws[1]:
|
|
cell.fill = header_fill
|
|
cell.font = header_font
|
|
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=False)
|
|
cell.border = border
|
|
|
|
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
|
|
stripe = STRIPE_GRAY if row[0].row % 2 == 0 else STRIPE_WHITE
|
|
for cell in row:
|
|
col_name = headers[cell.column - 1] if cell.column <= len(headers) else None
|
|
cell.font = row_font
|
|
cell.border = border
|
|
cell.alignment = Alignment(horizontal="center")
|
|
if col_name in DATE_COLUMNS:
|
|
cell.number_format = "DD-MMM-YYYY"
|
|
if hi_fill and col_name == highlight_col:
|
|
cell.fill = hi_fill
|
|
else:
|
|
cell.fill = stripe
|
|
|
|
_autofit(ws)
|
|
ws.auto_filter.ref = ws.dimensions
|
|
ws.freeze_panes = "A2"
|
|
|
|
|
|
def format_shipment_sheet(ws, header_color_ship, header_color_detail, n_ship_cols):
|
|
thin = Side(style="thin", color="000000")
|
|
border = Border(left=thin, right=thin, top=thin, bottom=thin)
|
|
hfont = Font(bold=True, color="FFFFFF", name="Arial", size=10)
|
|
dfont = Font(name="Arial", size=10)
|
|
fill_ship = PatternFill("solid", start_color=header_color_ship)
|
|
fill_detail = PatternFill("solid", start_color=header_color_detail)
|
|
|
|
for cell in ws[1]:
|
|
cell.fill = fill_ship if cell.column <= n_ship_cols else fill_detail
|
|
cell.font = hfont
|
|
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
|
|
cell.border = border
|
|
ws.row_dimensions[1].height = 30
|
|
|
|
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
|
|
stripe = STRIPE_GRAY if row[0].row % 2 == 0 else STRIPE_WHITE
|
|
for cell in row:
|
|
cell.font = dfont
|
|
cell.border = border
|
|
cell.alignment = Alignment(horizontal="center", vertical="center")
|
|
cell.fill = stripe
|
|
if cell.value.__class__.__name__ in ("datetime", "date", "Timestamp"):
|
|
cell.number_format = "DD-MMM-YYYY"
|
|
|
|
_autofit(ws)
|
|
ws.auto_filter.ref = ws.dimensions
|
|
ws.freeze_panes = "A2"
|
|
|
|
|
|
# ── Pacienti ─────────────────────────────────────────────────────────────────
|
|
|
|
PATIENT_TABLE = {
|
|
"77242113UCO3001": "iwrs_uco3001_subject_summary",
|
|
"42847922MDD3003": "iwrs_mdd3003_subject_summary",
|
|
}
|
|
|
|
|
|
def load_patients(cursor, study):
|
|
table = PATIENT_TABLE[study]
|
|
cursor.execute(f"SELECT MAX(import_id) AS mid FROM {table}")
|
|
mid = cursor.fetchone()["mid"]
|
|
if mid is None:
|
|
raise RuntimeError(f"Žádná data v MySQL pro pacienty {study}")
|
|
extra_cols = ""
|
|
if study == "77242113UCO3001":
|
|
extra_cols = """
|
|
rescreened_subject AS `Rescreened Subject`,
|
|
adt_ir AS `ADT-IR`,
|
|
three_or_more_advanced_therapies AS `3+ Adv. Therapies`,
|
|
only_oral_5asa_compounds AS `Only 5-ASA`,
|
|
ustekinumab AS `Ustekinumab`,
|
|
isolated_proctitis AS `Isolated Proctitis`,"""
|
|
sql = f"""
|
|
SELECT
|
|
subject AS `Subject`,
|
|
investigator AS `Investigator`,
|
|
age AS `Subject's age collection`,
|
|
cohort_per_irt AS `Cohort per IRT`,{extra_cols}
|
|
irt_subject_status AS `IRT Subject Status`,
|
|
last_irt_transaction AS `Last Recorded IRT Transaction`,
|
|
next_irt_transaction AS `Next Expected IRT Transaction`,
|
|
next_irt_transaction_date_local AS `Next Expected IRT Transaction Date [Local]`
|
|
FROM {table}
|
|
WHERE import_id = %s
|
|
ORDER BY subject
|
|
"""
|
|
cursor.execute(sql, (mid,))
|
|
rows = cursor.fetchall()
|
|
df = pd.DataFrame(rows)
|
|
if "Next Expected IRT Transaction Date [Local]" in df.columns:
|
|
df["Next Expected IRT Transaction Date [Local]"] = pd.to_datetime(
|
|
df["Next Expected IRT Transaction Date [Local]"], errors="coerce"
|
|
)
|
|
print(f" Pacienti: {len(df)} subjektů (import_id={mid})")
|
|
return df
|
|
|
|
|
|
def _simplify_cohort(val):
|
|
if pd.isna(val):
|
|
return ""
|
|
val = str(val)
|
|
if "dolescent" in val:
|
|
return "Adolescent"
|
|
if val.startswith("Adult"):
|
|
return "Adult"
|
|
return val
|
|
|
|
|
|
def _fmt_date(val):
|
|
if pd.isna(val):
|
|
return ""
|
|
if hasattr(val, "strftime"):
|
|
return val.strftime("%Y-%m-%d")
|
|
return str(val)[:10]
|
|
|
|
|
|
def _write_prehled(wb, df_raw, study):
|
|
ws = wb.create_sheet("Přehled", 0)
|
|
ws.sheet_view.showGridLines = False
|
|
|
|
is_uco = (study == "77242113UCO3001")
|
|
|
|
if is_uco:
|
|
display_headers = ["Subject", "Investigator", "Věk", "Cohort",
|
|
"Rescreened", "ADT-IR", "≥3 Adv.Th.", "5-ASA only",
|
|
"Uste.", "Isol.Proct.",
|
|
"Status", "Last IRT", "Next Visit", "Next Date"]
|
|
col_widths = [14, 22, 6, 12, 11, 8, 11, 10, 8, 12, 14, 12, 12, 13]
|
|
status_col = 11
|
|
flag_cols = set(range(5, 11)) # 1-indexed sloupce s Yes/No hodnotami
|
|
else:
|
|
display_headers = ["Subject", "Investigator", "Věk", "Cohort", "Status", "Last IRT", "Next Visit", "Next Date"]
|
|
col_widths = [14, 22, 6, 12, 14, 12, 12, 13]
|
|
status_col = 5
|
|
flag_cols = set()
|
|
|
|
last_col = get_column_letter(len(display_headers))
|
|
ws.merge_cells(f"A1:{last_col}1")
|
|
title = ws["A1"]
|
|
title.value = f"Subject Summary — {study} ({date.today().strftime('%d-%b-%Y')})"
|
|
title.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
|
|
title.alignment = Alignment(horizontal="left", vertical="center")
|
|
ws.row_dimensions[1].height = 22
|
|
|
|
for c, (h, w) in enumerate(zip(display_headers, col_widths), 1):
|
|
cell = ws.cell(row=2, column=c, value=h)
|
|
cell.font = _PAT_HEADER_FONT
|
|
cell.fill = _PAT_HEADER_FILL
|
|
cell.alignment = _PAT_CENTER
|
|
cell.border = _PAT_BORDER
|
|
ws.column_dimensions[get_column_letter(c)].width = w
|
|
ws.row_dimensions[2].height = 18
|
|
|
|
base = {
|
|
"Subject": df_raw["Subject"].fillna(""),
|
|
"Investigator": df_raw["Investigator"].fillna(""),
|
|
"Věk": df_raw["Subject's age collection"].apply(lambda v: "" if pd.isna(v) else int(v)),
|
|
"Cohort": df_raw["Cohort per IRT"].apply(_simplify_cohort),
|
|
}
|
|
if is_uco:
|
|
base.update({
|
|
"Rescreened": df_raw["Rescreened Subject"].fillna(""),
|
|
"ADT-IR": df_raw["ADT-IR"].fillna(""),
|
|
"≥3 Adv.Th.": df_raw["3+ Adv. Therapies"].fillna(""),
|
|
"5-ASA only": df_raw["Only 5-ASA"].fillna(""),
|
|
"Uste.": df_raw["Ustekinumab"].fillna(""),
|
|
"Isol.Proct.": df_raw["Isolated Proctitis"].fillna(""),
|
|
})
|
|
base.update({
|
|
"Status": df_raw["IRT Subject Status"].fillna(""),
|
|
"Last IRT": df_raw["Last Recorded IRT Transaction"].fillna("—"),
|
|
"Next Visit": df_raw["Next Expected IRT Transaction"].fillna("—"),
|
|
"Next Date": df_raw["Next Expected IRT Transaction Date [Local]"].apply(_fmt_date),
|
|
})
|
|
display = pd.DataFrame(base).sort_values("Subject").reset_index(drop=True)
|
|
|
|
for r_idx, row in display.iterrows():
|
|
excel_row = r_idx + 3
|
|
status = str(row["Status"])
|
|
is_failed = "Screen Failed" in status or "Discontinued" in status
|
|
is_randomized = "Randomized" in status
|
|
is_adolescent = row["Cohort"] == "Adolescent"
|
|
fill = _PAT_EVEN_FILL if r_idx % 2 == 0 else _PAT_ODD_FILL
|
|
|
|
for c_idx, val in enumerate(row, 1):
|
|
cell = ws.cell(row=excel_row, column=c_idx, value=val if val != "" else None)
|
|
cell.fill = fill
|
|
cell.border = _PAT_BORDER
|
|
cell.alignment = _PAT_CENTER if (c_idx == 3 or c_idx in flag_cols) else _PAT_LEFT
|
|
if is_failed:
|
|
cell.font = _PAT_STRIKE_FONT
|
|
elif c_idx == status_col and is_randomized:
|
|
cell.font = _PAT_BOLD_FONT
|
|
elif c_idx == 4 and is_adolescent:
|
|
cell.font = _PAT_ADOLESC_FONT
|
|
else:
|
|
cell.font = _PAT_NORMAL_FONT
|
|
ws.row_dimensions[excel_row].height = 16
|
|
|
|
ws.freeze_panes = "A3"
|
|
ws.auto_filter.ref = f"A2:{last_col}{len(display) + 2}"
|
|
|
|
|
|
def _write_next_visits(wb, df_raw, study):
|
|
ws = wb.create_sheet("Next Visits", 1)
|
|
ws.sheet_view.showGridLines = False
|
|
|
|
ws.merge_cells("A1:D1")
|
|
title = ws["A1"]
|
|
title.value = f"Next Expected Visits — {study} ({date.today().strftime('%d-%b-%Y')})"
|
|
title.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
|
|
title.alignment = Alignment(horizontal="left", vertical="center")
|
|
ws.row_dimensions[1].height = 22
|
|
|
|
nv_headers = ["Subject", "Investigator", "Next Visit", "Datum"]
|
|
nv_widths = [14, 22, 26, 13]
|
|
for c, (h, w) in enumerate(zip(nv_headers, nv_widths), 1):
|
|
cell = ws.cell(row=2, column=c, value=h)
|
|
cell.font = _PAT_HEADER_FONT
|
|
cell.fill = _PAT_HEADER_FILL
|
|
cell.alignment = _PAT_CENTER
|
|
cell.border = _PAT_BORDER
|
|
ws.column_dimensions[get_column_letter(c)].width = w
|
|
ws.row_dimensions[2].height = 18
|
|
|
|
df = pd.DataFrame({
|
|
"Subject": df_raw["Subject"].fillna(""),
|
|
"Investigator": df_raw["Investigator"].fillna(""),
|
|
"Next Visit": df_raw["Next Expected IRT Transaction"].fillna(""),
|
|
"Datum": df_raw["Next Expected IRT Transaction Date [Local]"],
|
|
"Status": df_raw["IRT Subject Status"].fillna(""),
|
|
})
|
|
df = df[df["Datum"].notna()]
|
|
df = df[~df["Status"].str.contains("Screen Failed|Discontinued", na=False)]
|
|
df = df.sort_values("Datum").reset_index(drop=True)
|
|
|
|
for r_idx, row in df.iterrows():
|
|
excel_row = r_idx + 3
|
|
fill = _PAT_EVEN_FILL if r_idx % 2 == 0 else _PAT_ODD_FILL
|
|
datum_val = row["Datum"]
|
|
datum_str = datum_val.strftime("%Y-%m-%d") if hasattr(datum_val, "strftime") else str(datum_val)[:10]
|
|
for c_idx, val in enumerate([row["Subject"], row["Investigator"], row["Next Visit"], datum_str], 1):
|
|
cell = ws.cell(row=excel_row, column=c_idx, value=val if val != "" else None)
|
|
cell.fill = fill
|
|
cell.border = _PAT_BORDER
|
|
cell.font = _PAT_NORMAL_FONT
|
|
cell.alignment = _PAT_LEFT
|
|
ws.row_dimensions[excel_row].height = 16
|
|
|
|
ws.freeze_panes = "A3"
|
|
ws.auto_filter.ref = f"A2:D{len(df) + 2}"
|
|
|
|
|
|
# ── Jeden report pro jednu studii ─────────────────────────────────────────────
|
|
|
|
def create_study_report(study):
|
|
today = date.today()
|
|
|
|
# číslování: najdi nejvyšší existující verzi pro dnešní datum
|
|
existing = sorted(OUTPUT_DIR.glob(f"{today} {study} CZ IWRS overview v*.xlsx"))
|
|
if existing:
|
|
last = existing[-1].stem # např. "2026-05-12 42847922MDD3003 CZ IWRS overview v3"
|
|
last_ver = int(last.rsplit("v", 1)[-1])
|
|
version = last_ver + 1
|
|
else:
|
|
version = 1
|
|
|
|
output_file = OUTPUT_DIR / f"{today} {study} CZ IWRS overview v{version}.xlsx"
|
|
|
|
print(f"\n[{study}] Načítám z MySQL...")
|
|
conn = get_conn()
|
|
cursor = conn.cursor(dictionary=True)
|
|
import_id = get_latest_import_id(cursor, study)
|
|
print(f" import_id = {import_id}")
|
|
|
|
df = load_inventory(cursor, study, import_id)
|
|
shipments_df = load_shipments(cursor, study, import_id)
|
|
df_patients = load_patients(cursor, study)
|
|
visits_df = load_visits(cursor, study, import_id)
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
expired_df, expired_sheet = build_expired(df)
|
|
assigned_df = build_assigned_not_dispensed(df)
|
|
not_returned_df = build_not_returned(df)
|
|
destruction_df = build_kits_for_destruction(df)
|
|
site_summary_df = build_site_summary(shipments_df)
|
|
|
|
with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
|
|
df.to_excel( writer, index=False, sheet_name="CountryMedicationOverview")
|
|
expired_df.to_excel( writer, index=False, sheet_name=expired_sheet)
|
|
assigned_df.to_excel( writer, index=False, sheet_name="Assigned not dispensed")
|
|
not_returned_df.to_excel( writer, index=False, sheet_name="Not returned")
|
|
destruction_df.to_excel( writer, index=False, sheet_name="Kits for destruction")
|
|
shipments_df.to_excel( writer, index=False, sheet_name="Shipments")
|
|
site_summary_df.to_excel( writer, index=False, sheet_name="Site Summary")
|
|
visits_df.to_excel( writer, index=False, sheet_name="Patient Visits")
|
|
|
|
wb = load_workbook(output_file)
|
|
|
|
ws_main = wb["CountryMedicationOverview"]
|
|
format_sheet(ws_main, header_color="1F4E79")
|
|
green_fill = PatternFill("solid", start_color="E2EFDA")
|
|
headers_main = [c.value for c in ws_main[1]]
|
|
for row in ws_main.iter_rows(min_row=2, max_row=ws_main.max_row):
|
|
for cell in row:
|
|
col_name = headers_main[cell.column - 1] if cell.column <= len(headers_main) else None
|
|
if col_name in ("Destroyed", "Basket No."):
|
|
cell.fill = green_fill
|
|
|
|
format_sheet(wb[expired_sheet], header_color="C00000", highlight_col="Exp Date", highlight_color="FFE0E0")
|
|
format_sheet(wb["Assigned not dispensed"], header_color="833C00", highlight_col="Subject ID", highlight_color="FFF2CC")
|
|
format_sheet(wb["Not returned"], header_color="375623", highlight_col="Max Visit Date", highlight_color="E2EFDA")
|
|
format_sheet(wb["Kits for destruction"], header_color="595959")
|
|
format_shipment_sheet(wb["Shipments"], "1F4E79", "375623", N_SHIP_COLS)
|
|
format_sheet(wb["Site Summary"], header_color="1F4E79")
|
|
format_sheet(wb["Patient Visits"], header_color="1F4E79")
|
|
|
|
# ── pacienti (Přehled + Next Visits) na začátek ──────────────────────────
|
|
_write_prehled(wb, df_patients, study)
|
|
_write_next_visits(wb, df_patients, study)
|
|
|
|
# ── pořadí listů: Patient Visits jako první ──────────────────────────────
|
|
names = wb.sheetnames
|
|
wb._sheets = [wb["Patient Visits"]] + [wb[s] for s in names if s != "Patient Visits"]
|
|
|
|
wb.save(output_file)
|
|
print(f" Uloženo: {output_file.name} ({len(df)} řádků)")
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
OUTPUT_DIR.mkdir(exist_ok=True)
|
|
for study in STUDIES:
|
|
try:
|
|
create_study_report(study)
|
|
except Exception as e:
|
|
import traceback
|
|
print(f"\n[{study}] CHYBA: {e}")
|
|
traceback.print_exc()
|
|
print("\nHotovo.")
|
|
|
|
|
|
main()
|