""" Jednorázový backfill historických dat z MySQL do MongoDB. Pro každou snapshotovanou tabulku: - všechny řádky všech import_id → snapshot kolekce - řádky z MAX(import_id) per studie → hlavní kolekce (replace_one upsert) Pro idempotentní tabulky (notifications, destruction): - všechno → hlavní kolekce (replace_one upsert) Notifikace jsou už v Mongo z parse_notifications_to_mongo.py — přeskočí se. """ import os import sys import datetime import mysql.connector from pymongo import ReplaceOne sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from common.mongo_writer import get_db, ensure_indexes, MONGO_DB sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "Patients")) import db_config def conn(): return mysql.connector.connect( host=db_config.DB_HOST, port=db_config.DB_PORT, user=db_config.DB_USER, password=db_config.DB_PASSWORD, database=db_config.DB_NAME, ) def dict_rows(cursor): cols = [d[0] for d in cursor.description] for row in cursor: yield dict(zip(cols, row)) def to_mongo_date(v): if isinstance(v, datetime.datetime): return v if isinstance(v, datetime.date): return datetime.datetime(v.year, v.month, v.day) return v def normalize(doc): return {k: to_mongo_date(v) for k, v in doc.items() if v is not None} # ── 1. iwrs_imports → iwrs_imports ─────────────────────────────────────────── def backfill_imports(): print("[iwrs_imports]") c = conn(); cur = c.cursor() cur.execute("SELECT import_id, study, imported_at, source_file, report_type FROM iwrs_import") db = get_db() ops = [] for r in dict_rows(cur): d = normalize(r) d["_id"] = d["import_id"] ops.append(ReplaceOne({"_id": d["_id"]}, d, upsert=True)) if ops: db.iwrs_imports.bulk_write(ops, ordered=False) print(f" -> {len(ops)} import logu") cur.close(); c.close() # ── 2. 
# ── 2. subject_summary (UCO + MDD unified) ───────────────────────────────────

UCO_TABLE = "iwrs_uco3001_subject_summary"
MDD_TABLE = "iwrs_mdd3003_subject_summary"


def _fetch_dict_rows(sql):
    """Run *sql* on a fresh MySQL connection and return all rows as dicts.

    The cursor and the connection are closed even when the query raises.
    """
    c = conn()
    try:
        cur = c.cursor()
        try:
            cur.execute(sql)
            return list(dict_rows(cur))
        finally:
            cur.close()
    finally:
        c.close()


def backfill_subject_summary():
    """Backfill both per-study subject-summary tables into Mongo.

    For each (table, study) pair:
      - every row with a subject goes to ``iwrs_subject_summary_snapshots``
        (plain insert), tagged with ``natural_id`` and ``imported_at``;
      - rows whose ``import_id`` equals the table's maximum ``import_id`` are
        upserted into ``iwrs_subject_summary`` under ``_id = "<study>:<subject>"``.

    Rows without a ``subject`` are skipped. A table with no non-NULL
    ``import_id`` is skipped entirely.
    """
    print("[iwrs_subject_summary]")
    db = get_db()

    # Build the import_id → {study, imported_at} lookup.
    import_meta = {
        r["import_id"]: {"study": r["study"], "imported_at": r["imported_at"]}
        for r in _fetch_dict_rows("SELECT import_id, study, imported_at FROM iwrs_import")
    }

    total_snap = 0
    total_main = 0
    for table, study in [(UCO_TABLE, "77242113UCO3001"), (MDD_TABLE, "42847922MDD3003")]:
        all_rows = _fetch_dict_rows(f"SELECT * FROM {table}")

        # MAX import_id for this study (selects rows for the main collection).
        import_ids = [r["import_id"] for r in all_rows if r.get("import_id") is not None]
        if not import_ids:
            continue
        max_import = max(import_ids)

        # Snapshots: every row → iwrs_subject_summary_snapshots.
        snap_docs = []
        main_ops = []
        for r in all_rows:
            doc = normalize(r)
            doc.pop("id", None)  # the MySQL autoincrement id is not preserved
            doc["study"] = study
            subject = doc.get("subject")
            if not subject:
                continue
            natural = f"{study}:{subject}"

            # normalize() drops NULL columns, so import_id may be absent here.
            import_id = doc.get("import_id")
            imported_at = import_meta.get(import_id, {}).get("imported_at")

            snap_docs.append({**doc, "natural_id": natural, "imported_at": imported_at})

            if import_id == max_import:
                main = {
                    **doc,
                    "_id": natural,
                    "last_import_id": max_import,
                    "last_imported_at": imported_at,
                }
                main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))

        if snap_docs:
            db.iwrs_subject_summary_snapshots.insert_many(snap_docs, ordered=False)
            total_snap += len(snap_docs)
        if main_ops:
            db.iwrs_subject_summary.bulk_write(main_ops, ordered=False)
            total_main += len(main_ops)
        print(f" {study}: snap={len(snap_docs)} main={len(main_ops)}")
    print(f" TOTAL snap={total_snap} main={total_main}")
# ── 3. visits, shipments, items, inventory (per import_id) ───────────────────

def backfill_per_import(mysql_table, main_coll, snap_coll, id_fn, drop_cols=("id",)):
    """Backfill one import-versioned MySQL table into a main and a snapshot collection.

    Args:
        mysql_table: Source MySQL table; it is queried for ``study`` and
            ``import_id`` columns.
        main_coll: Name of the Mongo collection that receives the latest state.
        snap_coll: Name of the Mongo collection that receives every row.
        id_fn: Callable mapping a normalized document to its natural id, or a
            falsy value when the row cannot be identified (the row is skipped).
        drop_cols: Column names removed from every document before writing.

    Every identifiable row is inserted into ``snap_coll``. Rows whose
    ``import_id`` is the maximum for their study are upserted into
    ``main_coll``; when several such rows share a natural id, only the first
    one encountered is kept.
    """
    print(f"[{mysql_table} -> {main_coll}/{snap_coll}]")
    db = get_db()

    c = conn()
    try:
        cur = c.cursor()
        try:
            # import_id → imported_at
            cur.execute("SELECT import_id, imported_at FROM iwrs_import")
            import_meta = {r[0]: r[1] for r in cur.fetchall()}

            # MAX import_id per study
            cur.execute(f"SELECT study, MAX(import_id) FROM {mysql_table} GROUP BY study")
            max_per_study = {r[0]: r[1] for r in cur.fetchall()}

            cur.execute(f"SELECT * FROM {mysql_table}")
            all_rows = list(dict_rows(cur))
        finally:
            cur.close()
    finally:
        # Release the MySQL connection even when one of the queries fails.
        c.close()

    snap_docs = []
    main_ops = []
    seen_main = set()
    for r in all_rows:
        doc = normalize(r)
        for col in drop_cols:
            doc.pop(col, None)
        natural = id_fn(doc)
        if not natural:
            continue
        imp_at = import_meta.get(doc.get("import_id"))

        snap = dict(doc)
        snap["natural_id"] = natural
        snap["imported_at"] = imp_at
        snap_docs.append(snap)

        study = doc.get("study")
        if study and doc.get("import_id") == max_per_study.get(study):
            # Keep a single main document per natural id.
            if natural in seen_main:
                continue
            seen_main.add(natural)
            main = dict(doc)
            main["_id"] = natural
            main["last_import_id"] = doc["import_id"]
            main["last_imported_at"] = imp_at
            main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))

    if snap_docs:
        db[snap_coll].insert_many(snap_docs, ordered=False)
    if main_ops:
        db[main_coll].bulk_write(main_ops, ordered=False)
    print(f" snap={len(snap_docs)} main={len(main_ops)}")


def visit_id(doc):
    """Return the natural id of a visit row, or ``None`` without study/subject.

    Format: ``<study>:<subject>:<key>:<desc>``. ``key`` is
    ``irt_transaction_no`` when present, otherwise ``scheduled_date`` as
    ``YYYYMMDD``, otherwise ``"noidx"``. ``desc`` is
    ``irt_transaction_description`` with spaces replaced by underscores,
    cut to 30 characters.
    """
    s, sub = doc.get("study"), doc.get("subject")
    if not s or not sub:
        return None
    key = doc.get("irt_transaction_no")
    if key is None:
        sd = doc.get("scheduled_date")
        key = sd.strftime("%Y%m%d") if sd else "noidx"
    desc = (doc.get("irt_transaction_description") or "").replace(" ", "_")[:30]
    return f"{s}:{sub}:{key}:{desc}"


def shipment_id_(doc):
    """Return the ``shipment_id`` of a shipment row, or ``None`` when absent."""
    return doc.get("shipment_id")


def shipment_item_id(doc):
    """Return ``<shipment_id>:<medication_id>``, or ``None`` if either is missing/falsy."""
    s, m = doc.get("shipment_id"), doc.get("medication_id")
    return f"{s}:{m}" if s and m else None


def inventory_id(doc):
    """Return ``<site>:<medication_id>``, or ``None`` if either is missing/falsy."""
    s, m = doc.get("site"), doc.get("medication_id")
    return f"{s}:{m}" if s and m else None


# ── 4. destruction (idempotent, main collection only) ────────────────────────

def backfill_destruction():
    """Upsert all rows of MySQL ``iwrs_destruction`` into Mongo ``iwrs_destruction``.

    The document ``_id`` is ``<basket_id>:<medication_id>``. Rows missing
    either part are skipped. When the id repeats, only the first row is kept.
    """
    print("[iwrs_destruction]")
    db = get_db()

    c = conn()
    try:
        cur = c.cursor()
        try:
            cur.execute("SELECT * FROM iwrs_destruction")
            rows = list(dict_rows(cur))
        finally:
            cur.close()
    finally:
        # Release the MySQL connection even when the query fails.
        c.close()

    ops = []
    seen = set()
    for r in rows:
        doc = normalize(r)
        doc.pop("id", None)
        basket, med = doc.get("basket_id"), doc.get("medication_id")
        if not basket or not med:
            continue
        nid = f"{basket}:{med}"
        if nid in seen:
            continue
        seen.add(nid)
        doc["_id"] = nid
        ops.append(ReplaceOne({"_id": nid}, doc, upsert=True))
    if ops:
        db.iwrs_destruction.bulk_write(ops, ordered=False)
    print(f" -> {len(ops)} destrukci")


# ── main ─────────────────────────────────────────────────────────────────────

def main():
    """Run every backfill step in order, then print per-collection document counts."""
    print(f"Cilova DB: {MONGO_DB}")
    ensure_indexes()
    backfill_imports()
    backfill_subject_summary()
    backfill_per_import("iwrs_subject_visits", "iwrs_visits", "iwrs_visits_snapshots", visit_id)
    backfill_per_import("iwrs_shipments", "iwrs_shipments", "iwrs_shipments_snapshots", shipment_id_)
    backfill_per_import("iwrs_shipment_items", "iwrs_shipment_items", "iwrs_shipment_items_snapshots", shipment_item_id)
    backfill_per_import("iwrs_inventory", "iwrs_inventory", "iwrs_inventory_snapshots", inventory_id)
    backfill_destruction()

    # final statistics
    db = get_db()
    print("\nFINALNI STAV V MONGO:")
    for coll in ["iwrs_imports", "iwrs_subject_summary", "iwrs_visits", "iwrs_notifications",
                 "iwrs_shipments", "iwrs_shipment_items", "iwrs_inventory", "iwrs_destruction",
                 "iwrs_subject_summary_snapshots", "iwrs_visits_snapshots",
                 "iwrs_shipments_snapshots", "iwrs_shipment_items_snapshots", "iwrs_inventory_snapshots"]:
        n = db[coll].count_documents({})
        print(f" {coll:42s} {n}")


if __name__ == "__main__":
    main()