import glob
import os
import shutil
from datetime import date, datetime

import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Border, Side, Alignment
from openpyxl.utils import get_column_letter

# Paths
src_dir = os.path.dirname(os.path.abspath(__file__)) + "/"
out_dir = "U:/Dropbox/!!!Days/Downloads Z230/"


def _find_inputs(pattern, missing_msg):
    """Return the files in ``src_dir`` matching *pattern*, newest first.

    Args:
        pattern: File-name glob pattern, relative to ``src_dir``.
        missing_msg: Error text used when nothing matches.

    Returns:
        A non-empty list of paths ordered by modification time, newest
        first, so that element ``[0]`` is the most recent export.

    Raises:
        FileNotFoundError: If no file matches. A real exception is used
            instead of ``assert`` because asserts are removed under ``-O``.
    """
    # Escape the directory part so characters such as "[" in the script's
    # own path are matched literally; only *pattern* may hold wildcards.
    matches = glob.glob(glob.escape(src_dir) + pattern)
    if not matches:
        raise FileNotFoundError(missing_msg)
    # glob order is arbitrary; sort so the pick is deterministic.
    return sorted(matches, key=os.path.getmtime, reverse=True)


# Find source files
src_files = _find_inputs(
    "Protocol 77242113UCO3001 - All Samples*.xlsx", "Source file not found!"
)
src_file = src_files[0]
print(f"Source xlsx: {src_file}")

csv_files = _find_inputs("_EDCStdRpt-DataListing.csv", "CSV file not found!")
csv_file = csv_files[0]
print(f"Source csv: {csv_file}")

kit_csv_files = _find_inputs(
    "sponsor-study-36940-kit-inventory-on-hand-expiration.csv",
    "Kit inventory CSV not found!",
)
kit_csv_file = kit_csv_files[0]
print(f"Kit csv: {kit_csv_file}")

eq_csv_files = _find_inputs(
    "sponsor-study-36940-activity-reports-documents-equery.csv",
    "eQuery CSV not found!",
)
eq_csv_file = eq_csv_files[0]
print(f"eQuery csv: {eq_csv_file}")

timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
out_filename = f"{timestamp} 77242113UCO3001 CZE Labcorp samples and kit inventory report.xlsx"
out_path = out_dir + out_filename

# Copy source file to output so the original sheet keeps its formatting
shutil.copy2(src_file, out_path)

# Load data with pandas for analysis
df = pd.read_excel(src_file, sheet_name=0, header=0)

# All unique patients
all_patients = sorted(df['Patient No.'].dropna().unique())

# BXSCR and DNA rows
bxscr = df[df['Protocol Visit Code'] == 'BXSCR']
dna = df[df['Protocol Visit Code'] == 'DNA']

OK_STATUSES = {'Received', 'In Inventory', 'Shipped'}


def fmt_date(val):
    """Convert a date cell value to a ``datetime``.

    Returns ``None`` for missing values. Strings are parsed as
    ``DD-Mon-YYYY``; anything else goes through ``pd.to_datetime``.

    Raises:
        ValueError: If a string value does not match ``%d-%b-%Y``.
    """
    if pd.isna(val):
        return None
    if isinstance(val, str):
        return datetime.strptime(val, '%d-%b-%Y')
    return pd.to_datetime(val).to_pydatetime()


def _blank_if_missing(value):
    """Return ``None`` for NaN/NaT/None so the worksheet cell stays empty."""
    return None if pd.isna(value) else value


def _first_ok_sample(rows):
    """Return ``(receipt_date, excel_row)`` for the first acceptable row.

    Only rows whose 'Sample Status' is in ``OK_STATUSES`` count. When none
    qualifies the result is ``('', None)``.
    """
    rows = rows[rows['Sample Status'].isin(OK_STATUSES)]
    if rows.empty:
        return '', None
    # +2 maps the 0-based DataFrame index to the worksheet row: one for the
    # header row, one for 1-based numbering. Relies on ``df`` keeping the
    # default index produced by ``read_excel``.
    return fmt_date(rows.iloc[0]['Container Receipt Date']), rows.index[0] + 2


def get_specimen_info(visit_df, patient, specimen_type=None):
    """Look up a patient's sample in *visit_df*, optionally by specimen type.

    Returns:
        ``(receipt_date, excel_row)``, or ``('', None)`` when there is no
        sample with an acceptable status.
    """
    rows = visit_df[visit_df['Patient No.'] == patient]
    if specimen_type:
        rows = rows[rows['Specimen Type'] == specimen_type]
    return _first_ok_sample(rows)


def get_label_info(patient, label_code, visit_code):
    """Look up a patient's sample by visit code and container label.

    Returns:
        ``(receipt_date, excel_row)``, or ``('', None)`` when there is no
        sample with an acceptable status.
    """
    rows = df[(df['Patient No.'] == patient)
              & (df['Protocol Visit Code'] == visit_code)
              & (df['Container Label Line 1'] == label_code)]
    return _first_ok_sample(rows)


def parse_date_edcstd(val):
    """Parse a ``DD Mon YYYY`` date from the EDC listing; ``None`` if unusable."""
    if pd.isna(val) or str(val).strip() == '':
        return None
    try:
        return datetime.strptime(str(val).strip(), '%d %b %Y')
    except ValueError:
        return None


# Open copied workbook and add analysis sheets
out_wb = load_workbook(out_path)

# Rename and autofit the first sheet. pandas read sheet index 0 above, so
# the same sheet is taken here by position instead of relying on whichever
# sheet happened to be active when the workbook was saved.
src_ws = out_wb.worksheets[0]
src_ws.title = "Zdroj"
for col in src_ws.columns:
    max_len = max((len(str(cell.value)) if cell.value is not None else 0) for cell in col)
    src_ws.column_dimensions[get_column_letter(col[0].column)].width = min(max_len + 2, 50)

# ── Styles ───────────────────────────────────────────────────────────────────
thin = Side(style='thin')
border = Border(left=thin, right=thin, top=thin, bottom=thin)
header_fill = PatternFill("solid", fgColor="4472C4")
header_font = Font(name='Calibri', bold=True, size=11, color="FFFFFF")
data_font = Font(name='Calibri', size=11)
date_font_link = Font(name='Calibri', size=11, color="000000", underline='single')
yes_fill = PatternFill("solid", fgColor="E2EFDA")
no_fill = PatternFill("solid", fgColor="FFE7E7")
sum_header_font = Font(name='Calibri', bold=True, size=11, color="000000")
sum_total_font = Font(name='Calibri', bold=True, size=11)
zero_font = Font(name='Calibri', size=11, color="BFBFBF")
zero_red_font = Font(name='Calibri', size=11, color="C00000")
dark_blue_fill = PatternFill("solid", fgColor="203764")
orange_fill = PatternFill("solid", fgColor="FFF2CC")
green_fill = PatternFill("solid", fgColor="E2EFDA")
total_fill = PatternFill("solid", fgColor="D9E1F2")
exp_fill = PatternFill("solid", fgColor="FFE7E7")
ok_fill = PatternFill("solid", fgColor="E2EFDA")
patient_link_font = Font(name='Calibri', size=11, underline='single')
centered = Alignment(horizontal='center', vertical='center')
centered_wrap = Alignment(horizontal='center', vertical='center', wrap_text=True)

# ── Sheet: sample overview ("Přehled vzorků") ────────────────────────────────
analysis_ws = out_wb.create_sheet("Přehled vzorků")

columns = [
    ("Investigator Name", 24),
    ("Číslo pacienta", 20),
    ("Máme biopsii SM11", 20),
    ("Máme RNA", 16),
    ("Máme Cryostor", 16),
    ("DNA", 14),
    ("PLASMPK I-0 TROUGH", 18),
    ("PLASMA PK I-0 PEAK", 18),
    ("SERUM ADA I-0 PRE", 18),
    ("SM06/SERUM BIOM", 16),
    ("SM07/WB RNA", 14),
    ("SM10/FECAL", 14),
    ("PLASMPK I-2 TROUGH", 18),
    ("PLASMA PK I-2 PEAK", 18),
    ("SERUM ADA I-2 PRE", 18),
    ("STOOL I-2", 12),
    ("PLASMPK I-4 TROUGH", 18),
    ("PLASMA PK I-4 PEAK", 18),
    ("SERUM ADA I-4 PRE", 18),
    ("SM06/SERUM BIOM", 16),
    ("SM07/WB RNA", 14),
    ("STOOL I-4", 12),
]

# (container label, visit code) for sheet columns 7..22, in column order
label_columns = [
    ('PLASMPK I-0 TROUGH', 'I-0'),
    ('PLASMA PK I-0 PEAK', 'I-0'),
    ('SERUM ADA I-0 PRE', 'I-0'),
    ('SM06/SERUM BIOM', 'I-0'),
    ('SM07/WB RNA', 'I-0'),
    ('SM10/FECAL', 'I-0'),
    ('PLASMPK I-2 TROUGH', 'I-2'),
    ('PLASMA PK I-2 PEAK', 'I-2'),
    ('SERUM ADA I-2 PRE', 'I-2'),
    ('STOOL I-2', 'I-2'),
    ('PLASMPK I-4 TROUGH', 'I-4'),
    ('PLASMA PK I-4 PEAK', 'I-4'),
    ('SERUM ADA I-4 PRE', 'I-4'),
    ('SM06/SERUM BIOM', 'I-4'),
    ('SM07/WB RNA', 'I-4'),
    ('STOOL I-4', 'I-4'),
]

group_font = Font(name='Calibri', bold=True, size=11)
group_fill = PatternFill("solid", fgColor="FFFFFF")
group_border = Border(left=thin, right=thin, top=thin, bottom=thin)

# (first column, last column, caption) of the merged group headers in row 1
groups = [
    (3, 5, "SCREENING"),
    (7, 12, "RANDOMIZACE I-0"),
    (13, 16, "I-2"),
    (17, 22, "I-4"),
]
for start_col, end_col, label in groups:
    analysis_ws.merge_cells(start_row=1, start_column=start_col, end_row=1, end_column=end_col)
    cell = analysis_ws.cell(row=1, column=start_col, value=label)
    cell.font = group_font
    cell.fill = group_fill
    cell.alignment = centered
    cell.border = group_border
    # Border every cell of the merged range so the outline is complete
    for c in range(start_col, end_col + 1):
        analysis_ws.cell(row=1, column=c).border = group_border
analysis_ws.row_dimensions[1].height = 20

for col_idx, (hdr, width) in enumerate(columns, 1):
    cell = analysis_ws.cell(row=2, column=col_idx, value=hdr)
    cell.font = header_font
    cell.fill = header_fill
    cell.border = border
    cell.alignment = centered_wrap
    analysis_ws.column_dimensions[get_column_letter(col_idx)].width = width
analysis_ws.row_dimensions[2].height = 30
analysis_ws.freeze_panes = "C3"

src_sheet_name = src_ws.title
pat_sheet_name = "Seznam pacientů"

# Patient visit listing. It is loaded once and used both for the hyperlink
# targets below and for the "Seznam pacientů" sheet itself, so the link row
# numbers cannot drift from the rows that are actually written.
csv_df = pd.read_csv(csv_file, encoding='utf-8')
pat_df = csv_df[['SiteNumber', 'Subject', 'InstanceName', 'Field4Value', 'Field5Value']].copy()
pat_df['Field4Value'] = pat_df['Field4Value'].apply(parse_date_edcstd)
pat_df = pat_df.sort_values(['SiteNumber', 'Subject', 'Field4Value']).reset_index(drop=True)

# Worksheet row of each subject's first visit (data starts at row 2)
patient_row_map = {}
for sheet_row, subject in enumerate(pat_df['Subject'], 2):
    patient_row_map.setdefault(subject, sheet_row)

bxscr_patients = sorted(bxscr['Patient No.'].dropna().unique())

for row_idx, patient in enumerate(bxscr_patients, 3):
    investigator = bxscr[bxscr['Patient No.'] == patient].iloc[0]['Investigator Name']

    cell = analysis_ws.cell(row=row_idx, column=1, value=_blank_if_missing(investigator))
    cell.font = data_font
    cell.border = border
    cell.alignment = centered

    cell = analysis_ws.cell(row=row_idx, column=2, value=patient)
    if patient in patient_row_map:
        cell.hyperlink = f"#'{pat_sheet_name}'!B{patient_row_map[patient]}"
        cell.font = patient_link_font
    else:
        cell.font = data_font
    cell.border = border
    cell.alignment = centered

    samples = [
        get_specimen_info(bxscr, patient, 'Tissue , Paraffin Block'),
        get_specimen_info(bxscr, patient, 'Biopsy RNA Later'),
        get_specimen_info(bxscr, patient, 'Biopsy, Frozen Tissue'),
        get_specimen_info(dna, patient),
    ]
    samples.extend(get_label_info(patient, label, visit) for label, visit in label_columns)

    for col_idx, (dt, excel_row) in enumerate(samples, 3):
        cell = analysis_ws.cell(row=row_idx, column=col_idx, value=dt)
        if dt and excel_row is not None:
            # Sample present: link the date to its row in the source sheet
            cell.hyperlink = f"#'{src_sheet_name}'!A{excel_row}"
            cell.font = date_font_link
            cell.fill = yes_fill
            cell.number_format = 'DD-MMM-YYYY'
        else:
            cell.font = zero_red_font
            cell.fill = no_fill
        cell.border = border
        cell.alignment = centered

# ── Sheet: patient list ("Seznam pacientů") ──────────────────────────────────
patients_ws = out_wb.create_sheet("Seznam pacientů")

pat_columns = [
    ("Číslo centra", 20),
    ("Číslo pacienta", 20),
    ("Kód návštěvy", 20),
    ("Datum návštěvy", 16),
    ("Typ návštěvy", 16),
]
for col_idx, (col_name, width) in enumerate(pat_columns, 1):
    cell = patients_ws.cell(row=1, column=col_idx, value=col_name)
    cell.font = header_font
    cell.fill = header_fill
    cell.border = border
    cell.alignment = centered_wrap
    patients_ws.column_dimensions[get_column_letter(col_idx)].width = width
patients_ws.row_dimensions[1].height = 30
patients_ws.freeze_panes = "A2"

for row_idx, row in enumerate(pat_df.itertuples(index=False), 2):
    for col_idx, value in enumerate(row, 1):
        # Missing values can arrive as NaN/NaT rather than None
        value = _blank_if_missing(value)
        cell = patients_ws.cell(row=row_idx, column=col_idx, value=value)
        cell.font = data_font
        cell.border = border
        cell.alignment = centered
        if col_idx == 4 and value is not None:
            cell.number_format = 'DD-MMM-YYYY'

# ── Kit inventory: load and prepare data ─────────────────────────────────────
kit_df_raw = pd.read_csv(kit_csv_file, encoding="utf-8")
cze = kit_df_raw[kit_df_raw["Country"] == "CZE"].copy()


def parse_kit_date(val):
    """Parse a ``Mon DD, YYYY`` date from the kit CSV; ``None`` if unusable."""
    if pd.isna(val):
        return None
    try:
        return datetime.strptime(str(val).strip(), "%b %d, %Y")
    except ValueError:
        return None


cze["Shipped Date"] = cze["Shipped Date"].apply(parse_kit_date)
cze["Expiration Date"] = cze["Expiration Date"].apply(parse_kit_date)
cze = cze.sort_values(["Site", "Kit Type", "Expiration Date"]).reset_index(drop=True)

today_dt = datetime.combine(date.today(), datetime.min.time())


def bucket(exp_date):
    """Classify an expiration date as ``"soon"`` (30 days or less) or ``"ok"``.

    Returns ``None`` when the date is missing. ``pd.isna`` is used because
    pandas can coerce the parsed column to datetime64, which turns ``None``
    into ``NaT``; an ``is None`` test would miss that.
    """
    if pd.isna(exp_date):
        return None
    return "soon" if (exp_date - today_dt).days <= 30 else "ok"


def _kit_sort_key(kit_type):
    """Sort key: kit code without a leading ``T-``, zero-padded to width 5."""
    text = str(kit_type)
    # Drop the literal prefix only; str.lstrip("T-") would strip any run of
    # leading "T" and "-" characters instead.
    core = text[2:] if text.startswith("T-") else text
    return core.zfill(5), text


cze["_bucket"] = cze["Expiration Date"].apply(bucket)

kit_order = sorted(cze["Kit Type"].unique(), key=_kit_sort_key)
kit_desc = cze.drop_duplicates("Kit Type").set_index("Kit Type")["Description"].to_dict()
kit_sites = sorted(cze["Site"].unique())


# ── Helper for the summary tables ────────────────────────────────────────────
def write_summary_table(ws, current_row, title, rows_data, col_a_header):
    """Write one titled four-column summary table and return the next free row.

    Args:
        ws: Worksheet to write into.
        current_row: Row where the title bar goes.
        title: Text of the merged title bar.
        rows_data: Iterable of ``(col_a, col_b, n_soon, n_ok)`` tuples.
        col_a_header: Header text for the first column.

    Returns:
        The row index two below the totals row, leaving one blank row
        before whatever is written next.
    """
    # Title bar: style all four cells first, then merge them
    for c in range(1, 5):
        cell = ws.cell(row=current_row, column=c)
        cell.fill = dark_blue_fill
        cell.border = border
    title_cell = ws.cell(row=current_row, column=1, value=title)
    title_cell.font = Font(name='Calibri', bold=True, size=12, color="FFFFFF")
    title_cell.alignment = Alignment(horizontal="left", vertical="center")
    ws.merge_cells(start_row=current_row, start_column=1, end_row=current_row, end_column=4)
    ws.row_dimensions[current_row].height = 22
    current_row += 1

    # Column headers
    headers = [col_a_header, "Description", "Expiruje do 30 dní", "Expiruje později"]
    header_fills = [header_fill, header_fill, orange_fill, green_fill]
    for col_idx, (text, fill) in enumerate(zip(headers, header_fills), 1):
        cell = ws.cell(row=current_row, column=col_idx, value=text)
        cell.font = sum_header_font
        cell.fill = fill
        cell.border = border
        cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    ws.row_dimensions[current_row].height = 28
    current_row += 1

    # Data rows
    totals = [0, 0]
    for col_a, col_b, n_soon, n_ok in rows_data:
        totals[0] += n_soon
        totals[1] += n_ok
        all_zero = (n_soon == 0 and n_ok == 0)
        row_vals = [col_a, col_b, n_soon, n_ok]
        row_fills = [None, None,
                     orange_fill if n_soon > 0 else None,
                     green_fill if n_ok > 0 else None]
        for col_idx, (val, rfill) in enumerate(zip(row_vals, row_fills), 1):
            cell = ws.cell(row=current_row, column=col_idx, value=val)
            if col_idx >= 3 and val == 0:
                # Zero counts are greyed out, or red when the row has no kits
                cell.font = zero_red_font if all_zero else zero_font
            else:
                cell.font = data_font
            cell.border = border
            cell.alignment = Alignment(horizontal="center" if col_idx >= 2 else "left",
                                       vertical="center")
            if rfill:
                cell.fill = rfill
        current_row += 1

    # Totals row
    for col_idx, val in enumerate(["CELKEM", "", totals[0], totals[1]], 1):
        cell = ws.cell(row=current_row, column=col_idx, value=val)
        cell.font = sum_total_font
        cell.fill = total_fill
        cell.border = border
        cell.alignment = Alignment(horizontal="center" if col_idx >= 2 else "left",
                                   vertical="center")
    current_row += 2
    return current_row


# ── Sheet: Kit Inventory CZE ─────────────────────────────────────────────────
kit_ws = out_wb.create_sheet("Kit Inventory CZE")

listing_columns = [
    ("Project No.", 14),
    ("Region", 10),
    ("Country", 10),
    ("Site", 38),
    ("Kit Type", 12),
    ("Description", 22),
    ("Accession", 18),
    ("Shipped Date", 16),
    ("Expiration Date", 16),
    ("Days to Expiration", 20),
]
for col_idx, (hdr, width) in enumerate(listing_columns, 1):
    cell = kit_ws.cell(row=1, column=col_idx, value=hdr)
    cell.font = header_font
    cell.fill = header_fill
    cell.border = border
    cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    kit_ws.column_dimensions[get_column_letter(col_idx)].width = width
kit_ws.row_dimensions[1].height = 30
kit_ws.freeze_panes = "A2"

# Resolve each listing column by header name when the CSV has that column,
# otherwise fall back to the same position (the original behaviour). This
# keeps the sheet correct if the export ever reorders its columns.
kit_positions = [
    cze.columns.get_loc(name) if name in cze.columns else pos
    for pos, (name, _) in enumerate(listing_columns)
]
days_pos = kit_positions[-1]
kit_date_columns = ("Shipped Date", "Expiration Date")

for row_idx, row in enumerate(cze.itertuples(index=False), 2):
    days = row[days_pos]
    for col_idx, (col_name, _) in enumerate(listing_columns, 1):
        value = _blank_if_missing(row[kit_positions[col_idx - 1]])
        cell = kit_ws.cell(row=row_idx, column=col_idx, value=value)
        cell.font = data_font
        cell.border = border
        cell.alignment = Alignment(horizontal="center", vertical="center")
        if col_name in kit_date_columns and value is not None:
            cell.number_format = "DD-MMM-YYYY"
        if col_name == "Days to Expiration":
            cell.fill = exp_fill if (pd.notna(days) and days <= 60) else ok_fill

# Let the filter cover the data rows, not only the header row
kit_ws.auto_filter.ref = f"A1:{get_column_letter(len(listing_columns))}{kit_ws.max_row}"

# ── Sheet: overview by site ("Přehled po centrech") ──────────────────────────
ctr_ws = out_wb.create_sheet("Přehled po centrech")
ctr_ws.column_dimensions["A"].width = 22
ctr_ws.column_dimensions["B"].width = 24
ctr_ws.column_dimensions["C"].width = 22
ctr_ws.column_dimensions["D"].width = 20

current_row = 1
for site in kit_sites:
    site_df = cze[cze["Site"] == site]
    rows_data = []
    for kit in kit_order:
        desc = kit_desc.get(kit, "")
        kit_site_df = site_df[site_df["Kit Type"] == kit]
        n_soon = int((kit_site_df["_bucket"] == "soon").sum())
        n_ok = int((kit_site_df["_bucket"] == "ok").sum())
        rows_data.append((f"{kit} — {desc}", desc, n_soon, n_ok))
    current_row = write_summary_table(ctr_ws, current_row, site, rows_data, "Kit Type")

# ── Sheet: overview by kit type ("Přehled po typech") ────────────────────────
sum_ws = out_wb.create_sheet("Přehled po typech")
sum_ws.column_dimensions["A"].width = 38
sum_ws.column_dimensions["B"].width = 22
sum_ws.column_dimensions["C"].width = 22
sum_ws.column_dimensions["D"].width = 20

current_row = 1
for kit in kit_order:
    desc = kit_desc.get(kit, "")
    kit_df = cze[cze["Kit Type"] == kit]
    rows_data = []
    for site in sorted(kit_df["Site"].unique()):
        site_df = kit_df[kit_df["Site"] == site]
        n_soon = int((site_df["_bucket"] == "soon").sum())
        n_ok = int((site_df["_bucket"] == "ok").sum())
        rows_data.append((site, desc, n_soon, n_ok))
    current_row = write_summary_table(sum_ws, current_row, f"Kit Type {kit} — {desc}",
                                      rows_data, "Centrum")

# ── Sheet: eQueries ──────────────────────────────────────────────────────────
eq_df = pd.read_csv(eq_csv_file, encoding="utf-8")
eq_cze = eq_df[eq_df["Country"] == "CZECH REPUBLIC"].copy()

# Open queries first, closed last; unknown statuses go to the end
status_order = {"Open": 0, "Response Received": 1, "Closed": 2}
eq_cze["_status_order"] = eq_cze["Status"].map(status_order).fillna(99)
eq_cze = eq_cze.sort_values(["_status_order", "Site"]).reset_index(drop=True)


def parse_eq_date(val):
    """Parse an eQuery timestamp; ``None`` if it is missing or unparseable.

    Tries ``Mon DD, YYYY HH:MM AM/PM`` first, then a date-only form after
    cutting the text at a ``" 12:00"`` marker.
    """
    if pd.isna(val):
        return None
    text = str(val).strip()
    # The original tried this same format twice; one attempt is enough
    # because strptime matches any run of whitespace for a format space.
    try:
        return datetime.strptime(text, "%b %d, %Y %I:%M %p")
    except ValueError:
        pass
    try:
        return datetime.strptime(text.split(" 12:00")[0], "%b %d, %Y")
    except ValueError:
        return None


eq_ws = out_wb.create_sheet("eQueries")

eq_columns = [
    ("Site", 36),
    ("Subject", 14),
    ("Visit", 20),
    ("Visit Collection Date", 20),
    ("Accession", 16),
    ("eQueryId", 12),
    ("Issue Type", 30),
    ("Status", 18),
    ("Create Date", 20),
    ("Response Date Time", 20),
    ("Time Before Response", 20),
    ("User Name", 20),
]
status_fills = {
    "Open": PatternFill("solid", fgColor="FFE7E7"),
    "Response Received": PatternFill("solid", fgColor="FFF2CC"),
    "Closed": PatternFill("solid", fgColor="E2EFDA"),
}

for col_idx, (hdr, width) in enumerate(eq_columns, 1):
    cell = eq_ws.cell(row=1, column=col_idx, value=hdr)
    cell.font = header_font
    cell.fill = header_fill
    cell.border = border
    cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    eq_ws.column_dimensions[get_column_letter(col_idx)].width = width
eq_ws.row_dimensions[1].height = 30
eq_ws.freeze_panes = "A2"

# Column positions are the same for every row, so look them up once
eq_positions = [eq_cze.columns.get_loc(name) for name, _ in eq_columns]
status_pos = eq_cze.columns.get_loc("Status")
eq_date_columns = ("Visit Collection Date", "Create Date", "Response Date Time")

for row_idx, row in enumerate(eq_cze.itertuples(index=False), 2):
    rfill = status_fills.get(row[status_pos])
    for col_idx, (col_name, _) in enumerate(eq_columns, 1):
        value = row[eq_positions[col_idx - 1]]
        is_date = col_name in eq_date_columns
        value = parse_eq_date(value) if is_date else _blank_if_missing(value)
        cell = eq_ws.cell(row=row_idx, column=col_idx, value=value)
        cell.font = data_font
        cell.border = border
        cell.alignment = Alignment(horizontal="center" if col_idx > 1 else "left",
                                   vertical="center")
        if is_date and value:
            cell.number_format = "DD-MMM-YYYY HH:MM"
        if rfill:
            cell.fill = rfill
# Let the eQuery filter cover every written row, not only the header row.
# With no data rows max_row is 1 and the range is the same as before.
eq_ws.auto_filter.ref = f"A1:{get_column_letter(len(eq_columns))}{eq_ws.max_row}"

# Write the finished workbook over the copy made at the start of the run
out_wb.save(out_path)

# Console summary of what went into the report
print(f"Saved: {out_path}")
print(f"Patients with BXSCR: {len(bxscr_patients)}, All unique patients: {len(all_patients)}")
print(f"CZE kit rows: {len(cze)}, Kit types: {len(kit_order)}, Sites: {len(kit_sites)}")
print(f"CZE eQueries: {len(eq_cze)} (Open: {(eq_cze['Status']=='Open').sum()}, Response Received: {(eq_cze['Status']=='Response Received').sum()}, Closed: {(eq_cze['Status']=='Closed').sum()})")