Files
administrator ea9d611719 Migrate IWRS from MySQL to MongoDB
- Add IWRS/common/mongo_writer.py with shared connection, indexes,
  upsert+snapshot helpers
- Add IWRS/Patients/import_to_mongo.py (subject_summary + visits)
- Add IWRS/Patients/import_notifications_to_mongo.py: parse PDF/JSON
  directly to Mongo (incl. PDF as BinData), replaces 2-step MySQL flow
- Add IWRS/Drugs/import_to_mongo.py (shipments, items, inventory,
  destruction)
- Add IWRS/backfill_mysql_to_mongo.py: one-shot history backfill
- Switch IWRS/Patients/run_all.py and IWRS/Drugs/run_all.py to Mongo
- Rewrite IWRS/Drugs/create_report.py data loaders to read from Mongo
- 8 main collections (upsert = latest state) + 5 snapshot collections
  (append-only with import_id) under studie database; notifications and
  destruction are immutable and need no snapshots

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 07:24:36 +02:00

236 lines
7.9 KiB
Python

"""
Sdílený Mongo writer pro IWRS importy.
Databáze: studie
Hlavní kolekce (upsert = aktuální stav):
iwrs_imports, iwrs_subject_summary, iwrs_visits, iwrs_notifications,
iwrs_shipments, iwrs_shipment_items, iwrs_inventory, iwrs_destruction
Snapshot kolekce (append-only s import_id):
iwrs_subject_summary_snapshots, iwrs_visits_snapshots,
iwrs_shipments_snapshots, iwrs_shipment_items_snapshots,
iwrs_inventory_snapshots
"""
import datetime
import numpy as np
import pandas as pd
from pymongo import MongoClient, ASCENDING
# Connection string that get_db() passes to MongoClient.
MONGO_URI = "mongodb://192.168.1.76:27017"
# Name of the database that get_db() selects on the shared client.
MONGO_DB = "studie"
# ── type converters (shared across importers) ───────────────────────────────
def _py(val):
    """Unwrap a NumPy scalar into its plain Python equivalent.

    Any value that is not an ``np.generic`` instance is returned unchanged.
    """
    return val.item() if isinstance(val, np.generic) else val
def to_str(val):
    """Normalize a cell value to a stripped string, or ``None`` when missing.

    NumPy scalars are unwrapped first. ``None``, float NaN, and any value
    whose stripped text is empty or equals ``nan`` / ``nat`` / ``none`` /
    ``<na>`` (case-insensitive) are treated as missing and yield ``None``.
    Everything else is returned as ``str(val).strip()``.
    """
    val = _py(val)
    if val is None:
        return None
    if isinstance(val, float) and val != val:  # NaN is the only float != itself
        return None
    s = str(val).strip()
    # "<na>" covers pandas.NA, whose text form is "<NA>"; without it the
    # missing marker would be stored as a literal string.
    return None if s.lower() in ("nan", "nat", "none", "<na>", "") else s
def to_int(val):
    """Convert a cell value to ``int``, or ``None`` when that is not possible.

    NumPy scalars are unwrapped first. Integers and integer-looking strings
    are converted exactly; other numeric input goes through ``float`` and is
    truncated toward zero (``"12.7"`` -> ``12``). ``None``, NaN, infinities
    and non-numeric text yield ``None``.
    """
    val = _py(val)
    if isinstance(val, (int, str)):
        # Exact path: a round trip through float() silently rounds values
        # above 2**53, which corrupts long numeric identifiers.
        try:
            return int(val)
        except ValueError:
            pass  # e.g. "12.5" or "1e3" - fall through to the float path
    try:
        v = float(val)
        return None if (v != v) else int(v)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(inf) / int(-inf) cannot be represented.
        return None
def to_float(val):
    """Convert a cell value to ``float``, or ``None`` when it cannot be parsed.

    NumPy scalars are unwrapped first. Values that ``float()`` rejects with
    ``TypeError`` or ``ValueError``, and NaN, yield ``None``.
    """
    unwrapped = _py(val)
    try:
        number = float(unwrapped)
    except (TypeError, ValueError):
        return None
    if number != number:  # NaN check
        return None
    return number
def to_date(val):
    """Convert a cell value to a ``datetime`` at 00:00:00, or ``None``.

    A ``datetime`` is returned rather than a ``date`` because Mongo has no
    separate DATE type. Any time-of-day part of the input is discarded.

    Accepted input: ``datetime.date`` / ``datetime.datetime`` /
    ``pandas.Timestamp`` instances, ``numpy.datetime64`` scalars, and strings
    in one of the formats ``%Y-%m-%d``, ``%d-%b-%Y``, ``%d-%m-%Y`` or
    ``%Y-%m-%d %H:%M:%S``. Missing values (``None``, NaN, NaT, ``pandas.NA``,
    empty or "nan"/"nat"/"none" text) and unparseable input yield ``None``.

    Note: ``%b`` (abbreviated month name) is matched by ``strptime`` against
    the current locale.
    """
    raw = val
    val = _py(val)
    if isinstance(raw, np.datetime64) and isinstance(val, int):
        # For precisions finer than microseconds (pandas' default is ns),
        # datetime64.item() returns an epoch integer instead of a datetime.
        # That integer matches none of the string formats below and would
        # silently become None, so convert via Timestamp instead.
        try:
            val = pd.Timestamp(raw)
        except (ValueError, OverflowError):
            return None
    if val is None:
        return None
    try:
        if pd.isna(val):  # NaN, NaT, pandas.NA
            return None
    except (TypeError, ValueError):
        # pd.isna() on a list/array gives an array whose truth value may be
        # ambiguous; such input is handled by the string fallback below.
        pass
    # datetime.datetime and pandas.Timestamp are both subclasses of date.
    if isinstance(val, datetime.date):
        return datetime.datetime(val.year, val.month, val.day)
    s = str(val).strip()
    if s.lower() in ("nat", "nan", "none", ""):
        return None
    for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
        try:
            d = datetime.datetime.strptime(s, fmt)
        except ValueError:
            continue
        return datetime.datetime(d.year, d.month, d.day)
    return None
# ── connection / indexes ─────────────────────────────────────────────────────
# Module-wide MongoClient, created on the first get_db() call and then reused.
_client = None


def get_db():
    """Return the ``MONGO_DB`` database from the shared ``MongoClient``.

    The client is constructed on first use with ``MONGO_URI`` and a
    5000 ms server-selection timeout, then cached in ``_client``.
    """
    global _client
    if _client is not None:
        return _client[MONGO_DB]
    _client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    return _client[MONGO_DB]
def ensure_indexes():
    """Create the ascending indexes used by the IWRS collections.

    Main collections get the single-field and compound indexes listed in
    ``main_indexes``; every snapshot collection gets one index on
    ``import_id`` and one on ``study``.
    """
    db = get_db()

    # (collection name, indexed fields) - one create_index call per entry.
    main_indexes = (
        ("iwrs_subject_summary", ("study", "subject")),
        ("iwrs_subject_summary", ("study", "site")),
        ("iwrs_subject_summary", ("irt_subject_status",)),
        ("iwrs_visits", ("study", "subject")),
        ("iwrs_visits", ("study", "scheduled_date")),
        ("iwrs_notifications", ("study", "subject")),
        ("iwrs_notifications", ("study", "title")),
        ("iwrs_notifications", ("actual_date",)),
        ("iwrs_shipments", ("study",)),
        ("iwrs_shipments", ("status",)),
        ("iwrs_shipments", ("shipped_date",)),
        ("iwrs_shipment_items", ("shipment_id",)),
        ("iwrs_shipment_items", ("medication_id",)),
        ("iwrs_shipment_items", ("packaged_lot_no",)),
        ("iwrs_inventory", ("study", "site")),
        ("iwrs_inventory", ("medication_id",)),
        ("iwrs_inventory", ("expiration_date",)),
        ("iwrs_destruction", ("study", "basket_id")),
        ("iwrs_destruction", ("medication_id",)),
    )
    for coll_name, fields in main_indexes:
        db[coll_name].create_index([(field, ASCENDING) for field in fields])

    # Snapshot collections are indexed by import_id and by study.
    snapshot_collections = (
        "iwrs_subject_summary_snapshots",
        "iwrs_visits_snapshots",
        "iwrs_shipments_snapshots",
        "iwrs_shipment_items_snapshots",
        "iwrs_inventory_snapshots",
    )
    for snap_name in snapshot_collections:
        for field in ("import_id", "study"):
            db[snap_name].create_index([(field, ASCENDING)])
# ── import log ───────────────────────────────────────────────────────────────
def log_import(study, source_file, report_type, counts=None):
    """Record one import run in ``iwrs_imports`` and return its ``import_id``.

    The id is sequential: one more than the highest ``import_id`` currently
    stored, or 1 when the collection is empty. ``counts`` defaults to an
    empty dict. ``imported_at`` is the naive local time from
    ``datetime.datetime.now()``.

    The lookup of the current maximum and the insert are two separate
    operations, so concurrent callers could obtain the same id.
    NOTE(review): PyMongo is documented to treat naive datetimes as UTC -
    confirm whether readers expect local wall-clock time here.
    """
    imports = get_db().iwrs_imports

    # Highest existing id, if any.
    newest = imports.find_one(sort=[("import_id", -1)])
    if newest:
        import_id = newest["import_id"] + 1
    else:
        import_id = 1

    imports.insert_one(
        {
            "import_id": import_id,
            "study": study,
            "imported_at": datetime.datetime.now(),
            "source_file": source_file,
            "report_type": report_type,
            "counts": counts or {},
        }
    )
    return import_id
# ── upsert + snapshot ────────────────────────────────────────────────────────
def upsert_with_snapshot(collection, snapshot_collection, doc, import_id):
    """Upsert ``doc`` into the main collection and append a snapshot copy.

    ``doc`` must contain ``_id``. The main document is replaced (or inserted)
    with ``last_import_id`` / ``last_imported_at`` added. The snapshot is a
    copy of ``doc`` whose original ``_id`` is moved to ``natural_id``, plus
    ``import_id`` and ``imported_at``. Both copies share one timestamp.
    """
    db = get_db()
    stamp = datetime.datetime.now()
    key = doc["_id"]

    # Current state: replace the whole document under its natural key.
    current = {**doc, "last_import_id": import_id, "last_imported_at": stamp}
    db[collection].replace_one({"_id": key}, current, upsert=True)

    # History: drop _id so Mongo assigns a fresh one per snapshot row.
    history = {name: value for name, value in doc.items() if name != "_id"}
    history.update(natural_id=key, import_id=import_id, imported_at=stamp)
    db[snapshot_collection].insert_one(history)
def upsert_only(collection, doc, import_id=None):
    """Upsert ``doc`` by ``_id`` without writing a snapshot.

    Intended for the immutable collections (notifications, destruction).
    When ``import_id`` is given, ``last_import_id`` and ``last_imported_at``
    are added to the stored document; otherwise ``doc`` is stored as-is.
    """
    db = get_db()
    replacement = dict(doc)
    if import_id is not None:
        replacement.update(
            last_import_id=import_id,
            last_imported_at=datetime.datetime.now(),
        )
    db[collection].replace_one({"_id": doc["_id"]}, replacement, upsert=True)
def bulk_upsert_with_snapshot(collection, snapshot_collection, docs, import_id):
    """Bulk variant of the upsert-plus-snapshot write, for performance.

    Each document in ``docs`` must contain ``_id``. All main documents are
    replaced/inserted in one unordered ``bulk_write`` and all snapshot copies
    are appended in one unordered ``insert_many``; every document written by
    a call shares the same timestamp.

    Returns the number of documents processed (0 when ``docs`` is empty).
    """
    if not docs:
        return 0

    from pymongo import ReplaceOne

    db = get_db()
    stamp = datetime.datetime.now()

    replacements = []
    snapshots = []
    for source in docs:
        key = source["_id"]
        current = {**source, "last_import_id": import_id, "last_imported_at": stamp}
        replacements.append(ReplaceOne({"_id": key}, current, upsert=True))

        # Snapshot copy: the original _id moves to natural_id.
        history = {name: value for name, value in source.items() if name != "_id"}
        history.update(natural_id=key, import_id=import_id, imported_at=stamp)
        snapshots.append(history)

    db[collection].bulk_write(replacements, ordered=False)
    db[snapshot_collection].insert_many(snapshots, ordered=False)
    return len(docs)
def bulk_upsert_only(collection, docs, import_id=None):
    """Bulk upsert by ``_id`` without snapshots.

    Each document in ``docs`` must contain ``_id``. When ``import_id`` is
    given, every stored document also gets ``last_import_id`` and a shared
    ``last_imported_at`` timestamp. The writes go out as one unordered
    ``bulk_write``.

    Returns the number of documents processed (0 when ``docs`` is empty).
    """
    if not docs:
        return 0

    from pymongo import ReplaceOne

    db = get_db()
    stamp = datetime.datetime.now()

    # Fields merged into every document; empty when no import id was given.
    if import_id is None:
        tracking = {}
    else:
        tracking = {"last_import_id": import_id, "last_imported_at": stamp}

    operations = [
        ReplaceOne({"_id": source["_id"]}, {**source, **tracking}, upsert=True)
        for source in docs
    ]
    db[collection].bulk_write(operations, ordered=False)
    return len(docs)