# =============================================================================
# Name:        import_to_mongo_v1.5.py
# Version:     1.5
# Date:        2026-06-16
# Description: Import of CSV reports into MongoDB (db: covance).
#   Pipeline 1 — allSamples: collection allsamples, key Container Barcode No.
#       Source: Source (study 36940 + 35472)
#       (study is NOT tagged — a single file contains both studies,
#        which are distinguished by the field fields."Protocol Code")
#   Pipeline 2 — kits: collection kits, key Accession
#       Source: Source (study 36940 + 35472)
#       v1.5: the study field is tagged from the file name (study-XXXXX),
#       so kits can be filtered per study without a regex on sourceFile.
#   Pipeline 3 — results: collection results, laboratory results per site.
#       Source: Source, files test-results-{SITE}-{type}.csv
#       (1st row = disclaimer, the header is the 2nd row!)
#       Two types (standard / microbiology) in one collection,
#       distinguished by the resultType field. record_id:
#           standard:     STD|{Accession}|{Test Group}|{Test}|{occ}
#           microbiology: MIC|{Accession}|{Test Group}|{Specimen}|
#                         {Test Description}|{Drug Name/Agent}|{occ}
#   Pipeline 4 — equeries: collection equeries, eQuery report
#       (study 36940 + 35472).
#       Source: Source, files ...-equery.csv (FULL).
#       Key eQueryId (stable system ID, unique per row);
#       footer rows with filter parameters (non-numeric eQueryId) are
#       skipped. History tracks the life cycle of a query
#       (Open -> Response Received -> Closed).
#       The variant ...-equery_unresponded_only.csv is only a subset
#       (Status=Open) of the same report + footer => it is NOT imported,
#       it is only moved to Zpracovano/ (move-only pipeline).
# Upsert with change history; the processed file is moved to Zpracovano/.
# Switch --dry-run: writes nothing to the DB and does not move any files.
# =============================================================================
import csv
import re
import shutil
import sys
from datetime import datetime
from pathlib import Path

from pymongo import MongoClient, ASCENDING

MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "covance"
SOURCE = Path(__file__).parent / "Source"


# ---------------------------------------------------------------------------
# record_id + metadata builders for the individual pipelines
# ---------------------------------------------------------------------------
def make_keyed_record(upsert_key: str):
    """Return a builder whose record key is the value of one CSV column.

    Used by the allsamples and kits pipelines. If the pipeline has a
    file_meta function, its values (e.g. study) are copied into ``extra``.
    The returned builder yields ``(None, {})`` when the key column is empty.
    """

    def builder(fields: dict, fmeta: dict | None, occ: dict):
        key_val = fields.get(upsert_key)
        if not key_val:
            return None, {}
        extra = dict(fmeta) if fmeta else {}
        return key_val, extra

    return builder


def _norm_subject(raw: str | None) -> str:
    """'CZ100062001 - null' -> 'CZ100062001'."""
    s = (raw or "").strip()
    return s.split(" - null")[0].strip()


def make_results_record(fields: dict, fmeta: dict, occ: dict):
    """Build the record_id and extra metadata for one test-results row.

    The id is ``STD|...`` or ``MIC|...`` built from the identifying columns
    plus an occurrence counter. ``occ`` is mutated: it counts how many times
    the same combination of identifying columns has been seen so far, so
    repeated rows get distinct ids. Returns ``(None, {})`` when the row has
    no Accession.
    """
    rtype = fmeta["resultType"]
    accession = fields.get("Accession")
    if not accession:
        return None, {}
    if rtype == "standard":
        parts = (accession, fields.get("Test Group", ""), fields.get("Test", ""))
        prefix = "STD"
    else:  # microbiology
        parts = (
            accession,
            fields.get("Test Group", ""),
            fields.get("Specimen", ""),
            fields.get("Test Description", ""),
            fields.get("Drug Name/Agent", ""),
        )
        prefix = "MIC"
    occ[parts] = occ.get(parts, 0) + 1
    # Cleaned cells may be None, so each part is coerced to "" before joining.
    record_id = f"{prefix}|" + "|".join(str(p or "") for p in parts) + f"|{occ[parts]}"
    extra = {
        "study": fmeta["study"],
        "site": fmeta["site"],
        "subject": _norm_subject(fields.get("Subject")),
        "resultType": rtype,
    }
    return record_id, extra


def make_equery_record(fields: dict, fmeta: dict | None, occ: dict):
    """Key = eQueryId.

    Footer rows with filter parameters (non-numeric eQueryId) are skipped.
    """
    key_val = (fields.get("eQueryId") or "").strip()
    if not key_val.isdigit():
        return None, {}
    extra = {"study": fmeta["study"]} if fmeta else {}
    return key_val, extra


def results_file_meta(filename: str) -> dict | None:
    """Parse study, site and result type from a test-results file name.

    Returns None when the name does not contain the expected
    ``study-<n>-test-results-<site>-<type>`` fragment.
    """
    m = re.search(
        r"study-(\d+)-test-results-(\d+)-(standard|microbiology)",
        filename,
        re.IGNORECASE,
    )
    if not m:
        return None
    return {"study": m.group(1), "site": m.group(2), "resultType": m.group(3).lower()}


def equery_file_meta(filename: str) -> dict | None:
    """Parse the study number from an eQuery file name (study is None if absent)."""
    m = re.search(r"study-(\d+)-activity-reports", filename, re.IGNORECASE)
    return {"study": m.group(1)} if m else {"study": None}


def kit_file_meta(filename: str) -> dict | None:
    """Parse the study number from a kit-inventory file name (study is None if absent)."""
    m = re.search(r"study-(\d+)-kit-inventory", filename, re.IGNORECASE)
    return {"study": m.group(1)} if m else {"study": None}


PIPELINES = [
    {
        "name": "allsamples",
        "collection": "allsamples",
        "pattern": re.compile(r".*-allSamples\.csv$", re.IGNORECASE),
        "sources": [SOURCE],
        "header_skip": 0,
        "make_record": make_keyed_record("Container Barcode No."),
        "file_meta": None,
        "indexes": [
            [("fields.Sample Status", ASCENDING)],
            [("fields.Specimen Type", ASCENDING)],
        ],
    },
    {
        "name": "kits",
        "collection": "kits",
        "pattern": re.compile(r".*-kit-inventory-on-hand-expiration\.csv$", re.IGNORECASE),
        "sources": [SOURCE],
        "header_skip": 0,
        "make_record": make_keyed_record("Accession"),
        "file_meta": kit_file_meta,
        "indexes": [
            [("study", ASCENDING)],
            [("fields.Kit Type", ASCENDING)],
            [("fields.Site", ASCENDING)],
            [("fields.Expiration Date", ASCENDING)],
        ],
    },
    {
        "name": "results",
        "collection": "results",
        "pattern": re.compile(r".*test-results-\d+-(standard|microbiology)\.csv$", re.IGNORECASE),
        "sources": [SOURCE],
        "header_skip": 1,  # 1st row is a disclaimer, the header is the 2nd row
        "make_record": make_results_record,
        "file_meta": results_file_meta,
        "indexes": [
            [("subject", ASCENDING)],
            [("study", ASCENDING)],
            [("site", ASCENDING)],
            [("resultType", ASCENDING)],
            [("fields.Accession", ASCENDING)],
            [("fields.Test Group", ASCENDING)],
        ],
    },
    {
        "name": "equeries",
        "collection": "equeries",
        # FULL report; the _unresponded_only variant is INTENTIONALLY not
        # matched here (it has its own pattern below)
        "pattern": re.compile(r".*activity-reports-documents-equery\.csv$", re.IGNORECASE),
        "sources": [SOURCE],
        "header_skip": 0,
        "make_record": make_equery_record,
        "file_meta": equery_file_meta,
        "indexes": [
            [("study", ASCENDING)],
            [("fields.Status", ASCENDING)],
            [("fields.Site", ASCENDING)],
            [("fields.Subject", ASCENDING)],
            [("fields.Issue Type", ASCENDING)],
        ],
    },
    {
        "name": "equeries_unresponded",
        "move_only": True,  # subset of the FULL report -> only moved, not imported
        "pattern": re.compile(
            r".*activity-reports-documents-equery_unresponded_only\.csv$", re.IGNORECASE
        ),
        "sources": [SOURCE],
    },
]


def extract_snapshot_date(filename: str) -> str:
    """Return the leading YYYY-MM-DD of the file name, or today's date if absent."""
    match = re.match(r"(\d{4}-\d{2}-\d{2})", Path(filename).name)
    return match.group(1) if match else datetime.now().strftime("%Y-%m-%d")


def clean_value(val: str | None) -> str | None:
    """Strip a CSV cell; return None for empty or missing cells.

    csv.DictReader fills the missing cells of a short row with None, so None
    has to be accepted here instead of failing on ``.strip()``.
    """
    if val is None:
        return None
    val = val.strip()
    return val if val else None


def import_file(csv_path: Path, collection, pipeline: dict, dry_run: bool) -> dict:
    """Upsert every row of one CSV file into ``collection``.

    Rows are keyed by the record_id produced by ``pipeline["make_record"]``;
    rows without a key are counted as skipped. A new record_id is inserted,
    a record whose fields differ has its previous fields pushed to
    ``history`` before being overwritten, and an identical record only has
    ``lastSeen`` / ``sourceFile`` refreshed. With ``dry_run`` nothing touches
    the database and every keyed row is counted as an insert candidate.

    Returns a dict with the ``inserted``, ``changed`` and ``unchanged`` counts.
    """
    snapshot_date = extract_snapshot_date(csv_path.name)
    inserted = changed = unchanged = skipped = 0
    fmeta = pipeline["file_meta"](csv_path.name) if pipeline["file_meta"] else None

    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        lines = f.readlines()
    # header_skip drops leading non-CSV lines (e.g. a disclaimer) before the header.
    reader = csv.DictReader(lines[pipeline["header_skip"]:])
    rows = list(reader)

    occ: dict = {}  # occurrence-order state (per file)
    for row in rows:
        # `if k` drops the None key DictReader uses for surplus cells, and
        # columns with an empty header.
        fields = {k: clean_value(v) for k, v in row.items() if k}
        record_id, extra = pipeline["make_record"](fields, fmeta, occ)
        if not record_id:
            skipped += 1
            continue

        existing = None if dry_run else collection.find_one({"record_id": record_id})
        if existing is None and dry_run:
            # in dry-run we cannot know for sure; count as insert candidates
            inserted += 1
            continue

        if existing is None:
            collection.insert_one({
                "record_id": record_id,
                "fields": fields,
                **extra,
                "sourceFile": csv_path.name,
                "firstSeen": snapshot_date,
                "lastSeen": snapshot_date,
                "history": [],
            })
            inserted += 1
        elif existing["fields"] != fields:
            collection.update_one(
                {"_id": existing["_id"]},
                {
                    "$push": {
                        "history": {"date": existing["lastSeen"], "fields": existing["fields"]}
                    },
                    "$set": {
                        "fields": fields,
                        **extra,
                        "sourceFile": csv_path.name,
                        "lastSeen": snapshot_date,
                    },
                },
            )
            changed += 1
        else:
            collection.update_one(
                {"_id": existing["_id"]},
                {"$set": {"lastSeen": snapshot_date, "sourceFile": csv_path.name}},
            )
            unchanged += 1

    total_rows = len(rows)
    db_count = "-" if dry_run else collection.count_documents({})
    tag = "[DRY] " if dry_run else ""
    print(f" {tag}[{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{skipped} bez klice")
    print(f" Radku v CSV: {total_rows}, dokumentu v DB: {db_count}")
    if inserted + changed + unchanged + skipped != total_rows:
        print(f" !!! VAROVANI: soucet ({inserted+changed+unchanged+skipped}) != radku v CSV ({total_rows})")
    return {"inserted": inserted, "changed": changed, "unchanged": unchanged}


def collect_files(pipeline: dict, cli_args: list[str]) -> list[Path]:
    """Return the files this pipeline should process.

    When paths are given on the command line, only those that exist and
    match the pipeline pattern are used; otherwise the pipeline's source
    directories are scanned for matching ``*.csv`` files.
    """
    if cli_args:
        paths = []
        for arg in cli_args:
            p = Path(arg)
            if p.is_file() and pipeline["pattern"].match(p.name):
                paths.append(p)
        return paths

    paths = []
    for src_dir in pipeline["sources"]:
        if src_dir.exists():
            paths.extend(
                sorted(p for p in src_dir.glob("*.csv") if pipeline["pattern"].match(p.name))
            )
    return paths


def move_to_processed(csv_path: Path, dry_run: bool):
    """Move a processed file into the ``Zpracovano`` folder next to it.

    With ``dry_run`` the move is only announced.
    """
    if dry_run:
        print(f" [DRY] -> presunul by do Zpracovano/\n")
        return
    # The folder is created here because a file passed on the command line
    # may live outside the configured source directories.
    dest_dir = csv_path.parent / "Zpracovano"
    dest_dir.mkdir(exist_ok=True)
    dest = dest_dir / csv_path.name
    shutil.move(str(csv_path), str(dest))
    print(f" -> presunut do Zpracovano/\n")


def run_pipeline(pipeline: dict, client, cli_args: list[str], dry_run: bool):
    """Run one pipeline: collect its files, import each one and move it away.

    Move-only pipelines skip the import and just move the files. Outside of
    dry-run, the unique ``record_id`` index and the pipeline's secondary
    indexes are created before importing.
    """
    paths = collect_files(pipeline, cli_args)
    if not paths:
        print(f"[{pipeline['name']}] Zadne soubory k importu.")
        return

    print(f"\n=== Pipeline: {pipeline['name']} ({len(paths)} souboru){' [DRY-RUN]' if dry_run else ''} ===")

    # Move-only pipeline (e.g. the unresponded subset) — move only, no import
    if pipeline.get("move_only"):
        if not dry_run:
            for src_dir in pipeline["sources"]:
                (src_dir / "Zpracovano").mkdir(exist_ok=True)
        for csv_path in paths:
            print(f"Move-only: {csv_path.name} [{csv_path.parent.parent.name}]")
            move_to_processed(csv_path, dry_run)
        print(f"[{pipeline['name']}] Presunuto {len(paths)} souboru (neimportuje se).")
        return

    col = None
    if not dry_run:
        col = client[DB_NAME][pipeline["collection"]]
        col.create_index([("record_id", ASCENDING)], unique=True)
        for idx in pipeline["indexes"]:
            col.create_index(idx)
        for src_dir in pipeline["sources"]:
            (src_dir / "Zpracovano").mkdir(exist_ok=True)

    total = {"inserted": 0, "changed": 0, "unchanged": 0}
    for csv_path in paths:
        print(f"Import: {csv_path.name} [{csv_path.parent.parent.name}]")
        stats = import_file(csv_path, col, pipeline, dry_run)
        for k in total:
            total[k] += stats[k]
        move_to_processed(csv_path, dry_run)

    print(f"[{pipeline['name']}] Celkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")


def main():
    """Parse the command line and run every pipeline.

    ``--dry-run`` skips the MongoDB connection entirely; every other
    argument is treated as a file path to process.
    """
    args = sys.argv[1:]
    dry_run = "--dry-run" in args
    cli_args = [a for a in args if a != "--dry-run"]

    client = None
    try:
        if not dry_run:
            client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
            client.admin.command("ping")  # fail fast if the server is unreachable
        for pipeline in PIPELINES:
            run_pipeline(pipeline, client, cli_args, dry_run)
    finally:
        # Close the connection even when the ping or a pipeline raises.
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()