"""Shared Mongo writer for IWRS imports.

Database: studie

Main collections (upsert = current state):
  iwrs_imports, iwrs_subject_summary, iwrs_visits, iwrs_notifications,
  iwrs_shipments, iwrs_shipment_items, iwrs_inventory, iwrs_destruction

Snapshot collections (append-only, tagged with import_id):
  iwrs_subject_summary_snapshots, iwrs_visits_snapshots,
  iwrs_shipments_snapshots, iwrs_shipment_items_snapshots,
  iwrs_inventory_snapshots
"""
import datetime

import numpy as np
import pandas as pd
from pymongo import MongoClient, ASCENDING

MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "studie"


# ── type converters (shared across importers) ───────────────────────────────

def _py(val):
    """Unwrap a NumPy scalar into its built-in Python equivalent.

    Any value that is not an ``np.generic`` instance is returned unchanged.
    """
    if isinstance(val, np.generic):
        return val.item()
    return val


def to_str(val):
    """Convert *val* to a stripped string, or None for missing values.

    None, float NaN, and values whose stripped text is empty or equals
    "nan", "nat" or "none" (case-insensitive) all yield None.
    """
    val = _py(val)
    if val is None:
        return None
    if isinstance(val, float) and (val != val):  # NaN is the only float != itself
        return None
    s = str(val).strip()
    return None if s.lower() in ("nan", "nat", "none", "") else s


def to_int(val):
    """Convert *val* to int, or None when it cannot be represented.

    Integers (including bools and unwrapped NumPy integers) are returned
    exactly, without a lossy round-trip through float. Everything else is
    parsed via ``float`` and truncated toward zero. NaN, infinities and
    unparseable input yield None.
    """
    val = _py(val)
    if isinstance(val, int):
        # Going through float() would round integers above 2**53.
        return int(val)
    try:
        v = float(val)
        return None if (v != v) else int(v)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(inf) / int(-inf).
        return None


def to_float(val):
    """Convert *val* to float, or None for NaN / unparseable input.

    Integers too large for a float also yield None instead of raising.
    """
    val = _py(val)
    try:
        v = float(val)
        return None if (v != v) else float(v)
    except (TypeError, ValueError, OverflowError):
        return None


def to_date(val):
    """Return a datetime at 00:00:00 — Mongo has no standalone DATE type.

    Accepts pandas Timestamps, ``datetime``/``date`` objects, NumPy
    ``datetime64`` scalars and strings in one of the formats
    ``%Y-%m-%d``, ``%d-%b-%Y``, ``%d-%m-%Y`` or ``%Y-%m-%d %H:%M:%S``.
    Missing values (None, NaN, NaT, empty / "nan" / "nat" / "none" text)
    and strings matching no format yield None. Any time-of-day and tzinfo
    on the input are dropped; only year, month and day are kept.
    """
    raw = val
    val = _py(val)
    if isinstance(raw, np.datetime64) and isinstance(val, int):
        # datetime64.item() returns a raw integer count for units finer than
        # microseconds (e.g. pandas' default ns); recover the date via pandas.
        val = pd.Timestamp(raw)
    if val is None:
        return None
    if isinstance(val, float) and (val != val):
        return None
    try:
        if pd.isna(val):
            return None
    except (TypeError, ValueError):
        # pd.isna() on array-likes returns an array whose truth value is
        # ambiguous; fall through and let the string parsing decide.
        pass
    # pd.Timestamp and datetime.datetime are both datetime.date subclasses;
    # NaT has already been filtered out by the pd.isna() check above.
    if isinstance(val, datetime.date):
        return datetime.datetime(val.year, val.month, val.day)
    s = str(val).strip()
    if not s or s.lower() in ("nat", "nan", "none", ""):
        return None
    for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
        try:
            d = datetime.datetime.strptime(s, fmt)
            return datetime.datetime(d.year, d.month, d.day)
        except ValueError:
            pass
    return None


# ── connection / indexes ─────────────────────────────────────────────────────

_client = None


def get_db():
    """Return the ``MONGO_DB`` database handle.

    The ``MongoClient`` is created on first use (5 s server-selection
    timeout) and cached in the module-level ``_client`` for later calls.
    """
    global _client
    if _client is None:
        _client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    return _client[MONGO_DB]


def ensure_indexes():
    """Create the ascending indexes used by the IWRS collections."""
    db = get_db()

    # collection name -> list of index key tuples (all ascending)
    main_indexes = (
        ("iwrs_subject_summary", [
            ("study", "subject"),
            ("study", "site"),
            ("irt_subject_status",),
        ]),
        ("iwrs_visits", [
            ("study", "subject"),
            ("study", "scheduled_date"),
        ]),
        ("iwrs_notifications", [
            ("study", "subject"),
            ("study", "title"),
            ("actual_date",),
        ]),
        ("iwrs_shipments", [
            ("study",),
            ("status",),
            ("shipped_date",),
        ]),
        ("iwrs_shipment_items", [
            ("shipment_id",),
            ("medication_id",),
            ("packaged_lot_no",),
        ]),
        ("iwrs_inventory", [
            ("study", "site"),
            ("medication_id",),
            ("expiration_date",),
        ]),
        ("iwrs_destruction", [
            ("study", "basket_id"),
            ("medication_id",),
        ]),
    )
    for name, index_keys in main_indexes:
        for fields in index_keys:
            db[name].create_index([(field, ASCENDING) for field in fields])

    # snapshot collections get an import_id index and a study index
    for snap in [
        "iwrs_subject_summary_snapshots",
        "iwrs_visits_snapshots",
        "iwrs_shipments_snapshots",
        "iwrs_shipment_items_snapshots",
        "iwrs_inventory_snapshots",
    ]:
        db[snap].create_index([("import_id", ASCENDING)])
        db[snap].create_index([("study", ASCENDING)])


# ── import log
# ───────────────────────────────────────────────────────────────


def log_import(study, source_file, report_type, counts=None):
    """Write one import-run record to ``iwrs_imports``.

    Args:
        study: Stored under the ``study`` key.
        source_file: Stored under the ``source_file`` key.
        report_type: Stored under the ``report_type`` key.
        counts: Optional value stored under ``counts``; any falsy value is
            stored as an empty dict.

    Returns:
        The sequential integer ``import_id`` assigned to this run.
    """
    db = get_db()
    # Sequential import_id: highest existing id + 1, or 1 for the first run.
    # NOTE(review): this read-then-insert is not atomic — two importers
    # running at the same time could be handed the same id.
    last = db.iwrs_imports.find_one(sort=[("import_id", -1)])
    next_id = (last["import_id"] + 1) if last else 1
    doc = {
        "import_id": next_id,
        "study": study,
        # NOTE(review): naive local time; PyMongo documents that naive
        # datetimes are treated as UTC — confirm this is intended.
        "imported_at": datetime.datetime.now(),
        "source_file": source_file,
        "report_type": report_type,
        "counts": counts or {},
    }
    db.iwrs_imports.insert_one(doc)
    return next_id


# ── upsert + snapshot ────────────────────────────────────────────────────────

def _stamped_main(doc, import_id, now):
    """Return a copy of *doc* tagged with last_import_id / last_imported_at."""
    main = dict(doc)
    main["last_import_id"] = import_id
    main["last_imported_at"] = now
    return main


def _snapshot_of(doc, import_id, now):
    """Return a snapshot copy of *doc*: ``_id`` moved to ``natural_id``."""
    snap = dict(doc)
    snap["natural_id"] = snap.pop("_id")
    snap["import_id"] = import_id
    snap["imported_at"] = now
    return snap


def upsert_with_snapshot(collection, snapshot_collection, doc, import_id):
    """Upsert *doc* into the main collection and append a snapshot of it.

    `doc` must contain `_id`. The main document receives
    last_import_id / last_imported_at; the snapshot is a copy carrying
    import_id / imported_at, with the original _id moved to natural_id.
    The caller's `doc` is not modified.
    """
    db = get_db()
    now = datetime.datetime.now()
    main_doc = _stamped_main(doc, import_id, now)
    db[collection].replace_one({"_id": doc["_id"]}, main_doc, upsert=True)
    db[snapshot_collection].insert_one(_snapshot_of(doc, import_id, now))


def upsert_only(collection, doc, import_id=None):
    """Upsert without a snapshot (for immutable collections: notifications, destruction).

    `doc` must contain `_id`. The last_import_id / last_imported_at fields
    are added only when `import_id` is not None.
    """
    db = get_db()
    if import_id is None:
        main_doc = dict(doc)
    else:
        main_doc = _stamped_main(doc, import_id, datetime.datetime.now())
    db[collection].replace_one({"_id": doc["_id"]}, main_doc, upsert=True)


def bulk_upsert_with_snapshot(collection, snapshot_collection, docs, import_id):
    """Bulk variant of ``upsert_with_snapshot``, for performance.

    Args:
        collection: Name of the main collection (unordered bulk replace,
            upsert=True, matched on ``_id``).
        snapshot_collection: Name of the snapshot collection (unordered
            ``insert_many``).
        docs: Iterable of documents, each containing ``_id``. Any iterable
            is accepted, including generators; None or empty is a no-op.
        import_id: Value stamped on every main document and snapshot.

    Returns:
        Number of documents processed (0 when `docs` is empty).
    """
    # Materialise once so generators can be both counted and iterated, and
    # so an empty generator never reaches bulk_write with no operations.
    docs = [] if docs is None else list(docs)
    if not docs:
        return 0
    db = get_db()
    now = datetime.datetime.now()
    from pymongo import ReplaceOne
    main_ops = []
    snap_docs = []
    for doc in docs:
        main = _stamped_main(doc, import_id, now)
        main_ops.append(ReplaceOne({"_id": doc["_id"]}, main, upsert=True))
        snap_docs.append(_snapshot_of(doc, import_id, now))
    db[collection].bulk_write(main_ops, ordered=False)
    db[snapshot_collection].insert_many(snap_docs, ordered=False)
    return len(docs)


def bulk_upsert_only(collection, docs, import_id=None):
    """Bulk variant of ``upsert_only`` (no snapshots).

    Args:
        collection: Name of the target collection (unordered bulk replace,
            upsert=True, matched on ``_id``).
        docs: Iterable of documents, each containing ``_id``. Any iterable
            is accepted, including generators; None or empty is a no-op.
        import_id: When not None, every document is stamped with
            last_import_id / last_imported_at.

    Returns:
        Number of documents processed (0 when `docs` is empty).
    """
    docs = [] if docs is None else list(docs)
    if not docs:
        return 0
    db = get_db()
    now = datetime.datetime.now()
    from pymongo import ReplaceOne
    ops = []
    for doc in docs:
        if import_id is None:
            main = dict(doc)
        else:
            main = _stamped_main(doc, import_id, now)
        ops.append(ReplaceOne({"_id": doc["_id"]}, main, upsert=True))
    db[collection].bulk_write(ops, ordered=False)
    return len(docs)