Files
janssen/Covance/Trash/create_report_v2.0.py
2026-06-16 14:32:28 +02:00

531 lines
24 KiB
Python

# create_report_v2.0.py — v2.0 — 2026-05-29
# UCO3001 Covance specimen & kit report — zdroj dat: MongoDB (covance + edc)
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Border, Side, Alignment
from openpyxl.utils import get_column_letter
from datetime import date, datetime
from pymongo import MongoClient
# ── Konfigurace ────────────────────────────────────────────────────────────────
MONGO_URI = "mongodb://192.168.1.76:27017"
out_dir = "U:/Dropbox/!!!Days/Downloads Z230/"
# ── MongoDB připojení ──────────────────────────────────────────────────────────
client = MongoClient(MONGO_URI)
covance_db = client["covance"]
edc_db = client["edc"]
# ── Načtení dat z MongoDB ──────────────────────────────────────────────────────
print("Načítám data z MongoDB...")
samples_docs = list(covance_db["allsamples"].find())
df = pd.DataFrame([doc["fields"] for doc in samples_docs]).reset_index(drop=True)
print(f" allsamples: {len(df)} záznamů")
kit_docs = list(covance_db["kits"].find())
kit_df_raw = pd.DataFrame([doc["fields"] for doc in kit_docs]).reset_index(drop=True)
print(f" kits: {len(kit_df_raw)} záznamů")
edc_docs = list(edc_db["UCO3001.DateofVisit"].find())
edc_rows = []
for doc in edc_docs:
edc_rows.append({
"SiteNumber": doc["site"]["number"],
"Subject": doc["subject"]["label"],
"InstanceName": doc["form"]["instanceName"],
"Field4Value": doc["fields"].get("Visit Start Date"),
"Field5Value": doc["fields"].get("Type of Contact"),
})
edc_df_raw = pd.DataFrame(edc_rows)
print(f" DateofVisit: {len(edc_df_raw)} záznamů")
# ── Výstupní soubor ────────────────────────────────────────────────────────────
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
out_filename = f"{timestamp} 77242113UCO3001 CZE Labcorp samples and kit inventory report.xlsx"
out_path = out_dir + out_filename
# ── Příprava dat — allsamples ──────────────────────────────────────────────────
all_patients = sorted(df['Patient No.'].dropna().unique())
bxscr = df[df['Protocol Visit Code'] == 'BXSCR']
dna = df[df['Protocol Visit Code'] == 'DNA']
def fmt_date(val):
if val is None:
return None
if isinstance(val, float) and pd.isna(val):
return None
if isinstance(val, datetime):
return val.replace(tzinfo=None)
if isinstance(val, str):
for fmt in ('%d-%b-%Y', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d'):
try:
return datetime.strptime(val.strip(), fmt)
except ValueError:
pass
try:
return pd.to_datetime(val).to_pydatetime().replace(tzinfo=None)
except Exception:
return None
OK_STATUSES = {'Received', 'In Inventory', 'Shipped'}
def get_specimen_info(visit_df, patient, specimen_type=None):
rows = visit_df[visit_df['Patient No.'] == patient]
if specimen_type:
rows = rows[rows['Specimen Type'] == specimen_type]
rows = rows[rows['Sample Status'].isin(OK_STATUSES)]
if rows.empty:
return '', None
row = rows.iloc[0]
return fmt_date(row['Container Receipt Date']), rows.index[0] + 2
def get_label_info(patient, label_code, visit_code):
rows = df[(df['Patient No.'] == patient) &
(df['Protocol Visit Code'] == visit_code) &
(df['Container Label Line 1'] == label_code)]
rows = rows[rows['Sample Status'].isin(OK_STATUSES)]
if rows.empty:
return '', None
row = rows.iloc[0]
return fmt_date(row['Container Receipt Date']), rows.index[0] + 2
# ── Příprava dat — kit inventory ───────────────────────────────────────────────
cze = kit_df_raw[kit_df_raw["Country"] == "CZE"].copy()
def parse_kit_date(val):
if val is None or (isinstance(val, float) and pd.isna(val)):
return None
if isinstance(val, datetime):
return val.replace(tzinfo=None)
try:
return datetime.strptime(str(val).strip(), "%b %d, %Y")
except ValueError:
return None
cze["Shipped Date"] = cze["Shipped Date"].apply(parse_kit_date)
cze["Expiration Date"] = cze["Expiration Date"].apply(parse_kit_date)
cze["Days to Expiration"] = pd.to_numeric(cze["Days to Expiration"], errors="coerce")
cze = cze.sort_values(["Site", "Kit Type", "Expiration Date"]).reset_index(drop=True)
today_dt = datetime.combine(date.today(), datetime.min.time())
def bucket(exp_date):
if exp_date is None:
return None
return "soon" if (exp_date - today_dt).days <= 30 else "ok"
cze["_bucket"] = cze["Expiration Date"].apply(bucket)
kit_order = sorted(cze["Kit Type"].unique(), key=lambda x: (str(x).lstrip("T-").zfill(5), str(x)))
kit_desc = cze.drop_duplicates("Kit Type").set_index("Kit Type")["Description"].to_dict()
kit_sites = sorted(cze["Site"].unique())
# ── Příprava dat — EDC pacienti ────────────────────────────────────────────────
def fmt_date_edc(val):
if val is None or (isinstance(val, float) and pd.isna(val)):
return None
if isinstance(val, datetime):
return val.replace(tzinfo=None)
if isinstance(val, str):
for fmt in ('%d %b %Y', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d'):
try:
return datetime.strptime(val.strip(), fmt)
except ValueError:
pass
try:
return pd.to_datetime(val).to_pydatetime().replace(tzinfo=None)
except Exception:
return None
_pat_pre = edc_df_raw[['SiteNumber', 'Subject', 'Field4Value']].copy()
_pat_pre['Field4Value'] = _pat_pre['Field4Value'].apply(fmt_date_edc)
_pat_pre = _pat_pre.sort_values(['SiteNumber', 'Subject', 'Field4Value']).reset_index(drop=True)
patient_row_map = {}
for i, row in _pat_pre.iterrows():
pat = row['Subject']
if pat not in patient_row_map:
patient_row_map[pat] = i + 2
bxscr_patients = sorted(bxscr['Patient No.'].dropna().unique())
# ── Workbook ───────────────────────────────────────────────────────────────────
out_wb = Workbook()
out_wb.remove(out_wb.active)
# ── Styly ──────────────────────────────────────────────────────────────────────
thin = Side(style='thin')
border = Border(left=thin, right=thin, top=thin, bottom=thin)
header_fill = PatternFill("solid", fgColor="4472C4")
header_font = Font(name='Calibri', bold=True, size=11, color="FFFFFF")
data_font = Font(name='Calibri', size=11)
date_font_link = Font(name='Calibri', size=11, color="000000", underline='single')
yes_fill = PatternFill("solid", fgColor="E2EFDA")
no_fill = PatternFill("solid", fgColor="FFE7E7")
sum_header_font = Font(name='Calibri', bold=True, size=11, color="000000")
sum_total_font = Font(name='Calibri', bold=True, size=11)
zero_font = Font(name='Calibri', size=11, color="BFBFBF")
zero_red_font = Font(name='Calibri', size=11, color="C00000")
dark_blue_fill = PatternFill("solid", fgColor="203764")
orange_fill = PatternFill("solid", fgColor="FFF2CC")
green_fill = PatternFill("solid", fgColor="E2EFDA")
total_fill = PatternFill("solid", fgColor="D9E1F2")
exp_fill = PatternFill("solid", fgColor="FFE7E7")
ok_fill = PatternFill("solid", fgColor="E2EFDA")
# ── List: Zdroj ────────────────────────────────────────────────────────────────
# Generován z covance.allsamples — pořadí řádků odpovídá df.index,
# proto hyperlinky z Přehledu vzorků (index + 2) míří na správné řádky.
src_ws = out_wb.create_sheet("Zdroj")
src_sheet_name = "Zdroj"
pat_sheet_name = "Seznam pacientů"
zdroj_columns = [
"Protocol Code", "Investigator No.", "Investigator Name", "Patient No.",
"Collection Date", "Protocol Visit Code", "Kit Receipt Date",
"Container Receipt Date", "Accession", "Container No.", "Container Barcode No.",
"Specimen Type", "Sample Status", "Expected Receipt Condition",
"Actual Receipt Condition", "Container Label Line 1", "Container Label Line 2",
"SM Sample Status", "SMART Specimen Class Description", "Parent Barcode", "Children Barcode",
]
for col_idx, col_name in enumerate(zdroj_columns, 1):
cell = src_ws.cell(row=1, column=col_idx, value=col_name)
cell.font = header_font
cell.fill = header_fill
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
src_ws.column_dimensions[get_column_letter(col_idx)].width = max(len(col_name) + 2, 14)
src_ws.row_dimensions[1].height = 30
src_ws.freeze_panes = "A2"
def clean(v):
try:
if pd.isna(v):
return None
except (TypeError, ValueError):
pass
return v
for row_idx, (_, row) in enumerate(df.iterrows(), 2):
for col_idx, col_name in enumerate(zdroj_columns, 1):
val = clean(row.get(col_name))
cell = src_ws.cell(row=row_idx, column=col_idx, value=val)
cell.font = data_font
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center')
src_ws.auto_filter.ref = f"A1:{get_column_letter(len(zdroj_columns))}1"
# ── List: Přehled vzorků ───────────────────────────────────────────────────────
analysis_ws = out_wb.create_sheet("Přehled vzorků")
columns = [
("Investigator Name", 24),
("Číslo pacienta", 20),
("Máme biopsii SM11", 20),
("Máme RNA", 16),
("Máme Cryostor", 16),
("DNA", 14),
("PLASMPK I-0 TROUGH", 18),
("PLASMA PK I-0 PEAK", 18),
("SERUM ADA I-0 PRE", 18),
("SM06/SERUM BIOM", 16),
("SM07/WB RNA", 14),
("SM10/FECAL", 14),
("PLASMPK I-2 TROUGH", 18),
("PLASMA PK I-2 PEAK", 18),
("SERUM ADA I-2 PRE", 18),
("STOOL I-2", 12),
("PLASMPK I-4 TROUGH", 18),
("PLASMA PK I-4 PEAK", 18),
("SERUM ADA I-4 PRE", 18),
("SM06/SERUM BIOM", 16),
("SM07/WB RNA", 14),
("STOOL I-4", 12),
]
group_font = Font(name='Calibri', bold=True, size=11)
group_fill = PatternFill("solid", fgColor="FFFFFF")
group_border = Border(left=thin, right=thin, top=thin, bottom=thin)
groups = [
(3, 5, "SCREENING"),
(7, 12, "RANDOMIZACE I-0"),
(13, 16, "I-2"),
(17, 22, "I-4"),
]
for start_col, end_col, label in groups:
analysis_ws.merge_cells(start_row=1, start_column=start_col, end_row=1, end_column=end_col)
cell = analysis_ws.cell(row=1, column=start_col, value=label)
cell.font = group_font
cell.fill = group_fill
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.border = group_border
for c in range(start_col, end_col + 1):
analysis_ws.cell(row=1, column=c).border = group_border
analysis_ws.row_dimensions[1].height = 20
for col_idx, (hdr, width) in enumerate(columns, 1):
cell = analysis_ws.cell(row=2, column=col_idx, value=hdr)
cell.font = header_font
cell.fill = header_fill
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
analysis_ws.column_dimensions[get_column_letter(col_idx)].width = width
analysis_ws.row_dimensions[2].height = 30
analysis_ws.freeze_panes = "C3"
for row_idx, patient in enumerate(bxscr_patients, 3):
investigator = bxscr[bxscr['Patient No.'] == patient].iloc[0]['Investigator Name']
sm11, sm11_row = get_specimen_info(bxscr, patient, 'Tissue , Paraffin Block')
rna, rna_row = get_specimen_info(bxscr, patient, 'Biopsy RNA Later')
cryo, cryo_row = get_specimen_info(bxscr, patient, 'Biopsy, Frozen Tissue')
dna_date, dna_row = get_specimen_info(dna, patient)
trough, trough_row = get_label_info(patient, 'PLASMPK I-0 TROUGH', 'I-0')
peak, peak_row = get_label_info(patient, 'PLASMA PK I-0 PEAK', 'I-0')
ada, ada_row = get_label_info(patient, 'SERUM ADA I-0 PRE', 'I-0')
sm06, sm06_row = get_label_info(patient, 'SM06/SERUM BIOM', 'I-0')
sm07, sm07_row = get_label_info(patient, 'SM07/WB RNA', 'I-0')
sm10, sm10_row = get_label_info(patient, 'SM10/FECAL', 'I-0')
trough2, trough2_row = get_label_info(patient, 'PLASMPK I-2 TROUGH', 'I-2')
peak2, peak2_row = get_label_info(patient, 'PLASMA PK I-2 PEAK', 'I-2')
ada2, ada2_row = get_label_info(patient, 'SERUM ADA I-2 PRE', 'I-2')
stool2, stool2_row = get_label_info(patient, 'STOOL I-2', 'I-2')
trough4, trough4_row = get_label_info(patient, 'PLASMPK I-4 TROUGH', 'I-4')
peak4, peak4_row = get_label_info(patient, 'PLASMA PK I-4 PEAK', 'I-4')
ada4, ada4_row = get_label_info(patient, 'SERUM ADA I-4 PRE', 'I-4')
sm064, sm064_row = get_label_info(patient, 'SM06/SERUM BIOM', 'I-4')
sm074, sm074_row = get_label_info(patient, 'SM07/WB RNA', 'I-4')
stool4, stool4_row = get_label_info(patient, 'STOOL I-4', 'I-4')
row_data = [
investigator, patient,
(sm11, sm11_row), (rna, rna_row), (cryo, cryo_row), (dna_date, dna_row),
(trough, trough_row), (peak, peak_row), (ada, ada_row),
(sm06, sm06_row), (sm07, sm07_row), (sm10, sm10_row),
(trough2, trough2_row),(peak2, peak2_row), (ada2, ada2_row), (stool2, stool2_row),
(trough4, trough4_row),(peak4, peak4_row), (ada4, ada4_row),
(sm064, sm064_row), (sm074, sm074_row), (stool4, stool4_row),
]
for col_idx, value in enumerate(row_data, 1):
if col_idx <= 2:
cell = analysis_ws.cell(row=row_idx, column=col_idx, value=value)
if col_idx == 2 and patient in patient_row_map:
cell.hyperlink = f"#'{pat_sheet_name}'!B{patient_row_map[patient]}"
cell.font = Font(name='Calibri', size=11, underline='single')
else:
cell.font = data_font
else:
dt, excel_row = value
cell = analysis_ws.cell(row=row_idx, column=col_idx, value=dt)
if dt and excel_row is not None:
cell.hyperlink = f"#'{src_sheet_name}'!A{excel_row}"
cell.font = date_font_link
cell.fill = yes_fill
cell.number_format = 'DD-MMM-YYYY'
else:
cell.font = Font(name='Calibri', size=11, color="C00000")
cell.fill = no_fill
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center')
# ── List: Seznam pacientů ──────────────────────────────────────────────────────
patients_ws = out_wb.create_sheet("Seznam pacientů")
pat_columns = [
("Číslo centra", 20),
("Číslo pacienta", 20),
("Kód návštěvy", 20),
("Datum návštěvy", 16),
("Typ návštěvy", 16),
]
for col_idx, (col_name, width) in enumerate(pat_columns, 1):
cell = patients_ws.cell(row=1, column=col_idx, value=col_name)
cell.font = header_font
cell.fill = header_fill
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
patients_ws.column_dimensions[get_column_letter(col_idx)].width = width
patients_ws.row_dimensions[1].height = 30
patients_ws.freeze_panes = "A2"
pat_df = edc_df_raw[['SiteNumber', 'Subject', 'InstanceName', 'Field4Value', 'Field5Value']].copy()
pat_df['Field4Value'] = pat_df['Field4Value'].apply(fmt_date_edc)
pat_df = pat_df.sort_values(['SiteNumber', 'Subject', 'Field4Value']).reset_index(drop=True)
pat_col_keys = ['SiteNumber', 'Subject', 'InstanceName', 'Field4Value', 'Field5Value']
for row_idx, (_, row) in enumerate(pat_df.iterrows(), 2):
for col_idx, key in enumerate(pat_col_keys, 1):
value = clean(row[key])
cell = patients_ws.cell(row=row_idx, column=col_idx, value=value)
cell.font = data_font
cell.border = border
cell.alignment = Alignment(horizontal='center', vertical='center')
if col_idx == 4 and value is not None:
cell.number_format = 'DD-MMM-YYYY'
# ── Pomocná funkce pro souhrnné tabulky ────────────────────────────────────────
def write_summary_table(ws, current_row, title, rows_data, col_a_header):
for c in range(1, 5):
cell = ws.cell(row=current_row, column=c)
cell.fill = dark_blue_fill
cell.border = border
ws.cell(row=current_row, column=1, value=title).font = Font(name='Calibri', bold=True, size=12, color="FFFFFF")
ws.cell(row=current_row, column=1).alignment = Alignment(horizontal="left", vertical="center")
ws.merge_cells(start_row=current_row, start_column=1, end_row=current_row, end_column=4)
ws.row_dimensions[current_row].height = 22
current_row += 1
for col_idx, (h, f) in enumerate(zip(
[col_a_header, "Description", "Expiruje do 30 dní", "Expiruje později"],
[header_fill, header_fill, orange_fill, green_fill]
), 1):
cell = ws.cell(row=current_row, column=col_idx, value=h)
cell.font = sum_header_font
cell.fill = f
cell.border = border
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
ws.row_dimensions[current_row].height = 28
current_row += 1
totals = [0, 0]
for col_a, col_b, n_soon, n_ok in rows_data:
totals[0] += n_soon
totals[1] += n_ok
all_zero = (n_soon == 0 and n_ok == 0)
row_vals = [col_a, col_b, n_soon, n_ok]
row_fills = [None, None,
orange_fill if n_soon > 0 else None,
green_fill if n_ok > 0 else None]
for col_idx, (val, rfill) in enumerate(zip(row_vals, row_fills), 1):
cell = ws.cell(row=current_row, column=col_idx, value=val)
if col_idx >= 3 and val == 0:
cell.font = zero_red_font if all_zero else zero_font
else:
cell.font = data_font
cell.border = border
cell.alignment = Alignment(horizontal="center" if col_idx >= 2 else "left", vertical="center")
if rfill:
cell.fill = rfill
current_row += 1
for col_idx, val in enumerate(["CELKEM", "", totals[0], totals[1]], 1):
cell = ws.cell(row=current_row, column=col_idx, value=val)
cell.font = sum_total_font
cell.fill = total_fill
cell.border = border
cell.alignment = Alignment(horizontal="center" if col_idx >= 2 else "left", vertical="center")
current_row += 2
return current_row
# ── List: Kit Inventory CZE ────────────────────────────────────────────────────
kit_ws = out_wb.create_sheet("Kit Inventory CZE")
listing_columns = [
("Project No.", 14),
("Region", 10),
("Country", 10),
("Site", 38),
("Kit Type", 12),
("Description", 22),
("Accession", 18),
("Shipped Date", 16),
("Expiration Date", 16),
("Days to Expiration", 20),
]
for col_idx, (hdr, width) in enumerate(listing_columns, 1):
cell = kit_ws.cell(row=1, column=col_idx, value=hdr)
cell.font = header_font
cell.fill = header_fill
cell.border = border
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
kit_ws.column_dimensions[get_column_letter(col_idx)].width = width
kit_ws.row_dimensions[1].height = 30
kit_ws.freeze_panes = "A2"
for row_idx, (_, row) in enumerate(cze.iterrows(), 2):
days = row.get("Days to Expiration")
for col_idx, (col_name, _) in enumerate(listing_columns, 1):
value = clean(row.get(col_name))
cell = kit_ws.cell(row=row_idx, column=col_idx, value=value)
cell.font = data_font
cell.border = border
cell.alignment = Alignment(horizontal="center", vertical="center")
if col_name in ("Shipped Date", "Expiration Date") and value is not None:
cell.number_format = "DD-MMM-YYYY"
if col_name == "Days to Expiration":
cell.fill = exp_fill if (pd.notna(days) and days <= 60) else ok_fill
kit_ws.auto_filter.ref = f"A1:{get_column_letter(len(listing_columns))}1"
# ── List: Přehled po centrech ──────────────────────────────────────────────────
ctr_ws = out_wb.create_sheet("Přehled po centrech")
ctr_ws.column_dimensions["A"].width = 22
ctr_ws.column_dimensions["B"].width = 24
ctr_ws.column_dimensions["C"].width = 22
ctr_ws.column_dimensions["D"].width = 20
current_row = 1
for site in kit_sites:
site_df = cze[cze["Site"] == site]
rows_data = []
for kit in kit_order:
desc = kit_desc.get(kit, "")
kit_site_df = site_df[site_df["Kit Type"] == kit]
n_soon = int((kit_site_df["_bucket"] == "soon").sum())
n_ok = int((kit_site_df["_bucket"] == "ok").sum())
rows_data.append((f"{kit}{desc}", desc, n_soon, n_ok))
current_row = write_summary_table(ctr_ws, current_row, site, rows_data, "Kit Type")
# ── List: Přehled po typech kitů ───────────────────────────────────────────────
sum_ws = out_wb.create_sheet("Přehled po typech")
sum_ws.column_dimensions["A"].width = 38
sum_ws.column_dimensions["B"].width = 22
sum_ws.column_dimensions["C"].width = 22
sum_ws.column_dimensions["D"].width = 20
current_row = 1
for kit in kit_order:
desc = kit_desc.get(kit, "")
kit_df = cze[cze["Kit Type"] == kit]
rows_data = []
for site in sorted(kit_df["Site"].unique()):
site_df = kit_df[kit_df["Site"] == site]
n_soon = int((site_df["_bucket"] == "soon").sum())
n_ok = int((site_df["_bucket"] == "ok").sum())
rows_data.append((site, desc, n_soon, n_ok))
current_row = write_summary_table(sum_ws, current_row, f"Kit Type {kit}{desc}", rows_data, "Centrum")
# ── List: eQueries ─────────────────────────────────────────────────────────────
# TODO: doplnit až budou eQuery data importována do MongoDB
# Zdroj: covance db, kolekce "equeries" (dle konvence importu)
# Filtr: Country == "CZECH REPUBLIC"
# Sloupce: Site, Subject, Visit, Visit Collection Date, Accession,
# eQueryId, Issue Type, Status, Create Date, Response Date Time,
# Time Before Response, User Name
# Řazení: Open → Response Received → Closed, pak Site
eq_ws = out_wb.create_sheet("eQueries")
eq_ws.cell(row=1, column=1,
value="TODO: eQuery data zatím nejsou v MongoDB — doplnit po importu.").font = Font(
name='Calibri', bold=True, size=12, color="C00000"
)
eq_ws.column_dimensions["A"].width = 70
# ── Uložení ────────────────────────────────────────────────────────────────────
out_wb.save(out_path)
client.close()
print(f"\nUloženo: {out_path}")
print(f"Pacienti s BXSCR: {len(bxscr_patients)}, Všichni pacienti: {len(all_patients)}")
print(f"CZE kity: {len(cze)}, Typy kitů: {len(kit_order)}, Centra: {len(kit_sites)}")