""" import_patients_mongo.py — import Patients dat (subject_summary, visits) z XLSX do MongoDB. Dříve Patients/import_to_mongo.py. Volá se z import_patients.py (orchestrátor IWRS/run_all). Hlavní kolekce + snapshoty. """ import os import re import sys import glob import datetime import pandas as pd sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from common.mongo_writer import ( to_str, to_int, to_float, to_date, ensure_indexes, log_import, bulk_upsert_with_snapshot, ) # ── XLSX parsery ───────────────────────────────────────────────────────────── def read_summary_df(path): raw = pd.read_excel(path, header=None) header_row = None for i, row in raw.iterrows(): if "Subject" in [str(v).strip() for v in row]: header_row = i break if header_row is None: raise ValueError(f"Hlavickovy radek nenalezen v {path}") return pd.read_excel(path, header=header_row).dropna(how="all") def parse_detail_visits(path): df = pd.read_excel(path, sheet_name="patient_detail_report", header=None) header_row = None for i, row in df.iterrows(): if "Visit Type" in [str(v).strip() for v in row]: header_row = i break if header_row is None: return [] visits_df = df.iloc[header_row + 1:].copy() visits_df.columns = range(visits_df.shape[1]) rows = [] for _, r in visits_df.iterrows(): visit_type = to_str(r.get(0)) if visit_type not in ("Past", "Upcoming"): continue rows.append({ "visit_type": visit_type, "scheduled_date": to_date(r.get(1)), "window_days": to_str(r.get(2)), "actual_date": to_date(r.get(3)), "irt_transaction_no": to_int(r.get(4)), "irt_transaction_description": to_str(r.get(5)), "medication_assignment": to_str(r.get(6)), "quantity_assigned": to_int(r.get(7)), "medication_id": to_str(r.get(8)), }) return rows # ── transformace summary řádků na Mongo dokumenty ───────────────────────────── # Společné sloupce (mají obě studie pod stejným XLSX názvem) COMMON_FIELDS = { "subject": "Subject", "prior_subject_identifier": "Prior Subject Identifier", "site": "Site", "investigator": "Investigator", "location": "Location", "cohort_per_irt": "Cohort per IRT", "informed_consent_date": "Informed Consent Date", "age": "Subject's age collection", "irt_subject_status": "IRT Subject Status", "last_irt_transaction": "Last Recorded IRT Transaction", "last_irt_transaction_date_local": "Last Recorded IRT Transaction Date [Local]", "last_irt_transaction_date_utc": "Last Recorded IRT Transaction Date (UTC)", "next_irt_transaction": "Next Expected IRT Transaction", "next_irt_transaction_date_local": "Next Expected IRT Transaction Date [Local]", } # UCO3001-specifická pole UCO_FIELDS = { "adolescent_assent_date": ("Adolescent Assent Date", to_date), "weight": ("Subject's weight collection", to_float), "rescreened_subject": ("Rescreened Subject", to_str), "adt_ir": ("ADT-IR", to_str), "three_or_more_advanced_therapies": ("3 or More Advanced Therapies", to_str), "only_oral_5asa_compounds": ("Only Oral 5-ASA Compounds", to_str), "ustekinumab": ("Ustekinumab", to_str), "isolated_proctitis": ("Isolated Proctitis", to_str), "clinical_responder_status_i12_m0": ("Clinical Responder Status at I-12 / M-0", to_str), "i0_rand_date_local": ("I0_RAND_TIMESTAMP_LOCAL [Local]", to_date), "most_recent_med_assignment_date": ("Most Recent Medication Assignment Transaction [Local]", to_date), "days_since_last_med_assignment": ("Days Since Last Medication Assignment Transaction", to_int), "patient_forecast_status": ("Patient Forecast Status", to_str), "patient_forecast_status_changed_date":("Patient Forecast Status Changed Date (UTC)", to_date), } # MDD3003-specifická pole MDD_FIELDS = { "madrs_criteria_integrated": ("MADRS response criteria integrated or manually entered", to_str), "madrs_criteria_v15": ("MADRS response criteria v1.5 from RAVE", to_str), "madrs_criteria_v16": ("MADRS response criteria v1.6 from RAVE", to_str), "madrs_criteria_v17": ("MADRS response criteria v1.7 from RAVE", to_str), "stratification_country": ("Stratification Country", to_str), "age_group": ("Age Group", to_str), "stable_remitters": ("Stable Remitters vs. Non Stable Remitters", to_str), "date_screened": ("Date Screened [Local]", to_date), "date_screen_failed": ("Date Screen Failed [Local]", to_date), "date_randomized_part1": ("Date Randomized Part 1 [Local]", to_date), "date_early_withdraw_randomized_part1": ("Date Early Withdraw Randomized Part 1 [Local]", to_date), "date_open_label_induction": ("Date Open Label Induction [Local]", to_date), "date_early_withdraw_open_label_induction":("Date Early Withdraw Open Label Induction [Local]", to_date), "date_randomized_part2": ("Date Randomized Part 2 [Local]", to_date), "date_early_withdraw_randomized_part2": ("Date Early Withdraw Randomized Part 2 [Local]", to_date), "date_completed": ("Date Completed [Local]", to_date), "date_unblinded": ("Date Unblinded [Local]", to_date), } def row_to_summary_doc(study, r, cols): doc = {"study": study} # společná pole for mongo_key, xlsx_key in COMMON_FIELDS.items(): if xlsx_key not in cols: continue v = r[xlsx_key] if mongo_key.endswith("_date") or "date_local" in mongo_key or "date_utc" in mongo_key: doc[mongo_key] = to_date(v) elif mongo_key == "age": doc[mongo_key] = to_int(v) else: doc[mongo_key] = to_str(v) # study-specifická specific = UCO_FIELDS if study == "77242113UCO3001" else MDD_FIELDS for mongo_key, (xlsx_key, fn) in specific.items(): if xlsx_key in cols: doc[mongo_key] = fn(r[xlsx_key]) subject = doc.get("subject") if not subject: return None doc["_id"] = f"{study}:{subject}" return doc def visit_to_doc(study, subject, v): """visit row + study/subject → Mongo dokument s deterministickým _id.""" # _id: study:subject:irt_no nebo scheduled_date (pokud chybí transaction_no, často Upcoming) key = v["irt_transaction_no"] if v["irt_transaction_no"] is not None else ( v["scheduled_date"].strftime("%Y%m%d") if v["scheduled_date"] else "noidx" ) # další rozlišení (víc řádků na stejný visit = unique by description) desc_key = (v.get("irt_transaction_description") or "").replace(" ", "_")[:30] doc = { "_id": f"{study}:{subject}:{key}:{desc_key}", "study": study, "subject": subject, **v, } return doc # ── hlavní importy ─────────────────────────────────────────────────────────── def import_subject_summary(study, summary_path): print(f" [{study}] subject_summary: {os.path.basename(summary_path)}") df = read_summary_df(summary_path) cols = df.columns.tolist() docs = [] for _, r in df.iterrows(): d = row_to_summary_doc(study, r, cols) if d: docs.append(d) import_id = log_import(study, os.path.basename(summary_path), "subject_summary", {"subjects": len(docs)}) n = bulk_upsert_with_snapshot( "iwrs_subject_summary", "iwrs_subject_summary_snapshots", docs, import_id, ) print(f" import_id={import_id} subjektu={n}") return import_id DETAIL_FILENAME_RE = re.compile(r"(\d{4}-\d{2}-\d{2}) (\S+) (\S+) Subject Detail\.xlsx") def parse_detail_filename(path): """Vrátí (date, study, subject) nebo None.""" m = DETAIL_FILENAME_RE.search(os.path.basename(path)) if not m: return None return m.group(1), m.group(2), m.group(3) def import_visits(study, detail_files): print(f" [{study}] visits z {len(detail_files)} detail souboru") docs = [] for path in detail_files: parsed = parse_detail_filename(path) subject = parsed[2] if parsed else "UNKNOWN" visits = parse_detail_visits(path) for v in visits: docs.append(visit_to_doc(study, subject, v)) # dedupe podle _id (víc řádků se stejným klíčem → bereme posledni) by_id = {d["_id"]: d for d in docs} docs = list(by_id.values()) import_id = log_import(study, "detail_reports", "visits", {"visits": len(docs)}) n = bulk_upsert_with_snapshot( "iwrs_visits", "iwrs_visits_snapshots", docs, import_id, ) print(f" import_id={import_id} vizit={n}") return import_id def import_visits_single_file(study, subject, path): """Naimportuje vizity z JEDNOHO detail souboru (pro chronologické snapshoty).""" visits = parse_detail_visits(path) docs = [] seen = set() for v in visits: d = visit_to_doc(study, subject, v) if d["_id"] in seen: continue seen.add(d["_id"]) docs.append(d) import_id = log_import(study, os.path.basename(path), "visits", {"visits": len(docs), "subject": subject}) n = bulk_upsert_with_snapshot( "iwrs_visits", "iwrs_visits_snapshots", docs, import_id, ) print(f" [{study}/{subject}] {os.path.basename(path)} import_id={import_id} vizit={n}") return import_id # ── main entry (volá se z run_all.py) ──────────────────────────────────────── def run(study, summary_path, details_dir, today): ensure_indexes() import_subject_summary(study, summary_path) detail_files = sorted(glob.glob( os.path.join(details_dir, study, f"{today} {study} * Subject Detail.xlsx") )) import_visits(study, detail_files) if __name__ == "__main__": # CLI: python import_to_mongo.py if len(sys.argv) >= 4: run(sys.argv[1], sys.argv[2], sys.argv[3], datetime.date.today().strftime("%Y-%m-%d")) else: print("Pouziti: python import_to_mongo.py ")