""" import_to_mongo.py Verze: 1.3 Datum: 2026-06-15 Import Clario CSV do MongoDB (databáze: Clario). Kolekce: Clario.MayoDiary / Clario.MayoScore / Clario.eCOA_DCRs / Clario.ECG_DCRs Filtr: pouze řádky s Country == "Czech Republic" Klíč: MayoDiary → Subject ID + Form Number MayoScore → Participant ID + Visit eCOA_DCRs → Data Correction ID ECG_DCRs → Data Correction ID Historie: při změně jakéhokoliv datového sloupce (fields + outcome cols) se stará verze uloží do pole history[] spolu s outcome poli Po importu přesune zpracované CSV do downloads/Zpracovano/ Použití: python import_to_mongo.py # importuje všechny CSV z downloads/ python import_to_mongo.py downloads/konkretni.csv # jeden soubor """ import csv import re import shutil import sys from datetime import datetime, timezone from pathlib import Path from pymongo import MongoClient, ASCENDING MONGO_URI = "mongodb://192.168.1.76:27017" DB_NAME = "Clario" DOWNLOADS_DIR = Path(__file__).parent / "downloads" PROCESSED_DIR = DOWNLOADS_DIR / "Zpracovano" COUNTRY_FILTER = "Czech Republic" # --------------------------------------------------------------------------- # Konfigurace kolekcí # --------------------------------------------------------------------------- COLLECTION_CONFIG = { "MayoDiary": { "collection": "Clario.MayoDiary", "subject_col": "Subject ID", "key_cols": ("Subject ID", "Form Number"), }, "MayoScore": { "collection": "Clario.MayoScore", "subject_col": "Participant ID", "key_cols": ("Participant ID", "Visit"), "outcome_cols": ( "Site Action", "Last Mayo Score Submission", "Week I-12 Clinical Responder", "Week I-12 Clinical Remission", "Clinical Flare", "Loss of Response", "Partial Mayo Response Post Loss of Response", "Partial Mayo Response for Clinical Non-Responders", ), }, "eCOA DCRs": { "collection": "Clario.eCOA_DCRs", "subject_col": "Subject ID", "key_cols": ("Data Correction ID",), }, "ECG DCRs": { "collection": "Clario.ECG_DCRs", "subject_col": "Subject Number", "key_cols": ("Data Correction ID",), }, } DATE_FORMATS = [ "%d-%b-%Y ", "%d-%b-%Y", "%d-%b-%Y %H:%M:%S", "%d %b %Y %H:%M:%S", "%d %b %Y %H:%M:%S:%f", "%d %b %Y", "%d %B %Y", "%Y%m%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S", "%m/%d/%Y %I:%M:%S %p", ] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def clean_colname(name: str) -> str: """Odstraní BOM a okolní uvozovky/mezery z názvu sloupce.""" return name.lstrip("").strip().strip('"') def parse_date(value: str) -> str | None: v = value.strip() for fmt in DATE_FORMATS: try: dt = datetime.strptime(v, fmt.strip()) return dt.replace(tzinfo=timezone.utc).isoformat() except ValueError: continue return None def extract_snapshot_date(filename: str) -> str: match = re.match(r"(\d{4}-\d{2}-\d{2})", Path(filename).name) return match.group(1) if match else datetime.now().strftime("%Y-%m-%d") def detect_collection_type(filename: str) -> str | None: """Vrátí klíč do COLLECTION_CONFIG nebo None.""" stem = Path(filename).stem for key in COLLECTION_CONFIG: if key in stem: return key return None def data_snapshot(doc: dict, outcome_cols: tuple) -> dict: """Porovnatelný snapshot všech datových polí: fields{} + outcome cols.""" snap = {"fields": doc.get("fields", {})} for col in outcome_cols: snap[col] = doc.get(col) return snap # --------------------------------------------------------------------------- # CSV → dokument # --------------------------------------------------------------------------- def map_row(row: dict, col_type: str) -> dict: cfg = COLLECTION_CONFIG[col_type] doc: dict = {} fields: dict = {} cleaned = {clean_colname(k): v.strip() if v else "" for k, v in row.items()} subject_col = cfg["subject_col"] doc["subject"] = {"id": cleaned.get(subject_col, "")} # ECG DCRs používají "Site ID" místo "Site" site_name = cleaned.get("Site") or cleaned.get("Site ID", "") doc["site"] = {"name": site_name} doc["country"] = cleaned.get("Country", "") doc["study"] = cleaned.get("Protocol", "") key_parts = [cleaned.get(c, "") for c in cfg["key_cols"]] doc["recordKey"] = "_".join(key_parts) outcome_cols = set(cfg.get("outcome_cols", ())) for col in outcome_cols: value = cleaned.get(col, "") if value and value != "-": parsed = parse_date(value) doc[col] = parsed if parsed else value else: doc[col] = None skip_top = {"Protocol", "Country", "Site", subject_col} | outcome_cols for col, value in cleaned.items(): if col in skip_top: continue if not value or value == "-": continue parsed = parse_date(value) fields[col] = parsed if parsed else value doc["fields"] = fields return doc # --------------------------------------------------------------------------- # Import jednoho souboru # --------------------------------------------------------------------------- def import_file(csv_path: str, db) -> dict: filename = Path(csv_path).name col_type = detect_collection_type(filename) if col_type is None: print(f" Preskakuji (neznamy typ): {filename}") return {"skipped": True} cfg = COLLECTION_CONFIG[col_type] col_name = cfg["collection"] outcome_cols = tuple(cfg.get("outcome_cols", ())) snapshot_date = extract_snapshot_date(filename) collection = db[col_name] inserted = changed = unchanged = filtered_out = 0 with open(csv_path, encoding="utf-8-sig", newline="") as f: reader = csv.DictReader(f, delimiter=",", quotechar='"') for row in reader: cleaned_row = {clean_colname(k): v for k, v in row.items()} country = cleaned_row.get("Country", "").strip() if COUNTRY_FILTER not in country: filtered_out += 1 continue doc = map_row(row, col_type) record_key = doc.get("recordKey") if not record_key: continue doc["sourceFile"] = filename existing = collection.find_one({"recordKey": record_key}) if existing is None: doc["firstSeen"] = snapshot_date doc["lastSeen"] = snapshot_date doc["history"] = [] collection.insert_one(doc) inserted += 1 elif data_snapshot(existing, outcome_cols) != data_snapshot(doc, outcome_cols): # Uložíme kompletní snapshot starého stavu (fields + outcome cols) old_entry = {"date": existing.get("lastSeen", snapshot_date)} for col in outcome_cols: old_entry[col] = existing.get(col) old_entry["fields"] = existing.get("fields", {}) update_doc = {k: v for k, v in doc.items()} update_doc["lastSeen"] = snapshot_date collection.update_one( {"_id": existing["_id"]}, { "$push": {"history": old_entry}, "$set": update_doc, }, ) changed += 1 else: collection.update_one( {"_id": existing["_id"]}, {"$set": {"lastSeen": snapshot_date, "sourceFile": filename}}, ) unchanged += 1 collection.create_index([("recordKey", ASCENDING)], unique=True) collection.create_index([("subject.id", ASCENDING)]) collection.create_index([("site.name", ASCENDING)]) if col_type == "MayoScore": collection.create_index([("Site Action", ASCENDING)]) if col_type in ("eCOA DCRs", "ECG DCRs"): collection.create_index([("fields.Status", ASCENDING)]) collection.create_index([("fields.Type", ASCENDING)]) stats = { "collection": col_name, "snapshot": snapshot_date, "inserted": inserted, "changed": changed, "unchanged": unchanged, "filtered_out": filtered_out, } print(f" {col_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ") return stats # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): paths: list[Path] = [] if len(sys.argv) > 1: for arg in sys.argv[1:]: p = Path(arg) if p.is_file(): paths.append(p) else: print(f"Soubor nenalezen: {arg}") else: paths = sorted(DOWNLOADS_DIR.glob("*.csv")) if not paths: print("Zadne CSV soubory k importu.") return print(f"Nalezeno {len(paths)} souboru.\n") client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) client.admin.command("ping") db = client[DB_NAME] PROCESSED_DIR.mkdir(exist_ok=True) total = {"inserted": 0, "changed": 0, "unchanged": 0} for csv_path in paths: print(f"Import: {csv_path.name}") stats = import_file(str(csv_path), db) if not stats.get("skipped"): for k in total: total[k] += stats.get(k, 0) dest = PROCESSED_DIR / csv_path.name shutil.move(str(csv_path), str(dest)) print(f" -> presunut do Zpracovano/") client.close() print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same") if __name__ == "__main__": main()