Files
janssen/IWRS/import_patients_mongo.py
2026-06-10 11:59:03 +02:00

262 lines
11 KiB
Python

"""
import_patients_mongo.py — import Patients dat (subject_summary, visits) z XLSX do MongoDB.
Dříve Patients/import_to_mongo.py. Volá se z import_patients.py
(orchestrátor IWRS/run_all). Hlavní kolekce + snapshoty.
"""
import os
import re
import sys
import glob
import datetime
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common.mongo_writer import (
to_str, to_int, to_float, to_date,
ensure_indexes, log_import, bulk_upsert_with_snapshot,
)
# ── XLSX parsers ─────────────────────────────────────────────────────────────
def read_summary_df(path):
    """Load a subject-summary XLSX sheet into a DataFrame.

    The workbook is first read without a header so that the row holding a
    cell equal to "Subject" (after whitespace stripping) can be located.
    The file is then read a second time with that row used as the header.

    Args:
        path: Path to the XLSX file.

    Returns:
        The re-read DataFrame with completely empty rows dropped.

    Raises:
        ValueError: If no row contains a "Subject" cell.
    """
    headerless = pd.read_excel(path, header=None)
    # First row (in sheet order) that has a "Subject" cell, or None.
    header_idx = next(
        (
            idx
            for idx, cells in headerless.iterrows()
            if any(str(cell).strip() == "Subject" for cell in cells)
        ),
        None,
    )
    if header_idx is None:
        raise ValueError(f"Hlavickovy radek nenalezen v {path}")
    frame = pd.read_excel(path, header=header_idx)
    return frame.dropna(how="all")
def parse_detail_visits(path):
    """Extract visit rows from the "patient_detail_report" sheet of a workbook.

    The sheet is read without a header. Everything below the first row that
    contains a "Visit Type" cell (after whitespace stripping) is treated as
    visit data, addressed by column position. Only rows whose first column
    converts to "Past" or "Upcoming" are kept.

    Args:
        path: Path to the subject-detail XLSX file.

    Returns:
        A list of dicts, one per kept row, in sheet order. Empty when no
        "Visit Type" header row exists.
    """
    sheet = pd.read_excel(path, sheet_name="patient_detail_report", header=None)
    header_idx = next(
        (
            idx
            for idx, cells in sheet.iterrows()
            if any(str(cell).strip() == "Visit Type" for cell in cells)
        ),
        None,
    )
    if header_idx is None:
        return []

    body = sheet.iloc[header_idx + 1:].copy()
    # Renumber columns 0..n-1 so cells can be fetched by position below.
    body.columns = range(body.shape[1])

    # (output key, column position, converter), in output-key order.
    layout = (
        ("scheduled_date", 1, to_date),
        ("window_days", 2, to_str),
        ("actual_date", 3, to_date),
        ("irt_transaction_no", 4, to_int),
        ("irt_transaction_description", 5, to_str),
        ("medication_assignment", 6, to_str),
        ("quantity_assigned", 7, to_int),
        ("medication_id", 8, to_str),
    )

    visits = []
    for _, record in body.iterrows():
        kind = to_str(record.get(0))
        if kind in ("Past", "Upcoming"):
            visit = {"visit_type": kind}
            for field, position, convert in layout:
                visit[field] = convert(record.get(position))
            visits.append(visit)
    return visits
# ── transforming summary rows into Mongo documents ───────────────────────────
# Common columns (both studies have them under the same XLSX column name).
# Maps Mongo document key -> XLSX column header. No converter is stored here:
# row_to_summary_doc chooses to_date / to_int / to_str from the Mongo key name.
COMMON_FIELDS = {
    "subject": "Subject",
    "prior_subject_identifier": "Prior Subject Identifier",
    "site": "Site",
    "investigator": "Investigator",
    "location": "Location",
    "cohort_per_irt": "Cohort per IRT",
    "informed_consent_date": "Informed Consent Date",
    "age": "Subject's age collection",
    "irt_subject_status": "IRT Subject Status",
    "last_irt_transaction": "Last Recorded IRT Transaction",
    "last_irt_transaction_date_local": "Last Recorded IRT Transaction Date [Local]",
    "last_irt_transaction_date_utc": "Last Recorded IRT Transaction Date (UTC)",
    "next_irt_transaction": "Next Expected IRT Transaction",
    "next_irt_transaction_date_local": "Next Expected IRT Transaction Date [Local]",
}
# UCO3001-specific fields.
# Maps Mongo document key -> (XLSX column header, converter function).
UCO_FIELDS = {
    "adolescent_assent_date": ("Adolescent Assent Date", to_date),
    "weight": ("Subject's weight collection", to_float),
    "rescreened_subject": ("Rescreened Subject", to_str),
    "adt_ir": ("ADT-IR", to_str),
    "three_or_more_advanced_therapies": ("3 or More Advanced Therapies", to_str),
    "only_oral_5asa_compounds": ("Only Oral 5-ASA Compounds", to_str),
    "ustekinumab": ("Ustekinumab", to_str),
    "isolated_proctitis": ("Isolated Proctitis", to_str),
    "clinical_responder_status_i12_m0": ("Clinical Responder Status at I-12 / M-0", to_str),
    "i0_rand_date_local": ("I0_RAND_TIMESTAMP_LOCAL [Local]", to_date),
    "most_recent_med_assignment_date": ("Most Recent Medication Assignment Transaction [Local]", to_date),
    "days_since_last_med_assignment": ("Days Since Last Medication Assignment Transaction", to_int),
    "patient_forecast_status": ("Patient Forecast Status", to_str),
    "patient_forecast_status_changed_date":("Patient Forecast Status Changed Date (UTC)", to_date),
}
# MDD3003-specific fields.
# Maps Mongo document key -> (XLSX column header, converter function).
# NOTE: row_to_summary_doc falls back to this table for every study other
# than "77242113UCO3001".
MDD_FIELDS = {
    "madrs_criteria_integrated": ("MADRS response criteria integrated or manually entered", to_str),
    "madrs_criteria_v15": ("MADRS response criteria v1.5 from RAVE", to_str),
    "madrs_criteria_v16": ("MADRS response criteria v1.6 from RAVE", to_str),
    "madrs_criteria_v17": ("MADRS response criteria v1.7 from RAVE", to_str),
    "stratification_country": ("Stratification Country", to_str),
    "age_group": ("Age Group", to_str),
    "stable_remitters": ("Stable Remitters vs. Non Stable Remitters", to_str),
    "date_screened": ("Date Screened [Local]", to_date),
    "date_screen_failed": ("Date Screen Failed [Local]", to_date),
    "date_randomized_part1": ("Date Randomized Part 1 [Local]", to_date),
    "date_early_withdraw_randomized_part1": ("Date Early Withdraw Randomized Part 1 [Local]", to_date),
    "date_open_label_induction": ("Date Open Label Induction [Local]", to_date),
    "date_early_withdraw_open_label_induction":("Date Early Withdraw Open Label Induction [Local]", to_date),
    "date_randomized_part2": ("Date Randomized Part 2 [Local]", to_date),
    "date_early_withdraw_randomized_part2": ("Date Early Withdraw Randomized Part 2 [Local]", to_date),
    "date_completed": ("Date Completed [Local]", to_date),
    "date_unblinded": ("Date Unblinded [Local]", to_date),
}
def row_to_summary_doc(study, r, cols):
    """Convert one subject-summary row into a Mongo document.

    Args:
        study: Study identifier; stored in the document and used to pick the
            study-specific field table.
        r: Row object indexable by XLSX column name.
        cols: Collection of column names present in the sheet; fields whose
            column is absent are left out of the document.

    Returns:
        The document dict with "_id" set to "<study>:<subject>", or None
        when the converted "subject" value is missing or falsy.
    """
    summary = {"study": study}

    # Shared columns: the converter is derived from the Mongo key's name.
    for field, column in COMMON_FIELDS.items():
        if column in cols:
            raw = r[column]
            looks_like_date = (
                field.endswith("_date")
                or "date_local" in field
                or "date_utc" in field
            )
            if looks_like_date:
                convert = to_date
            elif field == "age":
                convert = to_int
            else:
                convert = to_str
            summary[field] = convert(raw)

    # Study-specific columns carry their converter alongside the column name.
    if study == "77242113UCO3001":
        extra_fields = UCO_FIELDS
    else:
        extra_fields = MDD_FIELDS
    for field, (column, convert) in extra_fields.items():
        if column in cols:
            summary[field] = convert(r[column])

    subject_id = summary.get("subject")
    if not subject_id:
        return None
    summary["_id"] = f"{study}:{subject_id}"
    return summary
def visit_to_doc(study, subject, v):
    """Build the Mongo document for one visit row, with a deterministic _id.

    The _id has the form "<study>:<subject>:<key>:<desc>" where:
      * <key> is the IRT transaction number when it is not None; otherwise
        the scheduled date formatted as YYYYMMDD, or the literal "noidx"
        when the scheduled date is falsy as well (the original note says a
        missing transaction number is common for Upcoming visits);
      * <desc> is the transaction description with spaces replaced by
        underscores, cut to 30 characters, so that several rows of the same
        visit stay distinct.

    Args:
        study: Study identifier.
        subject: Subject identifier.
        v: Visit dict; must contain "irt_transaction_no", and
            "scheduled_date" whenever the transaction number is None.

    Returns:
        A new dict with "_id", "study", "subject" followed by the entries
        of ``v`` (entries of ``v`` win on a key clash).
    """
    txn_no = v["irt_transaction_no"]
    if txn_no is not None:
        id_key = txn_no
    else:
        scheduled = v["scheduled_date"]
        id_key = scheduled.strftime("%Y%m%d") if scheduled else "noidx"

    description = v.get("irt_transaction_description") or ""
    id_suffix = description.replace(" ", "_")[:30]

    result = {
        "_id": f"{study}:{subject}:{id_key}:{id_suffix}",
        "study": study,
        "subject": subject,
    }
    result.update(v)
    return result
# ── main import routines ─────────────────────────────────────────────────────
def import_subject_summary(study, summary_path):
    """Import the subject-summary workbook of one study.

    Reads the sheet with read_summary_df, turns each row into a document with
    row_to_summary_doc (rows yielding None are skipped), records the import
    through log_import, and hands the documents to bulk_upsert_with_snapshot
    for the "iwrs_subject_summary" / "iwrs_subject_summary_snapshots"
    collection names.

    Args:
        study: Study identifier.
        summary_path: Path to the summary XLSX file.

    Returns:
        The import id returned by log_import.
    """
    filename = os.path.basename(summary_path)
    print(f" [{study}] subject_summary: {filename}")

    frame = read_summary_df(summary_path)
    columns = frame.columns.tolist()
    candidates = (
        row_to_summary_doc(study, row, columns) for _, row in frame.iterrows()
    )
    docs = [doc for doc in candidates if doc]

    import_id = log_import(
        study, filename, "subject_summary", {"subjects": len(docs)}
    )
    n = bulk_upsert_with_snapshot(
        "iwrs_subject_summary",
        "iwrs_subject_summary_snapshots",
        docs,
        import_id,
    )
    print(f" import_id={import_id} subjektu={n}")
    return import_id
# Detail report file name: "<YYYY-MM-DD> <study> <subject> Subject Detail.xlsx".
DETAIL_FILENAME_RE = re.compile(r"(\d{4}-\d{2}-\d{2}) (\S+) (\S+) Subject Detail\.xlsx")


def parse_detail_filename(path):
    """Split a detail-report file name into its parts.

    Only the base name of ``path`` is examined, and the pattern is searched
    for anywhere inside it.

    Returns:
        A ``(date, study, subject)`` tuple of strings, or None when the base
        name does not contain the expected pattern.
    """
    found = DETAIL_FILENAME_RE.search(os.path.basename(path))
    return found.groups() if found else None
def import_visits(study, detail_files):
    """Import the visits of all given detail files as one logged import.

    The subject is taken from each file name; files whose name does not
    match the expected pattern are stored under the subject "UNKNOWN".
    When several rows produce the same _id, the last one wins.

    Args:
        study: Study identifier.
        detail_files: Iterable of detail XLSX paths (its length is printed).

    Returns:
        The import id returned by log_import.
    """
    print(f" [{study}] visits z {len(detail_files)} detail souboru")

    # Keyed by _id: a later row with the same key overwrites the earlier one
    # while keeping the position of the first occurrence.
    latest_by_id = {}
    for path in detail_files:
        name_parts = parse_detail_filename(path)
        subject = "UNKNOWN" if not name_parts else name_parts[2]
        for visit in parse_detail_visits(path):
            doc = visit_to_doc(study, subject, visit)
            latest_by_id[doc["_id"]] = doc
    docs = list(latest_by_id.values())

    import_id = log_import(
        study, "detail_reports", "visits", {"visits": len(docs)}
    )
    n = bulk_upsert_with_snapshot(
        "iwrs_visits",
        "iwrs_visits_snapshots",
        docs,
        import_id,
    )
    print(f" import_id={import_id} vizit={n}")
    return import_id
def import_visits_single_file(study, subject, path):
    """Import the visits of ONE detail file (for chronological snapshots).

    When several rows produce the same _id, only the first one is kept.

    Args:
        study: Study identifier.
        subject: Subject identifier the file belongs to.
        path: Path to the detail XLSX file.

    Returns:
        The import id returned by log_import.
    """
    filename = os.path.basename(path)

    # setdefault keeps the first document seen for every _id.
    first_by_id = {}
    for visit in parse_detail_visits(path):
        doc = visit_to_doc(study, subject, visit)
        first_by_id.setdefault(doc["_id"], doc)
    docs = list(first_by_id.values())

    import_id = log_import(
        study, filename, "visits", {"visits": len(docs), "subject": subject}
    )
    n = bulk_upsert_with_snapshot(
        "iwrs_visits",
        "iwrs_visits_snapshots",
        docs,
        import_id,
    )
    print(f" [{study}/{subject}] {filename} import_id={import_id} vizit={n}")
    return import_id
# ── main entry (called from run_all.py) ──────────────────────────────────────
def run(study, summary_path, details_dir, today):
    """Run the full import for one study: indexes, subject summary, visits.

    Args:
        study: Study identifier; also the sub-directory of ``details_dir``
            that is searched for detail files.
        summary_path: Path to the subject-summary XLSX file.
        details_dir: Root directory holding per-study detail reports.
        today: Date prefix of the detail file names to pick up (the CLI
            entry point passes it as "YYYY-MM-DD").
    """
    ensure_indexes()
    import_subject_summary(study, summary_path)

    # Only detail files carrying today's date prefix, in sorted path order.
    pattern = os.path.join(
        details_dir, study, f"{today} {study} * Subject Detail.xlsx"
    )
    import_visits(study, sorted(glob.glob(pattern)))
if __name__ == "__main__":
    # CLI: python import_patients_mongo.py <study> <summary_path> <details_dir>
    # Arguments beyond the first three are ignored. The detail-file date
    # prefix is the current local date formatted as YYYY-MM-DD.
    if len(sys.argv) >= 4:
        run(sys.argv[1], sys.argv[2], sys.argv[3],
            datetime.date.today().strftime("%Y-%m-%d"))
    else:
        # The usage text names this script (it used to point at the old
        # import_to_mongo.py name). A usage error goes to stderr and ends with
        # a non-zero status so calling scripts can detect that nothing ran.
        print(
            "Pouziti: python import_patients_mongo.py <study> <summary.xlsx> <details_dir>",
            file=sys.stderr,
        )
        sys.exit(2)