ea9d611719
- Add IWRS/common/mongo_writer.py with shared connection, indexes, upsert+snapshot helpers - Add IWRS/Patients/import_to_mongo.py (subject_summary + visits) - Add IWRS/Patients/import_notifications_to_mongo.py: parse PDF/JSON directly to Mongo (incl. PDF as BinData), replaces 2-step MySQL flow - Add IWRS/Drugs/import_to_mongo.py (shipments, items, inventory, destruction) - Add IWRS/backfill_mysql_to_mongo.py: one-shot history backfill - Switch IWRS/Patients/run_all.py and IWRS/Drugs/run_all.py to Mongo - Rewrite IWRS/Drugs/create_report.py data loaders to read from Mongo - 8 main collections (upsert = latest state) + 5 snapshot collections (append-only with import_id) under studie database; notifications and destruction are immutable and need no snapshots Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
236 lines
7.9 KiB
Python
236 lines
7.9 KiB
Python
"""
Shared Mongo writer for the IWRS importers.

Database: studie

Main collections (upsert = latest state):
    iwrs_imports, iwrs_subject_summary, iwrs_visits, iwrs_notifications,
    iwrs_shipments, iwrs_shipment_items, iwrs_inventory, iwrs_destruction
    (iwrs_imports itself is an append-only run log written by log_import.)

Snapshot collections (append-only, each document tagged with import_id):
    iwrs_subject_summary_snapshots, iwrs_visits_snapshots,
    iwrs_shipments_snapshots, iwrs_shipment_items_snapshots,
    iwrs_inventory_snapshots
"""
|
|
|
|
import datetime
import os

import numpy as np
import pandas as pd
from pymongo import ASCENDING, MongoClient
|
|
|
|
# Connection settings. Override with the IWRS_MONGO_URI / IWRS_MONGO_DB
# environment variables (e.g. to point the importers at a test instance);
# an unset or empty variable falls back to the default shown here.
MONGO_URI = os.environ.get("IWRS_MONGO_URI") or "mongodb://192.168.1.76:27017"
MONGO_DB = os.environ.get("IWRS_MONGO_DB") or "studie"
|
|
|
|
# ── type converters (shared across importers) ───────────────────────────────
|
|
|
|
def _py(val):
|
|
if isinstance(val, np.generic):
|
|
return val.item()
|
|
return val
|
|
|
|
|
|
def to_str(val):
    """Convert *val* to a stripped string, or None for missing/blank values.

    NumPy scalars are unwrapped via _py first. None, float NaN, pandas'
    ``pd.NA`` and any value whose stripped text is empty or reads
    "nan" / "nat" / "none" (case-insensitive) all map to None.
    """
    val = _py(val)
    # pd.NA stringifies to "<NA>", which the sentinel list below would not
    # catch; test it by identity so it becomes None (as pd.isna treats it).
    if val is None or val is pd.NA:
        return None
    if isinstance(val, float) and val != val:  # NaN is never equal to itself
        return None
    s = str(val).strip()
    return None if s.lower() in ("nan", "nat", "none", "") else s
|
|
|
|
|
|
def to_int(val):
    """Convert *val* to an ``int``, or None when it cannot be read as one.

    NumPy scalars are unwrapped via _py first. Ints (including bool) and
    integer strings are converted exactly. Anything else goes through
    ``float()`` and is truncated toward zero ("12.7" -> 12, "1e3" -> 1000).
    None, NaN, +/-infinity and unparseable input return None.
    """
    val = _py(val)
    if isinstance(val, int):
        # Exact path: a float round-trip would corrupt ints above 2**53.
        return int(val)
    if isinstance(val, str):
        try:
            return int(val)
        except ValueError:
            pass  # e.g. "12.0", "1e3", "nan" -- fall back to float parsing
    try:
        v = float(val)
        # int() raises OverflowError for +/-inf; treat it like NaN.
        return None if (v != v) else int(v)
    except (TypeError, ValueError, OverflowError):
        return None
|
|
|
|
|
|
def to_float(val):
    """Convert *val* to a ``float``, or None for NaN / unconvertible input.

    NumPy scalars are unwrapped via _py first. Anything ``float()`` rejects
    with TypeError or ValueError (None, unparseable strings, ...) gives None.
    Infinities are returned as-is.
    """
    unwrapped = _py(val)
    try:
        number = float(unwrapped)
    except (TypeError, ValueError):
        return None
    # NaN is the only float that compares unequal to itself.
    if number != number:
        return None
    return number
|
|
|
|
|
|
def to_date(val):
    """Convert *val* to a ``datetime.datetime`` at midnight, or None.

    Mongo has no separate DATE type, so dates are stored as datetimes with the
    time part dropped (00:00:00).

    Accepted inputs:
      * None, NaN, NaT, pd.NA                           -> None
      * ``np.datetime64`` of any unit                   -> its calendar day
      * other NumPy scalars                             -> unwrapped via .item()
      * ``datetime.date`` / ``datetime.datetime`` / ``pd.Timestamp``
      * strings in "%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y" or "%Y-%m-%d %H:%M:%S"

    Anything else returns None.
    """
    # np.datetime64 must be handled before the generic NumPy unwrap: for
    # nanosecond precision (pandas' default) .item() returns an int of
    # nanoseconds, which no date format below would accept.
    if isinstance(val, np.datetime64):
        if np.isnat(val):
            return None
        # Day precision makes .item() return a datetime.date for years 1-9999.
        val = val.astype("datetime64[D]").item()
    elif isinstance(val, np.generic):
        val = val.item()

    if val is None:
        return None
    try:
        if pd.isna(val):  # NaN, NaT, pd.NA
            return None
    except (TypeError, ValueError):
        # pd.isna on list-like input returns an array whose truth value is
        # ambiguous; let the string parsing below reject such values.
        pass

    # datetime.datetime and pd.Timestamp are both subclasses of datetime.date.
    if isinstance(val, datetime.date):
        return datetime.datetime(val.year, val.month, val.day)

    s = str(val).strip()
    if s.lower() in ("", "nat", "nan", "none"):
        return None
    for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
        try:
            d = datetime.datetime.strptime(s, fmt)
        except ValueError:
            continue
        return datetime.datetime(d.year, d.month, d.day)
    return None
|
|
|
|
|
|
# ── connection / indexes ─────────────────────────────────────────────────────
|
|
|
|
# Lazily created MongoClient shared by every get_db() call in this module.
_client = None


def get_db():
    """Return the MONGO_DB database, creating the shared client on first use.

    The MongoClient is cached in the module-level ``_client``, so MONGO_URI
    is read only when the client is first created; MONGO_DB is looked up on
    every call. Server selection times out after 5000 ms.
    """
    global _client
    client = _client
    if client is None:
        client = _client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    return client[MONGO_DB]
|
|
|
|
|
|
def ensure_indexes():
    """Create the secondary indexes on the IWRS collections.

    Every index uses ascending keys. The main collections get the compound or
    single-field indexes listed below. Each snapshot collection gets two
    single-field indexes, on ``import_id`` and on ``study``.

    Indexes are requested one by one in a fixed order. Per the MongoDB docs,
    ``create_index`` for an index that already exists with the same spec does
    nothing.
    """
    db = get_db()

    # (collection name, key fields) -- every key is ascending.
    main_indexes = (
        ("iwrs_subject_summary", ("study", "subject")),
        ("iwrs_subject_summary", ("study", "site")),
        ("iwrs_subject_summary", ("irt_subject_status",)),
        ("iwrs_visits", ("study", "subject")),
        ("iwrs_visits", ("study", "scheduled_date")),
        ("iwrs_notifications", ("study", "subject")),
        ("iwrs_notifications", ("study", "title")),
        ("iwrs_notifications", ("actual_date",)),
        ("iwrs_shipments", ("study",)),
        ("iwrs_shipments", ("status",)),
        ("iwrs_shipments", ("shipped_date",)),
        ("iwrs_shipment_items", ("shipment_id",)),
        ("iwrs_shipment_items", ("medication_id",)),
        ("iwrs_shipment_items", ("packaged_lot_no",)),
        ("iwrs_inventory", ("study", "site")),
        ("iwrs_inventory", ("medication_id",)),
        ("iwrs_inventory", ("expiration_date",)),
        ("iwrs_destruction", ("study", "basket_id")),
        ("iwrs_destruction", ("medication_id",)),
    )
    for name, fields in main_indexes:
        db[name].create_index([(field, ASCENDING) for field in fields])

    # Snapshot collections are indexed by import run (import_id) and by study.
    snapshot_collections = (
        "iwrs_subject_summary_snapshots",
        "iwrs_visits_snapshots",
        "iwrs_shipments_snapshots",
        "iwrs_shipment_items_snapshots",
        "iwrs_inventory_snapshots",
    )
    for name in snapshot_collections:
        for field in ("import_id", "study"):
            db[name].create_index([(field, ASCENDING)])
|
|
|
|
|
|
# ── import log ───────────────────────────────────────────────────────────────
|
|
|
|
def log_import(study, source_file, report_type, counts=None):
    """Record one import run in ``iwrs_imports`` and return its import_id.

    The id is sequential: the highest existing ``import_id`` + 1, or 1 when
    the collection is empty. The inserted record holds the study, the current
    local time (``imported_at``), the source file, the report type and
    *counts* (an empty dict when *counts* is falsy).

    NOTE(review): the read-then-insert is not atomic. Two imports started at
    the same moment could both receive the same import_id. Confirm runs are
    never concurrent.
    NOTE(review): ``datetime.now()`` is naive local time, and PyMongo stores
    naive datetimes as if they were UTC. Confirm this is intended.
    """
    imports = get_db().iwrs_imports
    # Newest run so far; None when nothing has been logged yet.
    newest = imports.find_one(sort=[("import_id", -1)])
    import_id = 1 if newest is None else newest["import_id"] + 1
    imports.insert_one(
        {
            "import_id": import_id,
            "study": study,
            "imported_at": datetime.datetime.now(),
            "source_file": source_file,
            "report_type": report_type,
            "counts": counts or {},
        }
    )
    return import_id
|
|
|
|
|
|
# ── upsert + snapshot ────────────────────────────────────────────────────────
|
|
|
|
def upsert_with_snapshot(collection, snapshot_collection, doc, import_id):
    """Upsert *doc* into *collection* and append a snapshot copy of it.

    *doc* must contain ``_id``, which is used as the replace filter.

    * The stored main document is a copy of *doc* plus ``last_import_id`` and
      ``last_imported_at``.
    * The snapshot is a copy of *doc* with ``_id`` moved to ``natural_id``,
      plus ``import_id`` and ``imported_at``. It is inserted into
      *snapshot_collection*.

    Both writes use the same timestamp. *doc* itself is not modified.
    """
    db = get_db()
    stamp = datetime.datetime.now()
    natural_id = doc["_id"]

    db[collection].replace_one(
        {"_id": natural_id},
        {**doc, "last_import_id": import_id, "last_imported_at": stamp},
        upsert=True,
    )

    # Same field order as doc (minus _id), then the snapshot bookkeeping.
    snapshot = {key: value for key, value in doc.items() if key != "_id"}
    snapshot.update(natural_id=natural_id, import_id=import_id, imported_at=stamp)
    db[snapshot_collection].insert_one(snapshot)
|
|
|
|
|
|
def upsert_only(collection, doc, import_id=None):
    """Upsert *doc* into *collection* without writing a snapshot.

    Meant for immutable collections (notifications, destruction). *doc* must
    contain ``_id``, which is used as the replace filter.

    When *import_id* is given, the stored copy also gets ``last_import_id``
    and ``last_imported_at``. Otherwise the copy is stored unchanged. *doc*
    itself is never modified.
    """
    target = get_db()[collection]
    record = dict(doc)
    if import_id is not None:
        record.update(
            last_import_id=import_id,
            last_imported_at=datetime.datetime.now(),
        )
    target.replace_one({"_id": doc["_id"]}, record, upsert=True)
|
|
|
|
|
|
def bulk_upsert_with_snapshot(collection, snapshot_collection, docs, import_id):
    """Bulk version of upsert_with_snapshot: one bulk_write + one insert_many.

    Every document in *docs* must contain ``_id``.

    * The main collection gets one ``ReplaceOne(..., upsert=True)`` per
      document, stamped with ``last_import_id`` / ``last_imported_at``.
    * The snapshot collection gets one copy per document, with ``_id`` moved
      to ``natural_id`` plus ``import_id`` / ``imported_at``.

    All documents share one timestamp. The input dicts are not modified.

    *docs* may be any iterable of dicts (list, tuple, generator) or None.
    Returns the number of documents processed. Empty input writes nothing
    and returns 0.
    """
    # Materialise once. A generator is always truthy and has no len().
    # Without this, an empty one would reach bulk_write([]) (rejected by
    # pymongo), and len() would fail on a non-empty one only after both
    # writes had already been sent.
    docs = list(docs or ())
    if not docs:
        return 0

    from pymongo import ReplaceOne

    db = get_db()
    now = datetime.datetime.now()

    main_ops = []
    snap_docs = []
    for doc in docs:
        main = dict(doc)
        main["last_import_id"] = import_id
        main["last_imported_at"] = now
        main_ops.append(ReplaceOne({"_id": doc["_id"]}, main, upsert=True))

        snap = dict(doc)
        snap["natural_id"] = snap.pop("_id")
        snap["import_id"] = import_id
        snap["imported_at"] = now
        snap_docs.append(snap)

    # Unordered: the server attempts every operation even if some fail.
    # An error from bulk_write propagates before any snapshot is inserted.
    db[collection].bulk_write(main_ops, ordered=False)
    db[snapshot_collection].insert_many(snap_docs, ordered=False)
    return len(docs)
|
|
|
|
|
|
def bulk_upsert_only(collection, docs, import_id=None):
    """Bulk version of upsert_only: one unordered bulk_write, no snapshots.

    Every document in *docs* must contain ``_id``, which is used as the
    replace filter. When *import_id* is given, each stored copy also gets
    ``last_import_id`` and a shared ``last_imported_at`` timestamp. The input
    dicts are not modified.

    *docs* may be any iterable of dicts (list, tuple, generator) or None.
    Returns the number of documents processed. Empty input writes nothing
    and returns 0.
    """
    # Materialise once. A generator is always truthy and has no len().
    # Without this, an empty one would reach bulk_write([]) (rejected by
    # pymongo), and len() would fail on a non-empty one only after the write
    # had already been sent.
    docs = list(docs or ())
    if not docs:
        return 0

    from pymongo import ReplaceOne

    db = get_db()
    now = datetime.datetime.now()

    ops = []
    for doc in docs:
        main = dict(doc)
        if import_id is not None:
            main["last_import_id"] = import_id
            main["last_imported_at"] = now
        ops.append(ReplaceOne({"_id": doc["_id"]}, main, upsert=True))

    # Unordered: the server attempts every operation even if some fail.
    db[collection].bulk_write(ops, ordered=False)
    return len(docs)
|