307 lines
14 KiB
Python
307 lines
14 KiB
Python
"""
|
|
import_drugs.py — import Drugs reports from IWRS/Incoming/ into MongoDB.
Version: 1.0 | Date: 2026-06-10

Replaces Drugs/import_to_mongo.py (which parsed the fixed xls_* directories;
the dated files from IWRS/Incoming/ are parsed now).

One import_id per study and run. Files are processed oldest first; when
several files contain the same record, the last one wins (latest state).
After a successful write to Mongo, the parsed files are moved to
IWRS/Incoming/Processed/; a file that failed to parse stays in Incoming/.

Target collections (db `studie`):
    iwrs_shipments / iwrs_shipment_items / iwrs_inventory  (upsert + snapshot)
    iwrs_destruction                                       (upsert only, immutable)

Called from IWRS/run_all_v1.0.py (the orchestrator calls ensure_indexes);
can also be run standalone: python import_drugs.py
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import glob
|
|
|
|
import pandas as pd
|
|
|
|
# Directory holding this script, and its parent directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
IWRS_DIR = os.path.dirname(BASE_DIR)

# Put both directories at the front of sys.path (skipping any already present);
# presumably this is what makes the `common` package importable — confirm.
for _p in (IWRS_DIR, BASE_DIR):
    if _p in sys.path:
        continue
    sys.path.insert(0, _p)
|
|
|
|
from common.paths import INCOMING_DIR, STUDIES, move_done, sorted_by_mtime
|
|
from common.mongo_writer import (
|
|
to_str, to_int, to_date,
|
|
ensure_indexes, log_import,
|
|
bulk_upsert_with_snapshot, bulk_upsert_only,
|
|
)
|
|
|
|
|
|
def _pending(pattern):
    """Return the files in INCOMING_DIR matching glob *pattern*.

    The matches are passed through ``sorted_by_mtime`` before being returned.
    """
    matches = glob.glob(os.path.join(INCOMING_DIR, pattern))
    return sorted_by_mtime(matches)
|
|
|
|
|
|
def _find_header_row(raw, marker):
|
|
for i, row in raw.iterrows():
|
|
if marker in [str(v).strip() for v in row]:
|
|
return i
|
|
return None
|
|
|
|
|
|
# ── XLSX parsery (per soubor) ────────────────────────────────────────────────
|
|
|
|
def parse_shipments_file(path, study):
    """Parse one Shipments Report workbook into a list of shipment documents.

    The sheet is first read without a header to locate the row holding a
    "Shipment ID" cell, then re-read with that row as the header. Only rows
    whose "Location" contains "Czech" (case-insensitive) are kept, and rows
    whose shipment ID converts to a falsy value are skipped.

    Args:
        path: Path of the workbook to read.
        study: Study identifier stored in every document.

    Returns:
        A list of dicts, one per shipment, with ``_id`` equal to the shipment ID.

    Raises:
        ValueError: if no row containing "Shipment ID" is found.
    """
    raw = pd.read_excel(path, header=None)
    header_row = _find_header_row(raw, "Shipment ID")
    if header_row is None:
        raise ValueError("hlavičkový řádek 'Shipment ID' nenalezen")

    table = pd.read_excel(path, header=header_row).dropna(how="all")
    is_czech = table["Location"].astype(str).str.contains("Czech", na=False, case=False)
    table = table[is_czech]
    columns = table.columns.tolist()

    def optional(convert, record, name):
        # These columns are not present in every report; absent ones map to None.
        return convert(record[name]) if name in columns else None

    rows = []
    for _, rec in table.iterrows():
        sid = to_str(rec["Shipment ID"])
        if not sid:
            continue
        rows.append({
            "_id": sid,
            "shipment_id": sid,
            "study": study,
            "status": to_str(rec["IRT Shipment Status"]),
            "type": to_str(rec["Type"]),
            "ship_from": to_str(rec["Shipment From"]),
            "ship_to_site": to_str(rec["Ship To:"]),
            "location": to_str(rec["Location"]),
            "request_date": to_date(rec["Request Date"]),
            "shipped_date": to_date(rec["Shipped Date"]),
            "received_date": optional(to_date, rec, "Received Date"),
            "received_by": optional(to_str, rec, "Received by"),
            "delivered_date_utc": optional(to_date, rec, "Delivered Date [UTC]"),
            "delivery_recipient": optional(to_str, rec, "Delivery Recipient"),
            "delivery_details": optional(to_str, rec, "Delivery Details"),
            "cancelled_date": optional(to_date, rec, "Cancelled Date"),
            "total_medication_ids": optional(to_int, rec, "Total Medication IDs"),
            "tracking_no": optional(to_str, rec, "Tracking #"),
            "shipping_category": optional(to_str, rec, "Shipping Category"),
            "expected_arrival": optional(to_date, rec, "Expected Arrival"),
        })
    return rows
|
|
|
|
|
|
def parse_shipment_details_file(path, study):
    """Parse one Shipment Details workbook into shipment-item documents.

    The shipment ID is taken from the file name, which must end in
    ``Shipment Details {id}[ HHMM].xlsx``. The sheet is read without a header
    to locate the row holding a "Medication ID" cell, then re-read with that
    row as the header. Rows whose medication ID converts to a falsy value are
    skipped.

    Args:
        path: Path of the workbook to read.
        study: Study identifier stored in every document.

    Returns:
        A list of dicts, one per medication item, with ``_id`` of the form
        ``"{shipment_id}:{medication_id}"``.

    Raises:
        ValueError: if the shipment ID cannot be extracted from the file name,
            or if no row containing "Medication ID" is found.
    """
    file_name = os.path.basename(path)
    m = re.search(r"Shipment Details (\S+?)(?: \d{4})?\.xlsx$", file_name)
    if m is None:
        # A shared placeholder ID would make items from unrelated files collide
        # on `_id` and be stored under a wrong shipment. Fail before any I/O so
        # the file is handled as a parse error instead.
        raise ValueError(f"z názvu souboru nelze určit shipment ID: {file_name}")
    shipment_id = m.group(1)

    raw = pd.read_excel(path, header=None)
    header_row = _find_header_row(raw, "Medication ID")
    if header_row is None:
        raise ValueError("hlavičkový řádek 'Medication ID' nenalezen")

    df = pd.read_excel(path, header=header_row).dropna(how="all")
    rows = []
    for _, r in df.iterrows():
        # Description/type are looked up under two column spellings; the
        # second is used when the first yields a falsy value.
        med_desc = (to_str(r.get("Medication Description"))
                    or to_str(r.get("Medication ID Description")))
        med_type = (to_str(r.get("Medication type"))
                    or to_str(r.get("Medication ID type")))
        med_id = to_str(r.get("Medication ID"))
        if not med_id:
            continue
        rows.append({
            "_id": f"{shipment_id}:{med_id}",
            "study": study,
            "shipment_id": shipment_id,
            "destination_location": to_str(r.get("Destination Location")),
            "shipment_status": to_str(r.get("IRT Shipment Status")),
            "shipment_type": to_str(r.get("Type")),
            "destination_site": to_str(r.get("Destination Site")),
            "investigator": to_str(r.get("Investigator")),
            "medication_description": med_desc,
            "medication_type": med_type,
            "medication_id": med_id,
            "packaged_lot_no": to_str(r.get("Packaged Lot number")),
            "packaged_lot_description": to_str(r.get("Packaged Lot description")),
            "container_id": to_str(r.get("Container ID")),
            "quantity": to_int(r.get("Quantity of Medication IDs")),
            "expiration_date": to_date(r.get("Expiration Date")),
            "item_status": to_str(r.get("Status")),
        })
    return rows
|
|
|
|
|
|
def parse_inventory_file(path, study):
    """Parse one Onsite Inventory workbook into a list of inventory documents.

    The first column of the header-less sheet is scanned for the metadata
    lines "Site:", "Investigator:" and "Location:" and for the first row whose
    first cell is "Medication" or "Medication ID", which becomes the table
    header. The first table column is renamed to ``medication_id``. A row is
    skipped when its medication ID is falsy or when no site was found.

    Args:
        path: Path of the workbook to read.
        study: Study identifier stored in every document.

    Returns:
        A list of dicts with ``_id`` of the form ``"{site}:{medication_id}"``.

    Raises:
        ValueError: if no header row is found.
    """
    raw = pd.read_excel(path, header=None)

    meta = {"Site:": None, "Investigator:": None, "Location:": None}
    header_row = None
    for idx, cells in raw.iterrows():
        lead = cells.iloc[0]
        first = str(lead).strip() if pd.notna(lead) else ""
        for prefix in meta:
            if first.startswith(prefix):
                # A later metadata line with the same prefix overwrites the earlier one.
                meta[prefix] = first.replace(prefix, "").strip()
                break
        if header_row is None and first in ("Medication", "Medication ID"):
            header_row = idx

    if header_row is None:
        raise ValueError("hlavičkový řádek 'Medication' nenalezen")

    site = meta["Site:"]
    investigator = meta["Investigator:"]
    location = meta["Location:"]

    table = pd.read_excel(path, header=header_row).dropna(how="all")
    table = table.rename(columns={table.columns[0]: "medication_id"})

    rows = []
    for _, rec in table.iterrows():
        med_id = to_str(rec["medication_id"])
        if not (med_id and site):
            continue
        rows.append({
            "_id": f"{site}:{med_id}",
            "study": study,
            "site": site,
            "investigator": investigator,
            "location": location,
            "medication_id": med_id,
            "packaged_lot_no": to_str(rec.get("Packaged Lot number")),
            "original_expiration_date": to_date(rec.get("Original Expiration Date when Packaged Lot was Added")),
            "expiration_date": to_date(rec.get("Expiration date")),
            "received_date": to_date(rec.get("Received Date")),
            "receipt_user": to_str(rec.get("Shipment Receipt User")),
            "subject_identifier": to_str(rec.get("Subject Identifier")),
            "quantity_assigned": to_int(rec.get("Quantity Assigned")),
            "irt_transaction": to_str(rec.get("IRT Transaction")),
            "date_assigned": to_date(rec.get("Date Assigned")),
            "assignment_user": to_str(rec.get("Assignment User")),
            "dispensation_status": to_str(rec.get("Dispensation Status")),
            # Two spellings of the column are tried; the second is used when
            # the first lookup is falsy (e.g. None because the column is absent).
            "dispensing_date": to_date(rec.get("Dispensing date") or rec.get("Dispensing Date")),
            "quantity_dispensed": to_int(rec.get("Quantity Dispensed")),
            "dispensing_user": to_str(rec.get("Dispensing User")),
            "quantity_returned": to_int(rec.get("Quantity Returned")),
            "date_returned": to_date(rec.get("Date Returned")),
            "return_user": to_str(rec.get("Return User")),
        })
    return rows
|
|
|
|
|
|
def parse_destruction_file(path, study):
    """Parse one IP Destruction workbook into a list of destruction documents.

    The first column of the header-less sheet is scanned for the metadata
    lines (investigator name, site ID, location, basket ID, destruction
    created date) and for the first row whose first cell is
    "Medication ID Description", which becomes the table header. A row is
    skipped when its medication ID is falsy or when no basket ID was found.

    Args:
        path: Path of the workbook to read.
        study: Study identifier stored in every document.

    Returns:
        A list of dicts with ``_id`` of the form ``"{basket_id}:{medication_id}"``.

    Raises:
        ValueError: if no header row is found.
    """
    prefixes = {
        "Investigator Name:": "investigator",
        "Site ID:": "site_id",
        "Location:": "location",
        "Basket ID:": "basket_id",
        "Drug Destruction Created Date:": "destruction_date",
    }

    raw = pd.read_excel(path, header=None)
    meta = {}
    header_row = None
    for idx, cells in raw.iterrows():
        lead = cells.iloc[0]
        first = str(lead).strip() if pd.notna(lead) else ""
        for prefix, attr in prefixes.items():
            if first.startswith(prefix):
                meta[attr] = first.replace(prefix, "").strip()
        if header_row is None and first == "Medication ID Description":
            header_row = idx

    if header_row is None:
        raise ValueError("hlavičkový řádek 'Medication ID Description' nenalezen")

    table = pd.read_excel(path, header=header_row).dropna(how="all")
    basket_id = meta.get("basket_id")

    rows = []
    for _, rec in table.iterrows():
        med_id = to_str(rec.get("Medication ID"))
        if not (med_id and basket_id):
            continue
        rows.append({
            "_id": f"{basket_id}:{med_id}",
            "study": study,
            "site_id": meta.get("site_id"),
            "investigator": meta.get("investigator"),
            "location": meta.get("location"),
            "basket_id": basket_id,
            "destruction_date": to_date(meta.get("destruction_date")),
            "medication_description": to_str(rec.get("Medication ID Description")),
            "medication_id": med_id,
            "packaged_lot_description": to_str(rec.get("Packaged Lot description")),
            "comments": to_str(rec.get("Comments")),
        })
    return rows
|
|
|
|
|
|
# ── zpracování souborů ───────────────────────────────────────────────────────
|
|
|
|
def _parse_files(files, parser, study, label):
|
|
"""Zparsuje soubory (nejstarší napřed, poslední vyhrává per _id).
|
|
|
|
Vrací (docs, ok_paths, failed_paths).
|
|
"""
|
|
docs, ok, failed = {}, [], []
|
|
for path in files:
|
|
try:
|
|
for d in parser(path, study):
|
|
docs[d["_id"]] = d
|
|
ok.append(path)
|
|
except Exception as e:
|
|
failed.append(path)
|
|
print(f" [{study}] CHYBA parsování {label} {os.path.basename(path)}: {e}")
|
|
return list(docs.values()), ok, failed
|
|
|
|
|
|
def import_study(study):
    """Import all pending drug reports of one study and archive the parsed files.

    Steps: collect the pending files of the four report types, parse them,
    register the run via ``log_import``, write the documents to the target
    collections, and finally hand every successfully parsed file to
    ``move_done``. Returns early (after printing a message) when nothing is
    pending or when no file could be parsed. Files that failed to parse are
    not passed to ``move_done``.

    Args:
        study: Study identifier used in the file-name patterns and documents.
    """
    pending = {
        "shipments": _pending(f"* {study} Shipments Report*.xlsx"),
        "details": _pending(f"* {study} Shipment Details *.xlsx"),
        "inventory": _pending(f"* {study} Onsite Inventory *.xlsx"),
        "destruction": _pending(f"* {study} IP Destruction *.xlsx"),
    }
    if not any(pending.values()):
        print(f" [{study}] drugs: nic ke zpracování")
        return

    parsers = {
        "shipments": parse_shipments_file,
        "details": parse_shipment_details_file,
        "inventory": parse_inventory_file,
        "destruction": parse_destruction_file,
    }
    parsed = {}
    ok_files = []
    for label, files in pending.items():
        docs, ok_paths, _failed = _parse_files(files, parsers[label], study, label)
        parsed[label] = docs
        ok_files.extend(ok_paths)

    if not ok_files:
        print(f" [{study}] drugs: žádný soubor se nepodařilo zparsovat")
        return

    shipments = parsed["shipments"]
    items = parsed["details"]
    inventory = parsed["inventory"]
    destruct = parsed["destruction"]

    print(f" [{study}] Zásilky: {len(shipments)} | Položky: {len(items)} | "
          f"Sklad: {len(inventory)} | Destrukce: {len(destruct)}")

    counts = {
        "shipments": len(shipments),
        "shipment_items": len(items),
        "inventory": len(inventory),
        "destruction": len(destruct),
    }
    import_id = log_import(study, f"drugs_{study}", "drugs", counts)
    print(f" [{study}] import_id = {import_id}")

    snapshot_targets = (
        ("iwrs_shipments", "iwrs_shipments_snapshots", shipments),
        ("iwrs_shipment_items", "iwrs_shipment_items_snapshots", items),
        ("iwrs_inventory", "iwrs_inventory_snapshots", inventory),
    )
    for collection, snapshots, docs in snapshot_targets:
        bulk_upsert_with_snapshot(collection, snapshots, docs, import_id)
    bulk_upsert_only("iwrs_destruction", destruct, import_id)

    # Reached only when the writes above did not raise: archive the source files.
    for path in ok_files:
        move_done(path)
    print(f" [{study}] drugs: {len(ok_files)} soubor(ů) přesunuto do Processed")
|
|
|
|
|
|
def run(studies=None):
    """Import drug reports for every study in *studies*.

    Args:
        studies: Iterable of study identifiers; when falsy, ``STUDIES`` is used.

    Prints a message and returns without importing when ``INCOMING_DIR`` is
    not a directory. An exception raised while importing one study is printed
    with its traceback and does not stop the remaining studies.
    """
    import traceback

    selected = studies or STUDIES
    if not os.path.isdir(INCOMING_DIR):
        print(f"Adresář neexistuje: {INCOMING_DIR}")
        return

    rule = "=" * 60
    print(rule)
    print("Import Drugs (shipments / items / inventory / destruction)")
    print(rule)

    for study in selected:
        try:
            import_study(study)
        except Exception as e:
            print(f" [{study}] CHYBA importu drugs: {e}")
            traceback.print_exc()
|
|
|
|
|
|
if __name__ == "__main__":
    # Standalone run: create the indexes here, then import the studies named
    # on the command line (all configured studies when none are given).
    ensure_indexes()
    cli_studies = sys.argv[1:]
    run(cli_studies or None)
|