Migrate IWRS from MySQL to MongoDB
- Add IWRS/common/mongo_writer.py with shared connection, indexes, upsert+snapshot helpers - Add IWRS/Patients/import_to_mongo.py (subject_summary + visits) - Add IWRS/Patients/import_notifications_to_mongo.py: parse PDF/JSON directly to Mongo (incl. PDF as BinData), replaces 2-step MySQL flow - Add IWRS/Drugs/import_to_mongo.py (shipments, items, inventory, destruction) - Add IWRS/backfill_mysql_to_mongo.py: one-shot history backfill - Switch IWRS/Patients/run_all.py and IWRS/Drugs/run_all.py to Mongo - Rewrite IWRS/Drugs/create_report.py data loaders to read from Mongo - 8 main collections (upsert = latest state) + 5 snapshot collections (append-only with import_id) under studie database; notifications and destruction are immutable and need no snapshots Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,272 @@
|
||||
"""
|
||||
Jednorázový backfill historických dat z MySQL do MongoDB.
|
||||
|
||||
Pro každou snapshotovanou tabulku:
|
||||
- všechny řádky všech import_id → snapshot kolekce
|
||||
- řádky z MAX(import_id) per studie → hlavní kolekce (replace_one upsert)
|
||||
|
||||
Pro idempotentní tabulky (notifications, destruction):
|
||||
- všechno → hlavní kolekce (replace_one upsert)
|
||||
|
||||
Notifikace jsou už v Mongo z parse_notifications_to_mongo.py — přeskočí se.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import datetime
|
||||
|
||||
import mysql.connector
|
||||
from pymongo import ReplaceOne
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from common.mongo_writer import get_db, ensure_indexes, MONGO_DB
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "Patients"))
|
||||
import db_config
|
||||
|
||||
|
||||
def conn():
|
||||
return mysql.connector.connect(
|
||||
host=db_config.DB_HOST, port=db_config.DB_PORT,
|
||||
user=db_config.DB_USER, password=db_config.DB_PASSWORD,
|
||||
database=db_config.DB_NAME,
|
||||
)
|
||||
|
||||
|
||||
def dict_rows(cursor):
|
||||
cols = [d[0] for d in cursor.description]
|
||||
for row in cursor:
|
||||
yield dict(zip(cols, row))
|
||||
|
||||
|
||||
def to_mongo_date(v):
|
||||
if isinstance(v, datetime.datetime):
|
||||
return v
|
||||
if isinstance(v, datetime.date):
|
||||
return datetime.datetime(v.year, v.month, v.day)
|
||||
return v
|
||||
|
||||
|
||||
def normalize(doc):
|
||||
return {k: to_mongo_date(v) for k, v in doc.items() if v is not None}
|
||||
|
||||
|
||||
# ── 1. iwrs_imports → iwrs_imports ───────────────────────────────────────────
|
||||
|
||||
def backfill_imports():
|
||||
print("[iwrs_imports]")
|
||||
c = conn(); cur = c.cursor()
|
||||
cur.execute("SELECT import_id, study, imported_at, source_file, report_type FROM iwrs_import")
|
||||
db = get_db()
|
||||
ops = []
|
||||
for r in dict_rows(cur):
|
||||
d = normalize(r)
|
||||
d["_id"] = d["import_id"]
|
||||
ops.append(ReplaceOne({"_id": d["_id"]}, d, upsert=True))
|
||||
if ops:
|
||||
db.iwrs_imports.bulk_write(ops, ordered=False)
|
||||
print(f" -> {len(ops)} import logu")
|
||||
cur.close(); c.close()
|
||||
|
||||
|
||||
# ── 2. subject_summary (UCO + MDD sjednoceno) ────────────────────────────────
|
||||
|
||||
UCO_TABLE = "iwrs_uco3001_subject_summary"
|
||||
MDD_TABLE = "iwrs_mdd3003_subject_summary"
|
||||
|
||||
|
||||
def backfill_subject_summary():
|
||||
print("[iwrs_subject_summary]")
|
||||
db = get_db()
|
||||
# zjisti import_id → study mapování
|
||||
c = conn(); cur = c.cursor()
|
||||
cur.execute("SELECT import_id, study, imported_at FROM iwrs_import")
|
||||
import_meta = {r[0]: {"study": r[1], "imported_at": r[2]} for r in cur.fetchall()}
|
||||
cur.close(); c.close()
|
||||
|
||||
total_snap = 0
|
||||
total_main = 0
|
||||
|
||||
for table, study in [(UCO_TABLE, "77242113UCO3001"), (MDD_TABLE, "42847922MDD3003")]:
|
||||
c = conn(); cur = c.cursor()
|
||||
cur.execute(f"SELECT * FROM {table}")
|
||||
all_rows = list(dict_rows(cur))
|
||||
cur.close(); c.close()
|
||||
|
||||
# MAX import_id per studie (pro hlavní kolekci)
|
||||
import_ids = [r["import_id"] for r in all_rows if r.get("import_id") is not None]
|
||||
if not import_ids:
|
||||
continue
|
||||
max_import = max(import_ids)
|
||||
|
||||
# snapshoty: každý řádek → iwrs_subject_summary_snapshots
|
||||
snap_docs = []
|
||||
main_ops = []
|
||||
for r in all_rows:
|
||||
doc = normalize(r)
|
||||
doc.pop("id", None) # MySQL autoincrement nezachováváme
|
||||
doc["study"] = study
|
||||
subject = doc.get("subject")
|
||||
if not subject:
|
||||
continue
|
||||
natural = f"{study}:{subject}"
|
||||
|
||||
snap = dict(doc)
|
||||
snap["natural_id"] = natural
|
||||
meta = import_meta.get(doc.get("import_id"), {})
|
||||
snap["imported_at"] = meta.get("imported_at")
|
||||
snap_docs.append(snap)
|
||||
|
||||
if doc["import_id"] == max_import:
|
||||
main = dict(doc)
|
||||
main["_id"] = natural
|
||||
main["last_import_id"] = max_import
|
||||
main["last_imported_at"] = meta.get("imported_at")
|
||||
main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))
|
||||
|
||||
if snap_docs:
|
||||
db.iwrs_subject_summary_snapshots.insert_many(snap_docs, ordered=False)
|
||||
total_snap += len(snap_docs)
|
||||
if main_ops:
|
||||
db.iwrs_subject_summary.bulk_write(main_ops, ordered=False)
|
||||
total_main += len(main_ops)
|
||||
print(f" {study}: snap={len(snap_docs)} main={len(main_ops)}")
|
||||
|
||||
print(f" TOTAL snap={total_snap} main={total_main}")
|
||||
|
||||
|
||||
# ── 3. visits, shipments, items, inventory (per import_id) ───────────────────
|
||||
|
||||
def backfill_per_import(mysql_table, main_coll, snap_coll, id_fn,
|
||||
drop_cols=("id",)):
|
||||
print(f"[{mysql_table} -> {main_coll}/{snap_coll}]")
|
||||
db = get_db()
|
||||
c = conn(); cur = c.cursor()
|
||||
|
||||
# import_id metadata
|
||||
cur.execute("SELECT import_id, imported_at FROM iwrs_import")
|
||||
import_meta = {r[0]: r[1] for r in cur.fetchall()}
|
||||
|
||||
# MAX import_id per studie
|
||||
cur.execute(f"SELECT study, MAX(import_id) FROM {mysql_table} GROUP BY study")
|
||||
max_per_study = {r[0]: r[1] for r in cur.fetchall()}
|
||||
|
||||
cur.execute(f"SELECT * FROM {mysql_table}")
|
||||
all_rows = list(dict_rows(cur))
|
||||
cur.close(); c.close()
|
||||
|
||||
snap_docs = []
|
||||
main_ops = []
|
||||
seen_main = set()
|
||||
for r in all_rows:
|
||||
doc = normalize(r)
|
||||
for col in drop_cols:
|
||||
doc.pop(col, None)
|
||||
natural = id_fn(doc)
|
||||
if not natural:
|
||||
continue
|
||||
imp_at = import_meta.get(doc.get("import_id"))
|
||||
|
||||
snap = dict(doc)
|
||||
snap["natural_id"] = natural
|
||||
snap["imported_at"] = imp_at
|
||||
snap_docs.append(snap)
|
||||
|
||||
study = doc.get("study")
|
||||
if study and doc.get("import_id") == max_per_study.get(study):
|
||||
if natural in seen_main:
|
||||
continue
|
||||
seen_main.add(natural)
|
||||
main = dict(doc)
|
||||
main["_id"] = natural
|
||||
main["last_import_id"] = doc["import_id"]
|
||||
main["last_imported_at"] = imp_at
|
||||
main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))
|
||||
|
||||
if snap_docs:
|
||||
db[snap_coll].insert_many(snap_docs, ordered=False)
|
||||
if main_ops:
|
||||
db[main_coll].bulk_write(main_ops, ordered=False)
|
||||
print(f" snap={len(snap_docs)} main={len(main_ops)}")
|
||||
|
||||
|
||||
def visit_id(doc):
|
||||
s, sub = doc.get("study"), doc.get("subject")
|
||||
if not s or not sub:
|
||||
return None
|
||||
key = doc.get("irt_transaction_no")
|
||||
if key is None:
|
||||
sd = doc.get("scheduled_date")
|
||||
key = sd.strftime("%Y%m%d") if sd else "noidx"
|
||||
desc = (doc.get("irt_transaction_description") or "").replace(" ", "_")[:30]
|
||||
return f"{s}:{sub}:{key}:{desc}"
|
||||
|
||||
|
||||
def shipment_id_(doc):
|
||||
return doc.get("shipment_id")
|
||||
|
||||
|
||||
def shipment_item_id(doc):
|
||||
s, m = doc.get("shipment_id"), doc.get("medication_id")
|
||||
return f"{s}:{m}" if s and m else None
|
||||
|
||||
|
||||
def inventory_id(doc):
|
||||
s, m = doc.get("site"), doc.get("medication_id")
|
||||
return f"{s}:{m}" if s and m else None
|
||||
|
||||
|
||||
# ── 4. destruction (idempotentní, jen do main) ───────────────────────────────
|
||||
|
||||
def backfill_destruction():
|
||||
print("[iwrs_destruction]")
|
||||
db = get_db()
|
||||
c = conn(); cur = c.cursor()
|
||||
cur.execute("SELECT * FROM iwrs_destruction")
|
||||
rows = list(dict_rows(cur))
|
||||
cur.close(); c.close()
|
||||
ops = []
|
||||
seen = set()
|
||||
for r in rows:
|
||||
doc = normalize(r)
|
||||
doc.pop("id", None)
|
||||
basket, med = doc.get("basket_id"), doc.get("medication_id")
|
||||
if not basket or not med:
|
||||
continue
|
||||
nid = f"{basket}:{med}"
|
||||
if nid in seen:
|
||||
continue
|
||||
seen.add(nid)
|
||||
doc["_id"] = nid
|
||||
ops.append(ReplaceOne({"_id": nid}, doc, upsert=True))
|
||||
if ops:
|
||||
db.iwrs_destruction.bulk_write(ops, ordered=False)
|
||||
print(f" -> {len(ops)} destrukci")
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
print(f"Cilova DB: {MONGO_DB}")
|
||||
ensure_indexes()
|
||||
backfill_imports()
|
||||
backfill_subject_summary()
|
||||
backfill_per_import("iwrs_subject_visits", "iwrs_visits", "iwrs_visits_snapshots", visit_id)
|
||||
backfill_per_import("iwrs_shipments", "iwrs_shipments", "iwrs_shipments_snapshots", shipment_id_)
|
||||
backfill_per_import("iwrs_shipment_items", "iwrs_shipment_items", "iwrs_shipment_items_snapshots", shipment_item_id)
|
||||
backfill_per_import("iwrs_inventory", "iwrs_inventory", "iwrs_inventory_snapshots", inventory_id)
|
||||
backfill_destruction()
|
||||
|
||||
# finalni statistika
|
||||
db = get_db()
|
||||
print("\nFINALNI STAV V MONGO:")
|
||||
for coll in ["iwrs_imports","iwrs_subject_summary","iwrs_visits","iwrs_notifications",
|
||||
"iwrs_shipments","iwrs_shipment_items","iwrs_inventory","iwrs_destruction",
|
||||
"iwrs_subject_summary_snapshots","iwrs_visits_snapshots",
|
||||
"iwrs_shipments_snapshots","iwrs_shipment_items_snapshots","iwrs_inventory_snapshots"]:
|
||||
n = db[coll].count_documents({})
|
||||
print(f" {coll:42s} {n}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user