Files
janssen/IWRS/Trash/backfill_mysql_to_mongo.py
T
2026-06-10 11:59:19 +02:00

273 lines
9.1 KiB
Python

"""
Jednorázový backfill historických dat z MySQL do MongoDB.
Pro každou snapshotovanou tabulku:
- všechny řádky všech import_id → snapshot kolekce
- řádky z MAX(import_id) per studie → hlavní kolekce (replace_one upsert)
Pro idempotentní tabulky (notifications, destruction):
- všechno → hlavní kolekce (replace_one upsert)
Notifikace jsou už v Mongo z parse_notifications_to_mongo.py — přeskočí se.
"""
import os
import sys
import datetime
import mysql.connector
from pymongo import ReplaceOne
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common.mongo_writer import get_db, ensure_indexes, MONGO_DB
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "Patients"))
import db_config
def conn():
    """Open and return a fresh MySQL connection configured from ``db_config``.

    Each call creates a new ``mysql.connector`` connection; closing it is
    the caller's responsibility.
    """
    settings = dict(
        host=db_config.DB_HOST,
        port=db_config.DB_PORT,
        user=db_config.DB_USER,
        password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
    return mysql.connector.connect(**settings)
def dict_rows(cursor):
    """Lazily yield each remaining row of an executed *cursor* as ``{column: value}``.

    Column names are taken from ``cursor.description`` (first element of each entry).
    """
    names = [column[0] for column in cursor.description]
    yield from (dict(zip(names, values)) for values in cursor)
def to_mongo_date(v):
    """Widen a plain ``datetime.date`` to a naive midnight ``datetime``; pass anything else through.

    pymongo cannot encode bare ``date`` objects, so DATE values read from
    MySQL are turned into ``datetime``. ``datetime.datetime`` is a subclass
    of ``date`` and is therefore returned unchanged.
    """
    is_plain_date = isinstance(v, datetime.date) and not isinstance(v, datetime.datetime)
    if not is_plain_date:
        return v
    return datetime.datetime.combine(v, datetime.time())
def normalize(doc):
    """Return a new dict from *doc* with ``None`` values removed and dates widened by ``to_mongo_date``."""
    cleaned = {}
    for key, value in doc.items():
        if value is None:
            continue
        cleaned[key] = to_mongo_date(value)
    return cleaned
# ── 1. iwrs_imports → iwrs_imports ───────────────────────────────────────────
def backfill_imports():
    """Copy every ``iwrs_import`` row into the ``iwrs_imports`` Mongo collection.

    Rows are normalized and upserted with ``_id = import_id`` via ``ReplaceOne``,
    so re-running replaces existing documents instead of duplicating them.
    The MySQL cursor and connection are released before the Mongo write and
    also when the query fails.
    """
    print("[iwrs_imports]")
    c = conn()
    try:
        cur = c.cursor()
        try:
            cur.execute("SELECT import_id, study, imported_at, source_file, report_type FROM iwrs_import")
            rows = list(dict_rows(cur))
        finally:
            cur.close()
    finally:
        c.close()

    db = get_db()
    ops = []
    for r in rows:
        d = normalize(r)
        d["_id"] = d["import_id"]
        ops.append(ReplaceOne({"_id": d["_id"]}, d, upsert=True))
    if ops:
        db.iwrs_imports.bulk_write(ops, ordered=False)
    print(f" -> {len(ops)} import logu")
# ── 2. subject_summary (UCO + MDD merged into one collection) ────────────────
# Source MySQL tables with per-subject summaries, one table per study.
UCO_TABLE, MDD_TABLE = (
    "iwrs_uco3001_subject_summary",
    "iwrs_mdd3003_subject_summary",
)
def backfill_subject_summary():
    """Backfill the per-subject summary tables of both studies into MongoDB.

    For each (MySQL table, study) pair:

    * every row that has a ``subject`` is appended to
      ``iwrs_subject_summary_snapshots`` via ``insert_many`` (a second run
      appends another copy);
    * rows from the table's highest ``import_id`` are upserted into
      ``iwrs_subject_summary`` with ``_id = "<study>:<subject>"``. If a subject
      repeats within that import, the first row wins, which is the same rule
      as ``backfill_per_import``. This keeps one op per ``_id`` in the
      unordered bulk write.

    The study is fixed per table (``doc["study"]`` is overwritten), so the
    latest import is determined per table. Rows whose ``import_id`` is NULL
    still go to the snapshots but never to the main collection.
    """
    print("[iwrs_subject_summary]")
    db = get_db()

    def fetch_rows(sql):
        # Run one query on a fresh MySQL connection and always release it,
        # even when the query fails.
        c = conn()
        try:
            cur = c.cursor()
            try:
                cur.execute(sql)
                return list(dict_rows(cur))
            finally:
                cur.close()
        finally:
            c.close()

    # import_id -> imported_at, widened like every other date we write.
    imported_at_by_id = {
        r["import_id"]: to_mongo_date(r["imported_at"])
        for r in fetch_rows("SELECT import_id, imported_at FROM iwrs_import")
    }

    total_snap = 0
    total_main = 0
    for table, study in [(UCO_TABLE, "77242113UCO3001"), (MDD_TABLE, "42847922MDD3003")]:
        all_rows = fetch_rows(f"SELECT * FROM {table}")

        # Latest import of this table (feeds the main collection).
        import_ids = [r["import_id"] for r in all_rows if r.get("import_id") is not None]
        if not import_ids:
            continue
        max_import = max(import_ids)

        snap_docs = []
        main_ops = []
        seen_main = set()
        for r in all_rows:
            doc = normalize(r)
            doc.pop("id", None)  # the MySQL autoincrement id is not carried over
            doc["study"] = study
            subject = doc.get("subject")
            if not subject:
                continue
            natural = f"{study}:{subject}"
            # normalize() drops NULL columns, so import_id may be absent.
            import_id = doc.get("import_id")
            imp_at = imported_at_by_id.get(import_id)

            snap = dict(doc)
            snap["natural_id"] = natural
            snap["imported_at"] = imp_at
            snap_docs.append(snap)

            if import_id != max_import or natural in seen_main:
                continue
            seen_main.add(natural)
            main = dict(doc)
            main["_id"] = natural
            main["last_import_id"] = max_import
            main["last_imported_at"] = imp_at
            main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))

        if snap_docs:
            db.iwrs_subject_summary_snapshots.insert_many(snap_docs, ordered=False)
            total_snap += len(snap_docs)
        if main_ops:
            db.iwrs_subject_summary.bulk_write(main_ops, ordered=False)
            total_main += len(main_ops)
        print(f" {study}: snap={len(snap_docs)} main={len(main_ops)}")
    print(f" TOTAL snap={total_snap} main={total_main}")
# ── 3. visits, shipments, items, inventory (per import_id) ───────────────────
def backfill_per_import(mysql_table, main_coll, snap_coll, id_fn,
                        drop_cols=("id",)):
    """Backfill one import-versioned MySQL table into a main + snapshot collection pair.

    Args:
        mysql_table: source MySQL table. It is queried with ``GROUP BY study``
            and ``MAX(import_id)``, so it must have those columns. The name is
            interpolated into SQL verbatim and must be a trusted identifier.
        main_coll: Mongo collection that receives the latest row per natural id.
        snap_coll: Mongo collection that receives every row of every import.
        id_fn: maps a normalized row to its natural id; a falsy result skips the row.
        drop_cols: columns removed from each row before writing (by default the
            MySQL ``id`` column).

    Every kept row is written to *snap_coll* with ``insert_many``; re-running
    appends again unless an index rejects it. Rows from the highest
    ``import_id`` of their study are upserted into *main_coll* with
    ``_id = natural id``, and the first row wins when a natural id repeats.
    """
    print(f"[{mysql_table} -> {main_coll}/{snap_coll}]")
    db = get_db()
    c = conn()
    try:
        cur = c.cursor()
        try:
            # import_id -> imported_at
            cur.execute("SELECT import_id, imported_at FROM iwrs_import")
            import_meta = {r[0]: to_mongo_date(r[1]) for r in cur.fetchall()}
            # latest import_id per study
            cur.execute(f"SELECT study, MAX(import_id) FROM {mysql_table} GROUP BY study")
            max_per_study = {r[0]: r[1] for r in cur.fetchall()}
            cur.execute(f"SELECT * FROM {mysql_table}")
            all_rows = list(dict_rows(cur))
        finally:
            cur.close()
    finally:
        # Release MySQL before the (potentially long) Mongo writes, and on errors.
        c.close()

    snap_docs = []
    main_ops = []
    seen_main = set()
    for r in all_rows:
        doc = normalize(r)
        for col in drop_cols:
            doc.pop(col, None)
        natural = id_fn(doc)
        if not natural:
            continue
        import_id = doc.get("import_id")
        imp_at = import_meta.get(import_id)

        snap = dict(doc)
        snap["natural_id"] = natural
        snap["imported_at"] = imp_at
        snap_docs.append(snap)

        study = doc.get("study")
        # MAX(import_id) is NULL for a study without import ids; a row lacking
        # import_id must not count as "latest" just because None == None.
        is_latest = (
            bool(study)
            and import_id is not None
            and import_id == max_per_study.get(study)
        )
        if not is_latest or natural in seen_main:
            continue
        seen_main.add(natural)
        main = dict(doc)
        main["_id"] = natural
        main["last_import_id"] = import_id
        main["last_imported_at"] = imp_at
        main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))

    if snap_docs:
        db[snap_coll].insert_many(snap_docs, ordered=False)
    if main_ops:
        db[main_coll].bulk_write(main_ops, ordered=False)
    print(f" snap={len(snap_docs)} main={len(main_ops)}")
def visit_id(doc):
    """Build the natural id ``<study>:<subject>:<key>:<description>`` for a visit row.

    The key is chosen in this order:

    * ``irt_transaction_no`` when present;
    * otherwise ``scheduled_date`` formatted as ``YYYYMMDD``;
    * otherwise the literal ``noidx``.

    The description is ``irt_transaction_description`` with spaces replaced
    by underscores and cut to 30 characters. Returns ``None`` when study or
    subject is missing or empty.
    """
    study = doc.get("study")
    subject = doc.get("subject")
    if not (study and subject):
        return None
    key = doc.get("irt_transaction_no")
    if key is None:
        scheduled = doc.get("scheduled_date")
        key = "noidx" if not scheduled else scheduled.strftime("%Y%m%d")
    description = doc.get("irt_transaction_description") or ""
    description = description.replace(" ", "_")[:30]
    return f"{study}:{subject}:{key}:{description}"
def shipment_id_(doc):
    """Natural id of a shipment row: its ``shipment_id`` value, or ``None`` when absent."""
    try:
        return doc["shipment_id"]
    except KeyError:
        return None
def shipment_item_id(doc):
    """Natural id ``<shipment_id>:<medication_id>``, or ``None`` unless both values are truthy."""
    shipment = doc.get("shipment_id")
    medication = doc.get("medication_id")
    if not shipment or not medication:
        return None
    return f"{shipment}:{medication}"
def inventory_id(doc):
    """Natural id ``<site>:<medication_id>``, or ``None`` unless both values are truthy."""
    site, medication = doc.get("site"), doc.get("medication_id")
    if site and medication:
        return f"{site}:{medication}"
    return None
# ── 4. destruction (idempotent, main collection only) ────────────────────────
def backfill_destruction():
    """Upsert every ``iwrs_destruction`` row into the ``iwrs_destruction`` collection.

    The natural id ``<basket_id>:<medication_id>`` becomes ``_id``. Rows
    missing either part are skipped, and only the first row per id is
    written. No snapshot collection is involved, so re-running only replaces
    documents. The MySQL cursor and connection are closed even if the query
    fails.
    """
    print("[iwrs_destruction]")
    db = get_db()
    c = conn()
    try:
        cur = c.cursor()
        try:
            cur.execute("SELECT * FROM iwrs_destruction")
            rows = list(dict_rows(cur))
        finally:
            cur.close()
    finally:
        c.close()

    ops = []
    seen = set()
    for r in rows:
        doc = normalize(r)
        doc.pop("id", None)  # the MySQL autoincrement id is not carried over
        basket, med = doc.get("basket_id"), doc.get("medication_id")
        if not basket or not med:
            continue
        nid = f"{basket}:{med}"
        if nid in seen:
            continue
        seen.add(nid)
        doc["_id"] = nid
        ops.append(ReplaceOne({"_id": nid}, doc, upsert=True))
    if ops:
        db.iwrs_destruction.bulk_write(ops, ordered=False)
    print(f" -> {len(ops)} destrukci")
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Run every backfill step in order, then print the document count of each collection."""
    print(f"Cilova DB: {MONGO_DB}")
    ensure_indexes()
    backfill_imports()
    backfill_subject_summary()

    # (mysql_table, main_coll, snap_coll, id_fn) for each import-versioned table
    per_import_jobs = [
        ("iwrs_subject_visits", "iwrs_visits", "iwrs_visits_snapshots", visit_id),
        ("iwrs_shipments", "iwrs_shipments", "iwrs_shipments_snapshots", shipment_id_),
        ("iwrs_shipment_items", "iwrs_shipment_items", "iwrs_shipment_items_snapshots", shipment_item_id),
        ("iwrs_inventory", "iwrs_inventory", "iwrs_inventory_snapshots", inventory_id),
    ]
    for mysql_table, main_coll, snap_coll, id_fn in per_import_jobs:
        backfill_per_import(mysql_table, main_coll, snap_coll, id_fn)

    backfill_destruction()

    # final statistics
    db = get_db()
    print("\nFINALNI STAV V MONGO:")
    collections = [
        "iwrs_imports",
        "iwrs_subject_summary",
        "iwrs_visits",
        "iwrs_notifications",
        "iwrs_shipments",
        "iwrs_shipment_items",
        "iwrs_inventory",
        "iwrs_destruction",
        "iwrs_subject_summary_snapshots",
        "iwrs_visits_snapshots",
        "iwrs_shipments_snapshots",
        "iwrs_shipment_items_snapshots",
        "iwrs_inventory_snapshots",
    ]
    for name in collections:
        count = db[name].count_documents({})
        print(f" {name:42s} {count}")
if __name__ == "__main__":
    # Run the backfill only when executed as a script, never on import.
    main()