262 lines
11 KiB
Python
262 lines
11 KiB
Python
"""
|
|
import_patients_mongo.py — import Patients dat (subject_summary, visits) z XLSX do MongoDB.
|
|
|
|
Dříve Patients/import_to_mongo.py. Volá se z import_patients.py
|
|
(orchestrátor IWRS/run_all). Hlavní kolekce + snapshoty.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import glob
|
|
import datetime
|
|
|
|
import pandas as pd
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from common.mongo_writer import (
|
|
to_str, to_int, to_float, to_date,
|
|
ensure_indexes, log_import, bulk_upsert_with_snapshot,
|
|
)
|
|
|
|
|
|
# ── XLSX parsery ─────────────────────────────────────────────────────────────
|
|
|
|
def read_summary_df(path):
|
|
raw = pd.read_excel(path, header=None)
|
|
header_row = None
|
|
for i, row in raw.iterrows():
|
|
if "Subject" in [str(v).strip() for v in row]:
|
|
header_row = i
|
|
break
|
|
if header_row is None:
|
|
raise ValueError(f"Hlavickovy radek nenalezen v {path}")
|
|
return pd.read_excel(path, header=header_row).dropna(how="all")
|
|
|
|
|
|
def parse_detail_visits(path):
|
|
df = pd.read_excel(path, sheet_name="patient_detail_report", header=None)
|
|
header_row = None
|
|
for i, row in df.iterrows():
|
|
if "Visit Type" in [str(v).strip() for v in row]:
|
|
header_row = i
|
|
break
|
|
if header_row is None:
|
|
return []
|
|
visits_df = df.iloc[header_row + 1:].copy()
|
|
visits_df.columns = range(visits_df.shape[1])
|
|
rows = []
|
|
for _, r in visits_df.iterrows():
|
|
visit_type = to_str(r.get(0))
|
|
if visit_type not in ("Past", "Upcoming"):
|
|
continue
|
|
rows.append({
|
|
"visit_type": visit_type,
|
|
"scheduled_date": to_date(r.get(1)),
|
|
"window_days": to_str(r.get(2)),
|
|
"actual_date": to_date(r.get(3)),
|
|
"irt_transaction_no": to_int(r.get(4)),
|
|
"irt_transaction_description": to_str(r.get(5)),
|
|
"medication_assignment": to_str(r.get(6)),
|
|
"quantity_assigned": to_int(r.get(7)),
|
|
"medication_id": to_str(r.get(8)),
|
|
})
|
|
return rows
|
|
|
|
|
|
# ── transformace summary řádků na Mongo dokumenty ─────────────────────────────
|
|
|
|
# Společné sloupce (mají obě studie pod stejným XLSX názvem)
|
|
COMMON_FIELDS = {
|
|
"subject": "Subject",
|
|
"prior_subject_identifier": "Prior Subject Identifier",
|
|
"site": "Site",
|
|
"investigator": "Investigator",
|
|
"location": "Location",
|
|
"cohort_per_irt": "Cohort per IRT",
|
|
"informed_consent_date": "Informed Consent Date",
|
|
"age": "Subject's age collection",
|
|
"irt_subject_status": "IRT Subject Status",
|
|
"last_irt_transaction": "Last Recorded IRT Transaction",
|
|
"last_irt_transaction_date_local": "Last Recorded IRT Transaction Date [Local]",
|
|
"last_irt_transaction_date_utc": "Last Recorded IRT Transaction Date (UTC)",
|
|
"next_irt_transaction": "Next Expected IRT Transaction",
|
|
"next_irt_transaction_date_local": "Next Expected IRT Transaction Date [Local]",
|
|
}
|
|
|
|
# UCO3001-specifická pole
|
|
UCO_FIELDS = {
|
|
"adolescent_assent_date": ("Adolescent Assent Date", to_date),
|
|
"weight": ("Subject's weight collection", to_float),
|
|
"rescreened_subject": ("Rescreened Subject", to_str),
|
|
"adt_ir": ("ADT-IR", to_str),
|
|
"three_or_more_advanced_therapies": ("3 or More Advanced Therapies", to_str),
|
|
"only_oral_5asa_compounds": ("Only Oral 5-ASA Compounds", to_str),
|
|
"ustekinumab": ("Ustekinumab", to_str),
|
|
"isolated_proctitis": ("Isolated Proctitis", to_str),
|
|
"clinical_responder_status_i12_m0": ("Clinical Responder Status at I-12 / M-0", to_str),
|
|
"i0_rand_date_local": ("I0_RAND_TIMESTAMP_LOCAL [Local]", to_date),
|
|
"most_recent_med_assignment_date": ("Most Recent Medication Assignment Transaction [Local]", to_date),
|
|
"days_since_last_med_assignment": ("Days Since Last Medication Assignment Transaction", to_int),
|
|
"patient_forecast_status": ("Patient Forecast Status", to_str),
|
|
"patient_forecast_status_changed_date":("Patient Forecast Status Changed Date (UTC)", to_date),
|
|
}
|
|
|
|
# MDD3003-specifická pole
|
|
MDD_FIELDS = {
|
|
"madrs_criteria_integrated": ("MADRS response criteria integrated or manually entered", to_str),
|
|
"madrs_criteria_v15": ("MADRS response criteria v1.5 from RAVE", to_str),
|
|
"madrs_criteria_v16": ("MADRS response criteria v1.6 from RAVE", to_str),
|
|
"madrs_criteria_v17": ("MADRS response criteria v1.7 from RAVE", to_str),
|
|
"stratification_country": ("Stratification Country", to_str),
|
|
"age_group": ("Age Group", to_str),
|
|
"stable_remitters": ("Stable Remitters vs. Non Stable Remitters", to_str),
|
|
"date_screened": ("Date Screened [Local]", to_date),
|
|
"date_screen_failed": ("Date Screen Failed [Local]", to_date),
|
|
"date_randomized_part1": ("Date Randomized Part 1 [Local]", to_date),
|
|
"date_early_withdraw_randomized_part1": ("Date Early Withdraw Randomized Part 1 [Local]", to_date),
|
|
"date_open_label_induction": ("Date Open Label Induction [Local]", to_date),
|
|
"date_early_withdraw_open_label_induction":("Date Early Withdraw Open Label Induction [Local]", to_date),
|
|
"date_randomized_part2": ("Date Randomized Part 2 [Local]", to_date),
|
|
"date_early_withdraw_randomized_part2": ("Date Early Withdraw Randomized Part 2 [Local]", to_date),
|
|
"date_completed": ("Date Completed [Local]", to_date),
|
|
"date_unblinded": ("Date Unblinded [Local]", to_date),
|
|
}
|
|
|
|
|
|
def row_to_summary_doc(study, r, cols):
|
|
doc = {"study": study}
|
|
# společná pole
|
|
for mongo_key, xlsx_key in COMMON_FIELDS.items():
|
|
if xlsx_key not in cols:
|
|
continue
|
|
v = r[xlsx_key]
|
|
if mongo_key.endswith("_date") or "date_local" in mongo_key or "date_utc" in mongo_key:
|
|
doc[mongo_key] = to_date(v)
|
|
elif mongo_key == "age":
|
|
doc[mongo_key] = to_int(v)
|
|
else:
|
|
doc[mongo_key] = to_str(v)
|
|
|
|
# study-specifická
|
|
specific = UCO_FIELDS if study == "77242113UCO3001" else MDD_FIELDS
|
|
for mongo_key, (xlsx_key, fn) in specific.items():
|
|
if xlsx_key in cols:
|
|
doc[mongo_key] = fn(r[xlsx_key])
|
|
|
|
subject = doc.get("subject")
|
|
if not subject:
|
|
return None
|
|
doc["_id"] = f"{study}:{subject}"
|
|
return doc
|
|
|
|
|
|
def visit_to_doc(study, subject, v):
|
|
"""visit row + study/subject → Mongo dokument s deterministickým _id."""
|
|
# _id: study:subject:irt_no nebo scheduled_date (pokud chybí transaction_no, často Upcoming)
|
|
key = v["irt_transaction_no"] if v["irt_transaction_no"] is not None else (
|
|
v["scheduled_date"].strftime("%Y%m%d") if v["scheduled_date"] else "noidx"
|
|
)
|
|
# další rozlišení (víc řádků na stejný visit = unique by description)
|
|
desc_key = (v.get("irt_transaction_description") or "").replace(" ", "_")[:30]
|
|
doc = {
|
|
"_id": f"{study}:{subject}:{key}:{desc_key}",
|
|
"study": study,
|
|
"subject": subject,
|
|
**v,
|
|
}
|
|
return doc
|
|
|
|
|
|
# ── hlavní importy ───────────────────────────────────────────────────────────
|
|
|
|
def import_subject_summary(study, summary_path):
|
|
print(f" [{study}] subject_summary: {os.path.basename(summary_path)}")
|
|
df = read_summary_df(summary_path)
|
|
cols = df.columns.tolist()
|
|
docs = []
|
|
for _, r in df.iterrows():
|
|
d = row_to_summary_doc(study, r, cols)
|
|
if d:
|
|
docs.append(d)
|
|
import_id = log_import(study, os.path.basename(summary_path), "subject_summary", {"subjects": len(docs)})
|
|
n = bulk_upsert_with_snapshot(
|
|
"iwrs_subject_summary", "iwrs_subject_summary_snapshots",
|
|
docs, import_id,
|
|
)
|
|
print(f" import_id={import_id} subjektu={n}")
|
|
return import_id
|
|
|
|
|
|
DETAIL_FILENAME_RE = re.compile(r"(\d{4}-\d{2}-\d{2}) (\S+) (\S+) Subject Detail\.xlsx")
|
|
|
|
|
|
def parse_detail_filename(path):
|
|
"""Vrátí (date, study, subject) nebo None."""
|
|
m = DETAIL_FILENAME_RE.search(os.path.basename(path))
|
|
if not m:
|
|
return None
|
|
return m.group(1), m.group(2), m.group(3)
|
|
|
|
|
|
def import_visits(study, detail_files):
|
|
print(f" [{study}] visits z {len(detail_files)} detail souboru")
|
|
docs = []
|
|
for path in detail_files:
|
|
parsed = parse_detail_filename(path)
|
|
subject = parsed[2] if parsed else "UNKNOWN"
|
|
visits = parse_detail_visits(path)
|
|
for v in visits:
|
|
docs.append(visit_to_doc(study, subject, v))
|
|
# dedupe podle _id (víc řádků se stejným klíčem → bereme posledni)
|
|
by_id = {d["_id"]: d for d in docs}
|
|
docs = list(by_id.values())
|
|
|
|
import_id = log_import(study, "detail_reports", "visits", {"visits": len(docs)})
|
|
n = bulk_upsert_with_snapshot(
|
|
"iwrs_visits", "iwrs_visits_snapshots",
|
|
docs, import_id,
|
|
)
|
|
print(f" import_id={import_id} vizit={n}")
|
|
return import_id
|
|
|
|
|
|
def import_visits_single_file(study, subject, path):
|
|
"""Naimportuje vizity z JEDNOHO detail souboru (pro chronologické snapshoty)."""
|
|
visits = parse_detail_visits(path)
|
|
docs = []
|
|
seen = set()
|
|
for v in visits:
|
|
d = visit_to_doc(study, subject, v)
|
|
if d["_id"] in seen:
|
|
continue
|
|
seen.add(d["_id"])
|
|
docs.append(d)
|
|
import_id = log_import(study, os.path.basename(path), "visits", {"visits": len(docs), "subject": subject})
|
|
n = bulk_upsert_with_snapshot(
|
|
"iwrs_visits", "iwrs_visits_snapshots",
|
|
docs, import_id,
|
|
)
|
|
print(f" [{study}/{subject}] {os.path.basename(path)} import_id={import_id} vizit={n}")
|
|
return import_id
|
|
|
|
|
|
# ── main entry (volá se z run_all.py) ────────────────────────────────────────
|
|
|
|
def run(study, summary_path, details_dir, today):
|
|
ensure_indexes()
|
|
import_subject_summary(study, summary_path)
|
|
detail_files = sorted(glob.glob(
|
|
os.path.join(details_dir, study, f"{today} {study} * Subject Detail.xlsx")
|
|
))
|
|
import_visits(study, detail_files)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# CLI: python import_to_mongo.py <study> <summary_path> <details_dir>
|
|
if len(sys.argv) >= 4:
|
|
run(sys.argv[1], sys.argv[2], sys.argv[3],
|
|
datetime.date.today().strftime("%Y-%m-%d"))
|
|
else:
|
|
print("Pouziti: python import_to_mongo.py <study> <summary.xlsx> <details_dir>")
|