This commit is contained in:
2026-06-10 09:25:49 +02:00
parent bc9b874f3b
commit 30f045e350
26 changed files with 1 additions and 1 deletions
+253
View File
@@ -0,0 +1,253 @@
"""
Import Drugs dat (shipments, shipment_items, inventory, destruction) z XLSX do MongoDB.
Volá se z IWRS/Drugs/run_all.py po stažení reportů.
"""
import os
import sys
import re
import glob
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from common.mongo_writer import (
to_str, to_int, to_date,
ensure_indexes, log_import,
bulk_upsert_with_snapshot, bulk_upsert_only,
)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# ── XLSX parsery (převzaté z run_all.py + úprava na Mongo dokumenty) ─────────
def parse_shipments_report(study):
path = os.path.join(BASE_DIR, f"xls_shipments_{study}", f"shipments_report_{study}.xlsx")
if not os.path.exists(path):
print(f" CHYBI: {path}")
return []
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Shipment ID" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
return []
df = pd.read_excel(path, header=header_row).dropna(how="all")
df = df[df["Location"].astype(str).str.contains("Czech", na=False, case=False)]
col = df.columns.tolist()
rows = []
for _, r in df.iterrows():
sid = to_str(r["Shipment ID"])
if not sid:
continue
rows.append({
"_id": sid,
"shipment_id": sid,
"study": study,
"status": to_str(r["IRT Shipment Status"]),
"type": to_str(r["Type"]),
"ship_from": to_str(r["Shipment From"]),
"ship_to_site": to_str(r["Ship To:"]),
"location": to_str(r["Location"]),
"request_date": to_date(r["Request Date"]),
"shipped_date": to_date(r["Shipped Date"]),
"received_date": to_date(r["Received Date"]) if "Received Date" in col else None,
"received_by": to_str(r["Received by"]) if "Received by" in col else None,
"delivered_date_utc": to_date(r["Delivered Date [UTC]"]) if "Delivered Date [UTC]" in col else None,
"delivery_recipient": to_str(r["Delivery Recipient"]) if "Delivery Recipient" in col else None,
"delivery_details": to_str(r["Delivery Details"]) if "Delivery Details" in col else None,
"cancelled_date": to_date(r["Cancelled Date"]) if "Cancelled Date" in col else None,
"total_medication_ids": to_int(r["Total Medication IDs"]) if "Total Medication IDs" in col else None,
"tracking_no": to_str(r["Tracking #"]) if "Tracking #" in col else None,
"shipping_category": to_str(r["Shipping Category"]) if "Shipping Category" in col else None,
"expected_arrival": to_date(r["Expected Arrival"]) if "Expected Arrival" in col else None,
})
return rows
def parse_shipment_details(study):
detail_dir = os.path.join(BASE_DIR, f"xls_shipment_details_{study}")
files = sorted(glob.glob(os.path.join(detail_dir, "shipment_details_*.xlsx")))
rows = []
for path in files:
m = re.search(r"shipment_details_(.+)\.xlsx", os.path.basename(path))
shipment_id = m.group(1) if m else "UNKNOWN"
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Medication ID" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
continue
df = pd.read_excel(path, header=header_row).dropna(how="all")
for _, r in df.iterrows():
med_desc = (to_str(r.get("Medication Description"))
or to_str(r.get("Medication ID Description")))
med_type = (to_str(r.get("Medication type"))
or to_str(r.get("Medication ID type")))
med_id = to_str(r.get("Medication ID"))
if not med_id:
continue
rows.append({
"_id": f"{shipment_id}:{med_id}",
"study": study,
"shipment_id": shipment_id,
"destination_location": to_str(r.get("Destination Location")),
"shipment_status": to_str(r.get("IRT Shipment Status")),
"shipment_type": to_str(r.get("Type")),
"destination_site": to_str(r.get("Destination Site")),
"investigator": to_str(r.get("Investigator")),
"medication_description": med_desc,
"medication_type": med_type,
"medication_id": med_id,
"packaged_lot_no": to_str(r.get("Packaged Lot number")),
"packaged_lot_description": to_str(r.get("Packaged Lot description")),
"container_id": to_str(r.get("Container ID")),
"quantity": to_int(r.get("Quantity of Medication IDs")),
"expiration_date": to_date(r.get("Expiration Date")),
"item_status": to_str(r.get("Status")),
})
# dedupe (poslední vyhrává)
by_id = {r["_id"]: r for r in rows}
return list(by_id.values())
def parse_inventory(study):
inv_dir = os.path.join(BASE_DIR, f"xls_reports_{study}")
files = sorted(glob.glob(os.path.join(inv_dir, "onsite_inventory_detail_*.xlsx")))
rows = []
for path in files:
raw = pd.read_excel(path, header=None)
site = investigator = location = None
header_row = None
for i, row in raw.iterrows():
first = str(row.iloc[0]).strip() if pd.notna(row.iloc[0]) else ""
if first.startswith("Site:"):
site = first.replace("Site:", "").strip()
elif first.startswith("Investigator:"):
investigator = first.replace("Investigator:", "").strip()
elif first.startswith("Location:"):
location = first.replace("Location:", "").strip()
if first in ("Medication", "Medication ID") and header_row is None:
header_row = i
if header_row is None:
continue
df = pd.read_excel(path, header=header_row).dropna(how="all")
df = df.rename(columns={df.columns[0]: "medication_id"})
for _, r in df.iterrows():
med_id = to_str(r["medication_id"])
if not med_id or not site:
continue
rows.append({
"_id": f"{site}:{med_id}",
"study": study,
"site": site,
"investigator": investigator,
"location": location,
"medication_id": med_id,
"packaged_lot_no": to_str(r.get("Packaged Lot number")),
"original_expiration_date": to_date(r.get("Original Expiration Date when Packaged Lot was Added")),
"expiration_date": to_date(r.get("Expiration date")),
"received_date": to_date(r.get("Received Date")),
"receipt_user": to_str(r.get("Shipment Receipt User")),
"subject_identifier": to_str(r.get("Subject Identifier")),
"quantity_assigned": to_int(r.get("Quantity Assigned")),
"irt_transaction": to_str(r.get("IRT Transaction")),
"date_assigned": to_date(r.get("Date Assigned")),
"assignment_user": to_str(r.get("Assignment User")),
"dispensation_status": to_str(r.get("Dispensation Status")),
"dispensing_date": to_date(r.get("Dispensing date") or r.get("Dispensing Date")),
"quantity_dispensed": to_int(r.get("Quantity Dispensed")),
"dispensing_user": to_str(r.get("Dispensing User")),
"quantity_returned": to_int(r.get("Quantity Returned")),
"date_returned": to_date(r.get("Date Returned")),
"return_user": to_str(r.get("Return User")),
})
by_id = {r["_id"]: r for r in rows}
return list(by_id.values())
def parse_destruction_files(study):
dest_dir = os.path.join(BASE_DIR, f"xls_ip_destruction_{study}")
files = sorted(glob.glob(os.path.join(dest_dir, "ip_destruction_basket_*.xlsx")))
rows = []
for path in files:
raw = pd.read_excel(path, header=None)
meta = {}
header_row = None
for i, row in raw.iterrows():
first = str(row.iloc[0]).strip() if pd.notna(row.iloc[0]) else ""
for key, attr in [
("Investigator Name:", "investigator"),
("Site ID:", "site_id"),
("Location:", "location"),
("Basket ID:", "basket_id"),
("Drug Destruction Created Date:", "destruction_date"),
]:
if first.startswith(key):
meta[attr] = first.replace(key, "").strip()
if first == "Medication ID Description" and header_row is None:
header_row = i
if header_row is None:
continue
df = pd.read_excel(path, header=header_row).dropna(how="all")
basket_id = meta.get("basket_id")
for _, r in df.iterrows():
med_id = to_str(r.get("Medication ID"))
if not med_id or not basket_id:
continue
rows.append({
"_id": f"{basket_id}:{med_id}",
"study": study,
"site_id": meta.get("site_id"),
"investigator": meta.get("investigator"),
"location": meta.get("location"),
"basket_id": basket_id,
"destruction_date": to_date(meta.get("destruction_date")),
"medication_description": to_str(r.get("Medication ID Description")),
"medication_id": med_id,
"packaged_lot_description": to_str(r.get("Packaged Lot description")),
"comments": to_str(r.get("Comments")),
})
by_id = {r["_id"]: r for r in rows}
return list(by_id.values())
# ── hlavní import ────────────────────────────────────────────────────────────
def import_study(study):
print(f"\n [{study}] parsovani XLSX...")
shipments = parse_shipments_report(study)
items = parse_shipment_details(study)
inventory = parse_inventory(study)
destruct = parse_destruction_files(study)
print(f" Zasilky: {len(shipments)} | Polozky: {len(items)} | Sklad: {len(inventory)} | Destrukce: {len(destruct)}")
import_id = log_import(study, f"drugs_{study}", "drugs", {
"shipments": len(shipments),
"shipment_items": len(items),
"inventory": len(inventory),
"destruction": len(destruct),
})
print(f" import_id = {import_id}")
bulk_upsert_with_snapshot("iwrs_shipments", "iwrs_shipments_snapshots", shipments, import_id)
bulk_upsert_with_snapshot("iwrs_shipment_items", "iwrs_shipment_items_snapshots", items, import_id)
bulk_upsert_with_snapshot("iwrs_inventory", "iwrs_inventory_snapshots", inventory, import_id)
bulk_upsert_only("iwrs_destruction", destruct, import_id)
def run(studies):
ensure_indexes()
for s in studies:
import_study(s)
if __name__ == "__main__":
studies = sys.argv[1:] if len(sys.argv) > 1 else ["77242113UCO3001", "42847922MDD3003"]
run(studies)
+245
View File
@@ -0,0 +1,245 @@
"""
Kompletní pipeline pro Drugs:
1. Onsite inventory detail (per site, vždy přepisuje)
2. IP destruction (per košík, přeskočí již existující soubory)
3. Shipments report (jeden soubor na studii, přepisuje)
4. Shipment details (per zásilka CZ, vždy přepisuje)
5. Import do MongoDB (studie.iwrs_shipments / iwrs_shipment_items / iwrs_inventory / iwrs_destruction)
Spusť tento skript — zpracuje obě studie automaticky.
"""
import os
import glob
import re
import datetime
import sys
import pandas as pd
from playwright.sync_api import sync_playwright
import import_to_mongo as drugs_mongo
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
SITES = {
"77242113UCO3001": [
"DD5-CZ10001", "DD5-CZ10003", "DD5-CZ10006", "DD5-CZ10009",
"DD5-CZ10010", "DD5-CZ10012", "DD5-CZ10013", "DD5-CZ10015",
"DD5-CZ10016", "DD5-CZ10020", "DD5-CZ10021", "DD5-CZ10022",
],
"42847922MDD3003": [
"S10-CZ10002", "S10-CZ10004", "S10-CZ10005",
"S10-CZ10008", "S10-CZ10011", "S10-CZ10012",
],
}
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# ── login ────────────────────────────────────────────────────────────────────
def login(page, study):
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator("#login__submit").click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=study).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
# ── download funkce ──────────────────────────────────────────────────────────
def download_inventory(page, study):
out_dir = os.path.join(BASE_DIR, f"xls_reports_{study}")
os.makedirs(out_dir, exist_ok=True)
page.goto(f"{BASE_URL}/report/onsite_inventory_detail")
page.wait_for_load_state("networkidle", timeout=120000)
for site_id in SITES[study]:
print(f" [{site_id}] inventory...")
page.locator('input[placeholder="search"], input[type="text"]').first.click()
page.get_by_role("option", name=site_id).click()
page.wait_for_load_state("networkidle", timeout=120000)
filename = os.path.join(out_dir, f"onsite_inventory_detail_{site_id}.xlsx")
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
print(f" Inventory OK ({len(SITES[study])} center)")
def download_destruction(page, study):
out_dir = os.path.join(BASE_DIR, f"xls_ip_destruction_{study}")
os.makedirs(out_dir, exist_ok=True)
page.goto(f"{BASE_URL}/report/ip_destruction_form")
page.wait_for_load_state("networkidle", timeout=120000)
page.locator('input[placeholder="search"], input[type="text"]').first.click()
page.wait_for_timeout(1000)
baskets = [b.strip() for b in page.locator("mat-option").all_inner_texts()
if b.strip() and b.strip() != "No results found"]
page.keyboard.press("Escape")
page.wait_for_timeout(500)
if not baskets:
print(" Žádné destruction košíky")
return
new_count = 0
for basket in baskets:
filename = os.path.join(out_dir, f"ip_destruction_basket_{basket}.xlsx")
if os.path.exists(filename):
continue # destrukce se nemění — přeskočit
print(f" [košík {basket}] stahování...")
input_field = page.locator('input[placeholder="search"], input[type="text"]').first
input_field.click()
input_field.fill(basket)
page.wait_for_timeout(500)
page.locator("mat-option").first.dispatch_event("click")
page.wait_for_load_state("networkidle", timeout=120000)
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
new_count += 1
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
print(f" Destruction OK ({new_count} nových, {len(baskets) - new_count} přeskočeno)")
def download_shipments_report(page, study):
out_dir = os.path.join(BASE_DIR, f"xls_shipments_{study}")
os.makedirs(out_dir, exist_ok=True)
page.goto(f"{BASE_URL}/report/shipments_report")
page.wait_for_load_state("networkidle", timeout=120000)
filename = os.path.join(out_dir, f"shipments_report_{study}.xlsx")
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
print(f" Shipments report OK")
def download_shipment_details(page, study):
out_dir = os.path.join(BASE_DIR, f"xls_shipment_details_{study}")
os.makedirs(out_dir, exist_ok=True)
# načti CZ shipment IDs z právě staženého shipments reportu
report_path = os.path.join(BASE_DIR, f"xls_shipments_{study}", f"shipments_report_{study}.xlsx")
raw = pd.read_excel(report_path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Shipment ID" in [str(v).strip() for v in row]:
header_row = i
break
df = pd.read_excel(report_path, header=header_row)
df = df.dropna(how="all")
df = df[df["Location"].astype(str).str.contains("Czech", na=False, case=False)]
cz_shipments = list(zip(
df["Shipment ID"].astype(str).str.strip(),
df["IRT Shipment Status"].astype(str).str.strip() if "IRT Shipment Status" in df.columns else [""] * len(df),
))
print(f" CZ zásilek ke stažení: {len(cz_shipments)}")
page.goto(f"{BASE_URL}/report/shipment_details_report")
page.wait_for_load_state("networkidle", timeout=120000)
skipped = 0
for shipment, status in cz_shipments:
filename = os.path.join(out_dir, f"shipment_details_{shipment}.xlsx")
if os.path.exists(filename) and status.upper() == "RECEIVED":
skipped += 1
continue # finální stav, soubor se nemění
input_field = page.locator('input[placeholder="search"], input[type="text"]').first
input_field.click()
input_field.fill(shipment)
page.wait_for_timeout(500)
page.locator("mat-option").first.dispatch_event("click")
page.wait_for_load_state("networkidle", timeout=120000)
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
print(f" [{shipment}] ({status}) OK")
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
print(f" Přeskočeno (RECEIVED): {skipped}")
# ── main ─────────────────────────────────────────────────────────────────────
def main():
os.chdir(BASE_DIR)
# ── Stahování ────────────────────────────────────────────────────────────
with sync_playwright() as p:
for study in STUDIES:
print(f"\n{'='*60}")
print(f"[{study}] STAHOVÁNÍ")
print(f"{'='*60}")
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
try:
print(" Přihlášení...")
login(page, study)
print("\n [1/4] Onsite inventory...")
download_inventory(page, study)
print("\n [2/4] IP destruction...")
download_destruction(page, study)
print("\n [3/4] Shipments report...")
download_shipments_report(page, study)
print("\n [4/4] Shipment details (CZ)...")
download_shipment_details(page, study)
except Exception as e:
import traceback
print(f" CHYBA při stahování: {e}")
traceback.print_exc()
finally:
browser.close()
# ── Import do MongoDB ─────────────────────────────────────────────────────
print(f"\n{'='*60}")
print("IMPORT DO MongoDB")
print(f"{'='*60}")
try:
drugs_mongo.run(STUDIES)
except Exception as e:
import traceback
print(f" CHYBA při importu: {e}")
traceback.print_exc()
print(f"\n{'='*60}")
print("Vše hotovo.")
print(f"{'='*60}")
main()