273 lines
9.1 KiB
Python
273 lines
9.1 KiB
Python
"""
|
|
Jednorázový backfill historických dat z MySQL do MongoDB.
|
|
|
|
Pro každou snapshotovanou tabulku:
|
|
- všechny řádky všech import_id → snapshot kolekce
|
|
- řádky z MAX(import_id) per studie → hlavní kolekce (replace_one upsert)
|
|
|
|
Pro idempotentní tabulky (notifications, destruction):
|
|
- všechno → hlavní kolekce (replace_one upsert)
|
|
|
|
Notifikace jsou už v Mongo z parse_notifications_to_mongo.py — přeskočí se.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import datetime
|
|
|
|
import mysql.connector
|
|
from pymongo import ReplaceOne
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from common.mongo_writer import get_db, ensure_indexes, MONGO_DB
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "Patients"))
|
|
import db_config
|
|
|
|
|
|
def conn():
    """Open a new MySQL connection configured from the ``db_config`` module.

    Returns:
        Whatever ``mysql.connector.connect`` returns. Nothing here closes it;
        the caller owns the connection.
    """
    settings = {
        "host": db_config.DB_HOST,
        "port": db_config.DB_PORT,
        "user": db_config.DB_USER,
        "password": db_config.DB_PASSWORD,
        "database": db_config.DB_NAME,
    }
    return mysql.connector.connect(**settings)
|
|
|
|
|
|
def dict_rows(cursor):
    """Lazily yield every row of an executed cursor as a ``{column: value}`` dict.

    Column names are the first element of each entry in ``cursor.description``;
    rows are obtained by iterating the cursor itself.
    """
    names = tuple(entry[0] for entry in cursor.description)
    yield from (dict(zip(names, record)) for record in cursor)
|
|
|
|
|
|
def to_mongo_date(v):
    """Widen a plain ``datetime.date`` to a midnight ``datetime.datetime``.

    ``datetime.datetime`` instances and every non-date value are returned as-is.
    NOTE(review): presumably needed because the Mongo driver cannot store a
    date-only value -- confirm against the PyMongo documentation.
    """
    is_plain_date = isinstance(v, datetime.date) and not isinstance(v, datetime.datetime)
    if not is_plain_date:
        return v
    return datetime.datetime(v.year, v.month, v.day)
|
|
|
|
|
|
def normalize(doc):
    """Return a copy of ``doc`` without None values, with dates passed through ``to_mongo_date``.

    Keys whose value is None are omitted entirely rather than stored as null.
    """
    cleaned = {}
    for key, value in doc.items():
        if value is None:
            continue
        cleaned[key] = to_mongo_date(value)
    return cleaned
|
|
|
|
|
|
# ── 1. iwrs_imports → iwrs_imports ───────────────────────────────────────────
|
|
|
|
def backfill_imports():
    """Copy every row of MySQL ``iwrs_import`` into the Mongo ``iwrs_imports`` collection.

    Each row is normalized (None columns dropped, dates widened) and written with a
    ``ReplaceOne`` upsert keyed by ``_id = import_id``, so re-running replaces the
    existing documents instead of duplicating them.

    Raises:
        KeyError: if a row has a NULL ``import_id`` (``normalize`` drops the key).
    """
    print("[iwrs_imports]")
    c = conn()
    try:
        cur = c.cursor()
        cur.execute("SELECT import_id, study, imported_at, source_file, report_type FROM iwrs_import")
        db = get_db()
        ops = []
        for r in dict_rows(cur):
            d = normalize(r)
            d["_id"] = d["import_id"]
            ops.append(ReplaceOne({"_id": d["_id"]}, d, upsert=True))
        if ops:
            db.iwrs_imports.bulk_write(ops, ordered=False)
        print(f" -> {len(ops)} import logu")
        cur.close()
    finally:
        # Always release the MySQL connection, even if the query or the Mongo write fails.
        c.close()
|
|
|
|
|
|
# ── 2. subject_summary (UCO + MDD unified) ───────────────────────────────────
|
|
|
|
# MySQL source tables with the per-study subject summaries; both are merged into
# the single Mongo collection pair iwrs_subject_summary / iwrs_subject_summary_snapshots.
UCO_TABLE = "iwrs_uco3001_subject_summary"
MDD_TABLE = "iwrs_mdd3003_subject_summary"
|
|
|
|
|
|
def backfill_subject_summary():
    """Backfill both per-study subject-summary tables into one Mongo collection pair.

    For each of ``UCO_TABLE`` and ``MDD_TABLE``:

    - every row with a non-empty ``subject`` is inserted into
      ``iwrs_subject_summary_snapshots`` (with ``natural_id`` and ``imported_at``);
    - rows belonging to the highest ``import_id`` found in that table are upserted
      into ``iwrs_subject_summary`` under ``_id = "<study>:<subject>"``.

    Snapshots are written with ``insert_many``, so running the function twice
    inserts the snapshot documents again; only the main collection is upserted.
    """
    print("[iwrs_subject_summary]")
    db = get_db()

    # import_id -> imported_at, used to stamp every snapshot and main document
    c = conn()
    try:
        cur = c.cursor()
        cur.execute("SELECT import_id, study, imported_at FROM iwrs_import")
        imported_at_by_id = {r[0]: r[2] for r in cur.fetchall()}
        cur.close()
    finally:
        c.close()

    total_snap = 0
    total_main = 0

    for table, study in [(UCO_TABLE, "77242113UCO3001"), (MDD_TABLE, "42847922MDD3003")]:
        c = conn()
        try:
            cur = c.cursor()
            cur.execute(f"SELECT * FROM {table}")
            all_rows = list(dict_rows(cur))
            cur.close()
        finally:
            # Release the connection even when the query or fetch fails.
            c.close()

        # highest import_id in this table decides what goes to the main collection
        import_ids = [r["import_id"] for r in all_rows if r.get("import_id") is not None]
        if not import_ids:
            continue
        max_import = max(import_ids)

        snap_docs = []
        main_ops = []
        for r in all_rows:
            doc = normalize(r)
            doc.pop("id", None)  # the MySQL autoincrement id is not preserved
            doc["study"] = study
            subject = doc.get("subject")
            if not subject:
                continue
            natural = f"{study}:{subject}"

            # normalize() drops None values, so a NULL import_id means the key is
            # missing here; .get avoids a KeyError for such rows.
            import_id = doc.get("import_id")
            imported_at = imported_at_by_id.get(import_id)

            snap = dict(doc)
            snap["natural_id"] = natural
            snap["imported_at"] = imported_at
            snap_docs.append(snap)

            if import_id == max_import:
                main = dict(doc)
                main["_id"] = natural
                main["last_import_id"] = max_import
                main["last_imported_at"] = imported_at
                main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))

        if snap_docs:
            db.iwrs_subject_summary_snapshots.insert_many(snap_docs, ordered=False)
            total_snap += len(snap_docs)
        if main_ops:
            db.iwrs_subject_summary.bulk_write(main_ops, ordered=False)
            total_main += len(main_ops)
        print(f" {study}: snap={len(snap_docs)} main={len(main_ops)}")

    print(f" TOTAL snap={total_snap} main={total_main}")
|
|
|
|
|
|
# ── 3. visits, shipments, items, inventory (per import_id) ───────────────────
|
|
|
|
def backfill_per_import(mysql_table, main_coll, snap_coll, id_fn,
                        drop_cols=("id",)):
    """Backfill one snapshotted MySQL table into a main and a snapshot collection.

    Args:
        mysql_table: Source table name. It is interpolated into the SQL text, so
            pass only trusted names; the table is queried for ``study`` and
            ``import_id`` columns.
        main_coll: Mongo collection receiving the latest state (upsert by ``_id``).
        snap_coll: Mongo collection receiving every row (plain ``insert_many``).
        id_fn: Callable taking the normalized row dict and returning its natural
            key; a falsy result skips the row entirely.
        drop_cols: Column names removed from each row before ``id_fn`` is called.

    A row goes to the main collection only if its ``import_id`` equals the highest
    ``import_id`` of its ``study`` in this table; for a repeated natural key the
    first such row wins.
    """
    print(f"[{mysql_table} -> {main_coll}/{snap_coll}]")
    db = get_db()
    c = conn()
    try:
        cur = c.cursor()

        # import_id -> imported_at
        cur.execute("SELECT import_id, imported_at FROM iwrs_import")
        import_meta = {r[0]: r[1] for r in cur.fetchall()}

        # highest import_id per study
        cur.execute(f"SELECT study, MAX(import_id) FROM {mysql_table} GROUP BY study")
        max_per_study = {r[0]: r[1] for r in cur.fetchall()}

        cur.execute(f"SELECT * FROM {mysql_table}")
        all_rows = list(dict_rows(cur))
        cur.close()
    finally:
        # Release the connection even when one of the queries fails.
        c.close()

    snap_docs = []
    main_ops = []
    seen_main = set()
    for r in all_rows:
        doc = normalize(r)
        for col in drop_cols:
            doc.pop(col, None)
        natural = id_fn(doc)
        if not natural:
            continue
        import_id = doc.get("import_id")
        imp_at = import_meta.get(import_id)

        snap = dict(doc)
        snap["natural_id"] = natural
        snap["imported_at"] = imp_at
        snap_docs.append(snap)

        study = doc.get("study")
        # A NULL import_id can never be "the latest import": without this guard a
        # study whose MAX(import_id) is NULL would match None == None and then
        # fail on the missing key.
        if study and import_id is not None and import_id == max_per_study.get(study):
            if natural in seen_main:
                continue
            seen_main.add(natural)
            main = dict(doc)
            main["_id"] = natural
            main["last_import_id"] = import_id
            main["last_imported_at"] = imp_at
            main_ops.append(ReplaceOne({"_id": natural}, main, upsert=True))

    if snap_docs:
        db[snap_coll].insert_many(snap_docs, ordered=False)
    if main_ops:
        db[main_coll].bulk_write(main_ops, ordered=False)
    print(f" snap={len(snap_docs)} main={len(main_ops)}")
|
|
|
|
|
|
def visit_id(doc):
    """Build the natural key ``study:subject:key:description`` for a visit row.

    ``key`` is ``irt_transaction_no`` when that is not None; otherwise the
    ``scheduled_date`` formatted as YYYYMMDD, or the literal ``noidx`` when there
    is no scheduled date. ``description`` is ``irt_transaction_description`` with
    spaces replaced by underscores and cut to 30 characters.

    Returns None when ``study`` or ``subject`` is missing or empty.
    """
    study = doc.get("study")
    subject = doc.get("subject")
    if not (study and subject):
        return None

    txn_no = doc.get("irt_transaction_no")
    if txn_no is not None:
        key = txn_no
    else:
        scheduled = doc.get("scheduled_date")
        key = scheduled.strftime("%Y%m%d") if scheduled else "noidx"

    description = doc.get("irt_transaction_description") or ""
    slug = description.replace(" ", "_")[:30]
    return f"{study}:{subject}:{key}:{slug}"
|
|
|
|
|
|
def shipment_id_(doc):
    """Natural key of a shipment row: its ``shipment_id`` value, or None when the key is absent."""
    key = doc.get("shipment_id")
    return key
|
|
|
|
|
|
def shipment_item_id(doc):
    """Natural key ``shipment_id:medication_id`` of a shipment item, or None if either part is falsy."""
    shipment = doc.get("shipment_id")
    medication = doc.get("medication_id")
    if not (shipment and medication):
        return None
    return f"{shipment}:{medication}"
|
|
|
|
|
|
def inventory_id(doc):
    """Natural key ``site:medication_id`` of an inventory row, or None if either part is falsy."""
    site = doc.get("site")
    medication = doc.get("medication_id")
    if not (site and medication):
        return None
    return f"{site}:{medication}"
|
|
|
|
|
|
# ── 4. destruction (idempotent, main collection only) ────────────────────────
|
|
|
|
def backfill_destruction():
    """Upsert all rows of MySQL ``iwrs_destruction`` into the Mongo ``iwrs_destruction`` collection.

    Documents are keyed by ``_id = "<basket_id>:<medication_id>"``. Rows missing
    either part are skipped, and for a repeated key only the first row is written.
    No snapshot collection is involved.
    """
    print("[iwrs_destruction]")
    db = get_db()
    c = conn()
    try:
        cur = c.cursor()
        cur.execute("SELECT * FROM iwrs_destruction")
        rows = list(dict_rows(cur))
        cur.close()
    finally:
        # Release the connection even when the query or fetch fails.
        c.close()

    ops = []
    seen = set()
    for r in rows:
        doc = normalize(r)
        doc.pop("id", None)
        basket, med = doc.get("basket_id"), doc.get("medication_id")
        if not basket or not med:
            continue
        nid = f"{basket}:{med}"
        if nid in seen:
            continue
        seen.add(nid)
        doc["_id"] = nid
        ops.append(ReplaceOne({"_id": nid}, doc, upsert=True))
    if ops:
        db.iwrs_destruction.bulk_write(ops, ordered=False)
    print(f" -> {len(ops)} destrukci")
|
|
|
|
|
|
# ── main ─────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Run every backfill step in order, then print the document count of each collection."""
    print(f"Cilova DB: {MONGO_DB}")
    ensure_indexes()
    backfill_imports()
    backfill_subject_summary()

    # (mysql table, main collection, snapshot collection, natural-key function)
    per_import_jobs = (
        ("iwrs_subject_visits", "iwrs_visits", "iwrs_visits_snapshots", visit_id),
        ("iwrs_shipments", "iwrs_shipments", "iwrs_shipments_snapshots", shipment_id_),
        ("iwrs_shipment_items", "iwrs_shipment_items", "iwrs_shipment_items_snapshots", shipment_item_id),
        ("iwrs_inventory", "iwrs_inventory", "iwrs_inventory_snapshots", inventory_id),
    )
    for job in per_import_jobs:
        backfill_per_import(*job)

    backfill_destruction()

    # final statistics
    db = get_db()
    print("\nFINALNI STAV V MONGO:")
    reported = (
        "iwrs_imports",
        "iwrs_subject_summary",
        "iwrs_visits",
        "iwrs_notifications",
        "iwrs_shipments",
        "iwrs_shipment_items",
        "iwrs_inventory",
        "iwrs_destruction",
        "iwrs_subject_summary_snapshots",
        "iwrs_visits_snapshots",
        "iwrs_shipments_snapshots",
        "iwrs_shipment_items_snapshots",
        "iwrs_inventory_snapshots",
    )
    for coll in reported:
        n = db[coll].count_documents({})
        print(f" {coll:42s} {n}")
|
|
|
|
|
|
# Run the backfill only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|