""" Import EDC CSV reportů do MongoDB. Použití: python edc_import.py report.csv python edc_import.py reports/*.csv python edc_import.py report.csv --host mongodb://192.168.1.100:27017 --db klinicka_studie """ import argparse import csv import glob import logging import os import re import sys from datetime import datetime, timezone from pathlib import Path from pymongo import MongoClient, ASCENDING from pymongo.errors import PyMongoError # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", handlers=[ logging.FileHandler("edc_import.log", encoding="utf-8"), logging.StreamHandler(open(sys.stdout.fileno(), mode="w", encoding="utf-8", closefd=False)), ], ) log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Mapování pevných CSV sloupců na MongoDB cesty # --------------------------------------------------------------------------- FIXED_FIELDS = { "StudyName": "study", "SiteGroupName": "site.group", "SiteID": "site.id", "SiteNumber": "site.number", "Site": "site.name", "SubjectID": "subject.id", "Subject": "subject.label", "CRFVersionID": "form.crfVersionId", "InstanceID": "form.instanceId", "InstanceName": "form.instanceName", "FolderSeq": "form.folderSeq", "Page": "form.page", "RecordID": "form.recordId", "RecordPosition": "form.recordPosition", "LastModifiedDate": "lastModified", "PrintDateTime": "importedAt", } # Sloupce, které jdou do _meta (ostatní administrativní) META_FIELDS = {"RunUser", "VersionNumber", "FilterField"} # Pole, která se převedou na int INT_FIELDS = {"Elapsed days"} # Formáty datumů, které zkusíme parsovat DATE_FORMATS = [ "%d %b %Y %H:%M:%S", # 20 MAY 2026 12:06:18 "%d %b %Y %H:%M:%S:%f", # 10 Aug 2025 18:13:22:080 (EDC query dates) "%Y%m%d %H:%M:%S.%f", # 20250810 18:13:22.080 (sortable query dates) "%Y-%m-%d %H:%M:%S", # 2026-05-20 12:06:28 "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%fZ", "%d/%m/%Y %H:%M:%S", "%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p", # 5/20/2026 1:23:27 PM ] # --------------------------------------------------------------------------- # QueryDetails — detekce a mapování # --------------------------------------------------------------------------- QUERY_DETAIL_MARKER = "QueryID(ReQry)" QUERY_META_FIELDS = { "StudyParameter", "SiteGroupParameter", "SiteNumberParameter", "SiteParameter", "SubjectParameter", "SubjectStatusParameter", "FolderParameter", "FormParameter", "FieldParameter", "MarkingGroupParameter", "QueryStatusParameter", "IncludeInactivePagesParameter", "PageSDVParameter", "PageFrozenParameter", "PageLockedParameter", "StartDateParameter", "EndDateParameter", "MilestoneParameter", "ReportTypeParameter", "VersionNumber", "TimeZone", "RunUser", "ErrorString", # Sortable dates — redundantní, parsujeme z hlavních sloupců "OpenedDateSrtble", "AnsweredDateSrtble", "ClosedDateSrtble", # Agregátní počty — jdou do meta "VisitSiteLevel", "VisitCountryLevel", "VisitStudyLevel", "PageSubjectLevel", "PageSiteLevel", "PageCountryLevel", "PageStudyLevel", "Queries (Op/Ans/SDV)", } def is_query_details(fieldnames: list[str]) -> bool: return QUERY_DETAIL_MARKER in fieldnames def map_query_row(row: dict, source_file: str) -> dict: """Přemapuje řádek QueryDetails reportu na MongoDB dokument.""" def val(col: str) -> str: return (row.get(col) or "").strip() def int_or_none(col: str): v = val(col) if v == "": return None try: return int(v) except ValueError: return v def date_or_str(col: str): v = val(col) if not v: return None parsed = parse_date(v) return parsed if parsed else v meta = {k: row[k].strip() for k in QUERY_META_FIELDS if row.get(k, "").strip()} doc = { "study": val("StudyParameter"), "site": { "group": val("Country/Region"), "number": val("Site Number"), "name": val("Sites"), }, "subject": { "label": val("Subjects"), "status": val("Subject Status"), }, "visit": val("Visits"), "page": val("Pages"), "recordPosition": int_or_none("RecordPosition"), "field": val("Field"), "queryGroup": val("Query Group"), "queryId": val(QUERY_DETAIL_MARKER), "queryStatus": val("QueryStatus"), "openedBy": val("Opened By"), "openedDate": date_or_str("Opened Date"), "answeredBy": val("Answered By") or None, "answeredDate": date_or_str("Answered Date"), "closedBy": val("Closed By") or None, "closedDate": date_or_str("Closed Date"), "daysNotYetClosed": int_or_none("DaysNotYetClosed"), "daysToAnswer": int_or_none("Days to Answer"), "daysToClose": int_or_none("Days to Close"), "queryText": val("QueryText"), "answerText": val("Answer Text (if any)") or None, "importedAt": date_or_str("PrintDateTime"), "sourceFile": source_file, "_meta": meta, } # Odstraň None hodnoty z top-level (ne z nested) return {k: v for k, v in doc.items() if v is not None or k in ("queryId",)} def ensure_query_indexes(collection) -> None: collection.create_index([("queryId", ASCENDING)], unique=True, sparse=True) collection.create_index([("subject.label", ASCENDING)]) collection.create_index([("site.number", ASCENDING)]) collection.create_index([("queryStatus", ASCENDING)]) collection.create_index([("openedDate", ASCENDING)]) def ensure_snapshot_indexes(collection) -> None: """Indexy pro queries_snapshots — unikátní kombinace queryId + snapshotDate.""" collection.create_index( [("queryId", ASCENDING), ("snapshotDate", ASCENDING)], unique=True, ) collection.create_index([("snapshotDate", ASCENDING)]) collection.create_index([("queryStatus", ASCENDING)]) collection.create_index([("site.number", ASCENDING)]) collection.create_index([("subject.label", ASCENDING)]) def extract_snapshot_date(filename: str) -> str: """ Vytáhne datum ze jména souboru. '2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv' → '2026-05-20' Fallback: dnešní datum. """ stem = Path(filename).name match = re.match(r"(\d{4}-\d{2}-\d{2})", stem) if match: return match.group(1) return datetime.now(timezone.utc).strftime("%Y-%m-%d") def parse_date(value: str) -> str | None: """Pokusí se převést string na ISO 8601; jinak vrátí None.""" value = value.strip() for fmt in DATE_FORMATS: try: dt = datetime.strptime(value, fmt) return dt.replace(tzinfo=timezone.utc).isoformat() except ValueError: continue return None def set_nested(doc: dict, path: str, value: str) -> None: """Nastaví hodnotu v nested dict podle tečkové cesty, např. 'site.id'.""" parts = path.split(".") for part in parts[:-1]: doc = doc.setdefault(part, {}) doc[parts[-1]] = value def collection_name_from_filename(filename: str) -> str: """ Odvodí název kolekce z názvu souboru. '2026-05-20_15-09_EDC_MDD3003_InterimInvestigatorSignature_DataListing.csv' → 'MDD3003_InterimInvestigatorSignature' '2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv' → 'MDD3003_QueryDetails' """ stem = Path(filename).stem # Se suffixem _DataListing match = re.search(r"EDC_(.+?)_DataListing", stem, re.IGNORECASE) if match: return match.group(1) # Bez suffixu _DataListing (např. QueryDetails) match = re.search(r"EDC_(.+)$", stem, re.IGNORECASE) if match: return match.group(1) return stem def map_row(row: dict, source_file: str) -> dict: """Přemapuje jeden CSV řádek na MongoDB dokument.""" doc: dict = {} meta: dict = {} fields: dict = {} # Zjisti všechny klíče pro FieldNValue/FieldNLabel field_keys = set(row.keys()) for col, value in row.items(): value = value.strip() if value else "" # Pevná pole if col in FIXED_FIELDS: path = FIXED_FIELDS[col] if path == "form.folderSeq": try: value = int(value) except (ValueError, TypeError): pass elif path == "form.recordPosition": try: value = int(value) except (ValueError, TypeError): pass elif path in ("lastModified", "importedAt"): parsed = parse_date(value) value = parsed if parsed else value set_nested(doc, path, value) continue # Meta pole if col in META_FIELDS: if value: meta[col] = value continue # FieldNLabel / FieldNValue jsou zpracovány níže if re.match(r"^Field\d+(Value|Label)$", col): continue # Zbývající neznámé pevné sloupce také do meta if not re.match(r"^Field\d+", col): if value: meta[col] = value # Zpracuj páry Field1Value/Field1Label ... Field300Value/Field300Label n = 1 while True: val_key = f"Field{n}Value" lbl_key = f"Field{n}Label" if val_key not in field_keys and lbl_key not in field_keys: break label = (row.get(lbl_key) or "").strip() value = (row.get(val_key) or "").strip() if label and value: # Pokus o převod čísel if label in INT_FIELDS: try: fields[label] = int(value) except ValueError: fields[label] = value else: # Pokus o datum parsed = parse_date(value) fields[label] = parsed if parsed else value n += 1 doc["fields"] = fields doc["sourceFile"] = source_file if meta: doc["_meta"] = meta return doc def ensure_indexes(collection) -> None: collection.create_index([("form.recordId", ASCENDING)], unique=True, sparse=True) collection.create_index([("subject.id", ASCENDING)]) collection.create_index([("site.id", ASCENDING)]) collection.create_index([("study", ASCENDING)]) collection.create_index([("lastModified", ASCENDING)]) def import_file( csv_path: str, collection, snapshot_col=None, snapshot_date: str | None = None, ) -> tuple[int, int, int]: """ Importuje jeden CSV soubor. Vrátí (inserted, updated, errors). snapshot_col: pokud je zadán, pro QueryDetails se zapíše i daily snapshot. """ inserted = updated = errors = 0 source_file = Path(csv_path).name with open(csv_path, encoding="utf-8", newline="") as f: reader = csv.DictReader(f, delimiter=",", quotechar='"') query_mode = is_query_details(reader.fieldnames or []) for line_no, row in enumerate(reader, start=2): try: if query_mode: doc = map_query_row(row, source_file) upsert_key = {"queryId": doc["queryId"]} # Snapshot — upsert na (queryId, snapshotDate) if snapshot_col is not None and snapshot_date: snap_doc = {**doc, "snapshotDate": snapshot_date} snapshot_col.update_one( {"queryId": doc["queryId"], "snapshotDate": snapshot_date}, {"$set": snap_doc}, upsert=True, ) else: doc = map_row(row, source_file) record_id = doc.get("form", {}).get("recordId") upsert_key = {"form.recordId": record_id} if record_id else None if upsert_key: result = collection.update_one( upsert_key, {"$set": doc}, upsert=True, ) if result.upserted_id: inserted += 1 else: updated += 1 else: collection.insert_one(doc) inserted += 1 except PyMongoError as e: errors += 1 log.error("Řádek %d v %s: MongoDB chyba: %s", line_no, csv_path, e) except Exception as e: errors += 1 log.error("Řádek %d v %s: %s", line_no, csv_path, e) return inserted, updated, errors def main() -> None: parser = argparse.ArgumentParser(description="Import EDC CSV reportů do MongoDB") parser.add_argument("files", nargs="+", help="CSV soubory nebo glob vzor") parser.add_argument("--host", default="mongodb://192.168.1.76:27017", help="MongoDB URI") parser.add_argument("--db", default="edc", help="Název databáze") args = parser.parse_args() # Rozbal glob vzory (důležité na Windows kde shell sám neglobuje) paths: list[str] = [] for pattern in args.files: expanded = glob.glob(pattern) paths.extend(expanded if expanded else [pattern]) if not paths: log.error("Žádné soubory nenalezeny.") sys.exit(1) client = MongoClient(args.host, serverSelectionTimeoutMS=5000) try: client.admin.command("ping") except Exception as e: log.error("Nelze se připojit k MongoDB (%s): %s", args.host, e) sys.exit(1) db = client[args.db] total_inserted = total_updated = total_errors = 0 for csv_path in paths: if not os.path.isfile(csv_path): log.warning("Soubor neexistuje, přeskakuji: %s", csv_path) continue # Detekuj typ souboru a vyber kolekci + indexy with open(csv_path, encoding="utf-8", newline="") as f: fieldnames = csv.DictReader(f).fieldnames or [] if is_query_details(fieldnames): col_name = "queries" collection = db[col_name] ensure_query_indexes(collection) snapshot_col = db["queries_snapshots"] ensure_snapshot_indexes(snapshot_col) snapshot_date = extract_snapshot_date(csv_path) log.info("Importuji: %s → %s.%s + queries_snapshots [%s]", csv_path, args.db, col_name, snapshot_date) else: col_name = collection_name_from_filename(csv_path) collection = db[col_name] ensure_indexes(collection) snapshot_col = None snapshot_date = None log.info("Importuji: %s → %s.%s", csv_path, args.db, col_name) inserted, updated, errors = import_file( csv_path, collection, snapshot_col, snapshot_date ) total_inserted += inserted total_updated += updated total_errors += errors log.info(" nové: %d aktualizované: %d chyby: %d", inserted, updated, errors) log.info("=" * 60) log.info("Celkem — nové: %d aktualizované: %d chyby: %d", total_inserted, total_updated, total_errors) client.close() if __name__ == "__main__": main()