This commit is contained in:
2026-05-05 11:41:33 +02:00
parent 1f52ce4045
commit 5103cac2c9
271 changed files with 3525 additions and 0 deletions
+139
View File
@@ -0,0 +1,139 @@
import mysql.connector
import db_config
conn = mysql.connector.connect(
host=db_config.DB_HOST, port=db_config.DB_PORT,
user=db_config.DB_USER, password=db_config.DB_PASSWORD,
database=db_config.DB_NAME
)
c = conn.cursor()
# Přidat report_type do iwrs_import (pokud ještě neexistuje)
try:
c.execute("""ALTER TABLE iwrs_import
ADD COLUMN report_type VARCHAR(20) NOT NULL DEFAULT 'patients'
AFTER source_file""")
print("ALTER TABLE iwrs_import OK — report_type přidán")
except mysql.connector.errors.DatabaseError as e:
if "Duplicate column" in str(e):
print("report_type již existuje — přeskočeno")
else:
raise
stmts = [
(
"iwrs_shipments",
"""CREATE TABLE IF NOT EXISTS iwrs_shipments (
id INT AUTO_INCREMENT PRIMARY KEY,
import_id INT NOT NULL,
study VARCHAR(20) NOT NULL,
shipment_id VARCHAR(20) NOT NULL,
status VARCHAR(50),
type VARCHAR(30),
ship_from VARCHAR(50),
ship_to_site VARCHAR(50),
location VARCHAR(50),
request_date DATE,
shipped_date DATE,
received_date DATE,
received_by VARCHAR(100),
delivered_date_utc DATE,
delivery_recipient VARCHAR(100),
delivery_details VARCHAR(200),
cancelled_date DATE,
total_medication_ids SMALLINT,
tracking_no VARCHAR(100),
shipping_category VARCHAR(50),
expected_arrival DATE,
FOREIGN KEY (import_id) REFERENCES iwrs_import(import_id),
INDEX idx_import (import_id),
INDEX idx_study_shipment (study, shipment_id)
)"""
),
(
"iwrs_shipment_items",
"""CREATE TABLE IF NOT EXISTS iwrs_shipment_items (
id INT AUTO_INCREMENT PRIMARY KEY,
import_id INT NOT NULL,
study VARCHAR(20) NOT NULL,
shipment_id VARCHAR(20) NOT NULL,
destination_location VARCHAR(50),
shipment_status VARCHAR(50),
shipment_type VARCHAR(30),
destination_site VARCHAR(50),
investigator VARCHAR(100),
medication_description VARCHAR(200),
medication_type VARCHAR(50),
medication_id VARCHAR(20),
packaged_lot_no VARCHAR(50),
packaged_lot_description VARCHAR(100),
container_id VARCHAR(50),
quantity SMALLINT,
expiration_date DATE,
item_status VARCHAR(50),
FOREIGN KEY (import_id) REFERENCES iwrs_import(import_id),
INDEX idx_import (import_id),
INDEX idx_med_id (medication_id)
)"""
),
(
"iwrs_inventory",
"""CREATE TABLE IF NOT EXISTS iwrs_inventory (
id INT AUTO_INCREMENT PRIMARY KEY,
import_id INT NOT NULL,
study VARCHAR(20) NOT NULL,
site VARCHAR(50),
investigator VARCHAR(100),
location VARCHAR(50),
medication_id VARCHAR(20),
packaged_lot_no VARCHAR(50),
original_expiration_date DATE,
expiration_date DATE,
received_date DATE,
receipt_user VARCHAR(100),
subject_identifier VARCHAR(20),
quantity_assigned SMALLINT,
irt_transaction VARCHAR(100),
date_assigned DATE,
assignment_user VARCHAR(100),
dispensation_status VARCHAR(50),
dispensing_date DATE,
quantity_dispensed SMALLINT,
dispensing_user VARCHAR(100),
quantity_returned SMALLINT,
date_returned DATE,
return_user VARCHAR(100),
FOREIGN KEY (import_id) REFERENCES iwrs_import(import_id),
INDEX idx_import (import_id),
INDEX idx_site (study, site)
)"""
),
(
"iwrs_destruction",
"""CREATE TABLE IF NOT EXISTS iwrs_destruction (
id INT AUTO_INCREMENT PRIMARY KEY,
study VARCHAR(20) NOT NULL,
site_id VARCHAR(50),
investigator VARCHAR(100),
location VARCHAR(50),
basket_id VARCHAR(20) NOT NULL,
destruction_date DATE,
medication_description VARCHAR(200),
medication_id VARCHAR(20),
packaged_lot_description VARCHAR(100),
comments VARCHAR(500),
imported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uq_destruction (study, basket_id, medication_id),
INDEX idx_study_basket (study, basket_id)
)"""
),
]
for name, sql in stmts:
c.execute(sql)
print(f"OK: {name}")
conn.commit()
c.close()
conn.close()
print("\nVšechny tabulky připraveny.")
@@ -0,0 +1,368 @@
import pandas as pd
from datetime import date
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
STUDY = "42847922MDD3003"
#STUDY = "77242113UCO3001"
INVENTORY_DIR = Path(f"xls_reports_{STUDY}")
DESTRUCTION_DIR = Path(f"xls_ip_destruction_{STUDY}")
SHIPMENTS_FILE = Path(f"xls_shipments_{STUDY}/shipments_report_{STUDY}.xlsx")
DETAILS_DIR = Path(f"xls_shipment_details_{STUDY}")
OUTPUT_DIR = Path("output")
OUTPUT_FILE = OUTPUT_DIR / f"{date.today().strftime('%Y-%m-%d')} {STUDY} CZ IWRS overview.xlsx"
SHIPMENT_DROP_COLS = {
"Location", "Shipped Date", "Delivered Date [UTC]",
"Delivery Recipient", "Delivery Details", "Cancelled Date",
"Tracking #", "Total Medication IDs",
"Shipping Category", "Study", "Destination Location", "Destination Site",
"Medication type", "Container ID", "Quantity of Medication IDs",
"Packaged Lot description",
}
# ── Shared constants ──────────────────────────────────────────────────────────
COLUMN_RENAMES = {
"Site": "Site",
"Medication ID": "Med ID",
"Packaged Lot number": "Lot No.",
"Original Expiration Date when Packaged Lot was Added": "Orig Exp Date",
"Expiration date": "Exp Date",
"Received Date": "Rcv Date",
"Shipment Receipt User": "Rcpt User",
"Subject Identifier": "Subject ID",
"Quantity Assigned": "Qty Asgn",
"IRT Transaction": "IRT Tx",
"Date Assigned": "Date Asgn",
"Assignment User": "Asgn User",
"Dispensation Status": "Disp Status",
"Dispensing Date": "Disp Date",
"Dispensing date": "Disp Date",
"Quantity Dispensed": "Qty Disp",
"Dispensing User": "Disp User",
"Quantity Returned": "Qty Ret",
"Date Returned": "Date Ret",
"Return User": "Ret User",
"DestroyedOn": "Destroyed",
"Basket number": "Basket No.",
}
DATE_COLUMNS = {
"Orig Exp Date", "Exp Date", "Rcv Date",
"Date Asgn", "Disp Date", "Date Ret", "Destroyed", "Max Visit Date",
}
COLUMN_WIDTHS = {
"Site": 14,
"Med ID": 10,
"Lot No.": 12,
"Orig Exp Date": 16,
"Exp Date": 14,
"Rcv Date": 14,
"Rcpt User": 22,
"Subject ID": 14,
"Qty Asgn": 9,
"IRT Tx": 8,
"Date Asgn": 14,
"Asgn User": 20,
"Disp Status": 16,
"Disp Date": 14,
"Qty Disp": 9,
"Disp User": 20,
"Qty Ret": 10,
"Date Ret": 14,
"Ret User": 18,
"Destroyed": 14,
"Basket No.": 12,
"Max Visit Date": 16,
}
# ── Helpers ───────────────────────────────────────────────────────────────────
def read_inventory(path):
df = pd.read_excel(path, header=None)
# Support both "Medication ID" (MDD3003) and "Medication" (UCO3001)
mask = df[0].isin(["Medication ID", "Medication"])
meta = {}
for i in range(len(df)):
val = str(df.iloc[i, 0]) if pd.notna(df.iloc[i, 0]) else ""
if val.startswith("Site:"):
meta["site"] = val.replace("Site:", "").strip()
if not mask.any():
print(f" {path.name}: no data (skipping)")
return None, meta
header_row = df[mask].index[0]
data = pd.read_excel(path, header=header_row)
data = data.rename(columns={"Medication": "Medication ID"})
return data, meta
def read_destruction_lookup():
lookup = {}
for path in DESTRUCTION_DIR.glob("*.xlsx"):
df = pd.read_excel(path, header=None)
basket_id = None
destroyed_on = None
for i in range(15):
val = str(df.iloc[i, 0]) if pd.notna(df.iloc[i, 0]) else ""
if val.startswith("Basket ID:"):
basket_id = val.replace("Basket ID:", "").strip()
if val.startswith("Drug Destruction Created Date:"):
destroyed_on = val.replace("Drug Destruction Created Date:", "").strip()
header_row = df[df[0] == "Medication ID Description"].index[0]
data = pd.read_excel(path, header=header_row)
for med_id in data["Medication ID"].dropna():
lookup[int(med_id)] = (basket_id, destroyed_on)
return lookup
def format_sheet(ws, header_color, highlight_col=None, highlight_color=None):
thin = Side(style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
header_fill = PatternFill("solid", start_color=header_color)
header_font = Font(bold=True, color="FFFFFF", name="Arial", size=10)
row_font = Font(name="Arial", size=10)
hi_fill = PatternFill("solid", start_color=highlight_color) if highlight_color else None
headers = [cell.value for cell in ws[1]]
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=False)
cell.border = border
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
for cell in row:
col_name = headers[cell.column - 1] if cell.column <= len(headers) else None
cell.font = row_font
cell.border = border
cell.alignment = Alignment(horizontal="center")
if col_name in DATE_COLUMNS:
cell.number_format = "DD-MMM-YYYY"
if hi_fill and col_name == highlight_col:
cell.fill = hi_fill
for cell in ws[1]:
width = COLUMN_WIDTHS.get(cell.value, 14)
ws.column_dimensions[get_column_letter(cell.column)].width = width
ws.auto_filter.ref = ws.dimensions
ws.freeze_panes = "A2"
# ── Shipment helpers ─────────────────────────────────────────────────────────
def build_shipments():
sh = pd.read_excel(SHIPMENTS_FILE, sheet_name=0, header=5)
sh.columns = sh.columns.str.strip()
sh = sh.dropna(how="all")
sh["Shipment ID"] = sh["Shipment ID"].astype(str).str.strip()
sh = sh.drop(columns=[c for c in SHIPMENT_DROP_COLS if c in sh.columns])
shipment_cols = list(sh.columns)
all_rows = []
for _, s_row in sh.iterrows():
sid = s_row["Shipment ID"]
path = DETAILS_DIR / f"shipment_details_{sid}.xlsx"
if not path.exists():
continue
det = pd.read_excel(path, sheet_name=0, header=5)
det.columns = det.columns.str.strip()
det = det.dropna(how="all")
det["Shipment"] = det["Shipment"].astype(str).str.strip()
extra_cols = [c for c in det.columns if c not in shipment_cols and c != "Shipment" and c not in SHIPMENT_DROP_COLS]
for _, d_row in det.iterrows():
all_rows.append({**s_row.to_dict(), **{c: d_row[c] for c in extra_cols}})
result = pd.DataFrame(all_rows)
all_cols = shipment_cols + [c for c in extra_cols if c in result.columns]
result = result[all_cols]
for col in ["Request Date", "Received Date", "Expiration Date"]:
if col in result.columns:
result[col] = pd.to_datetime(result[col], errors="coerce")
print(f" Shipments: {result['Shipment ID'].nunique()} shipments, {len(result)} kitu")
return result
def build_site_summary(result):
STATUS_COLS = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
pivot = result.groupby("Ship To:")["Status"].value_counts().unstack(fill_value=0)
for s in STATUS_COLS:
if s not in pivot.columns:
pivot[s] = 0
pivot = pivot[STATUS_COLS].reset_index().rename(columns={
"Ship To:": "Site", "Returned by Subject": "Returned"
})
pivot = pivot.sort_values("Site").reset_index(drop=True)
pivot["Total"] = pivot[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
print(f" Site Summary: {len(pivot)} center")
return pivot
def format_shipment_sheet(ws, header_color_ship, header_color_detail, n_ship_cols):
thin = Side(style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
hfont = Font(bold=True, color="FFFFFF", name="Arial", size=10)
dfont = Font(name="Arial", size=10)
fill_ship = PatternFill("solid", start_color=header_color_ship)
fill_detail = PatternFill("solid", start_color=header_color_detail)
for cell in ws[1]:
cell.fill = fill_ship if cell.column <= n_ship_cols else fill_detail
cell.font = hfont
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
cell.border = border
ws.column_dimensions[get_column_letter(cell.column)].width = min(len(str(cell.value or "")) + 4, 35)
ws.row_dimensions[1].height = 30
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
for cell in row:
cell.font = dfont
cell.border = border
cell.alignment = Alignment(horizontal="center", vertical="center")
if cell.value.__class__.__name__ in ("datetime", "date", "Timestamp"):
cell.number_format = "DD-MMM-YYYY"
ws.auto_filter.ref = ws.dimensions
ws.freeze_panes = "A2"
# ── Build DataFrames ──────────────────────────────────────────────────────────
def build_main(lookup):
all_rows = []
for path in sorted(INVENTORY_DIR.glob("onsite_inventory_detail_*.xlsx")):
df, meta = read_inventory(path)
if df is None:
continue
df["DestroyedOn"] = df["Medication ID"].apply(
lambda x: lookup.get(int(x), (None, None))[1] if pd.notna(x) else None)
df["Basket number"] = df["Medication ID"].apply(
lambda x: lookup.get(int(x), (None, None))[0] if pd.notna(x) else None)
df.insert(0, "Site", meta.get("site", path.stem))
all_rows.append(df)
print(f" {path.name}: {len(df)} kits")
combined = pd.concat(all_rows, ignore_index=True)
combined.rename(columns=COLUMN_RENAMES, inplace=True)
for col in DATE_COLUMNS:
if col in combined.columns:
combined[col] = pd.to_datetime(combined[col], dayfirst=True, errors="coerce")
combined.sort_values(["Site", "Rcv Date", "Med ID"], inplace=True, ignore_index=True)
return combined
def build_expired(df):
today = date.today()
mask = (
df["Basket No."].isna() &
df["Subject ID"].isna() &
(df["Exp Date"] < pd.Timestamp(today))
)
filtered = df[mask].copy().reset_index(drop=True)
sheet_name = f"Expired as of {today.strftime('%d-%b-%Y')}"
print(f" Expired: {len(filtered)}")
return filtered, sheet_name
def build_assigned_not_dispensed(df):
mask = df["Subject ID"].notna() & df["Disp Date"].isna()
filtered = df[mask].copy().reset_index(drop=True)
print(f" Assigned not dispensed: {len(filtered)}")
return filtered
def build_not_returned(df):
no_ret = df[
df["Date Ret"].isna() &
df["Subject ID"].notna() &
(df["Disp Status"].str.upper() != "NOT DISPENSED")
].copy()
max_asgn = df.groupby("Subject ID")["Date Asgn"].max().rename("Max Visit Date")
no_ret = no_ret.join(max_asgn, on="Subject ID")
filtered = no_ret[no_ret["Date Asgn"] < no_ret["Max Visit Date"]].copy()
filtered = filtered.drop(columns=["Qty Ret", "Date Ret", "Ret User", "Destroyed", "Basket No."])
filtered = filtered.reset_index(drop=True)
print(f" Not returned: {len(filtered)}")
return filtered
def build_kits_for_destruction(df):
mask = (
df["Basket No."].isna() &
(df["Date Ret"].notna() | (df["Disp Status"].str.upper() == "NOT DISPENSED"))
)
filtered = df[mask].copy().sort_values(["Site", "Date Ret"], ascending=[True, True])
filtered = filtered.drop(columns=["Destroyed", "Basket No."]).reset_index(drop=True)
print(f" Kits for destruction: {len(filtered)}")
return filtered
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
# Prepare output dir, remove any previous overview file
OUTPUT_DIR.mkdir(exist_ok=True)
for old in OUTPUT_DIR.glob(f"*{STUDY} CZ IWRS overview.xlsx"):
old.unlink()
print(f"Removed old file: {old.name}")
lookup = read_destruction_lookup()
print(f"Loaded {len(lookup)} kits from destruction reports")
df = build_main(lookup)
expired_df, expired_sheet = build_expired(df)
assigned_df = build_assigned_not_dispensed(df)
not_returned_df = build_not_returned(df)
destruction_df = build_kits_for_destruction(df)
shipments_df = build_shipments()
site_summary_df = build_site_summary(shipments_df)
n_ship_cols = shipments_df.columns.tolist().index("Investigator") # first detail col index (0-based)
# Write all sheets
with pd.ExcelWriter(OUTPUT_FILE, engine="openpyxl") as writer:
df.to_excel( writer, index=False, sheet_name="CountryMedicationOverview")
expired_df.to_excel( writer, index=False, sheet_name=expired_sheet)
assigned_df.to_excel( writer, index=False, sheet_name="Assigned not dispensed")
not_returned_df.to_excel( writer, index=False, sheet_name="Not returned")
destruction_df.to_excel( writer, index=False, sheet_name="Kits for destruction")
shipments_df.to_excel( writer, index=False, sheet_name="Shipments")
site_summary_df.to_excel( writer, index=False, sheet_name="Site Summary")
# Format all sheets
wb = load_workbook(OUTPUT_FILE)
# Main sheet — dark blue, green highlight for Destroyed/Basket No.
ws_main = wb["CountryMedicationOverview"]
format_sheet(ws_main, header_color="1F4E79")
# Extra: green fill for Destroyed and Basket No. columns
new_col_fill = PatternFill("solid", start_color="E2EFDA")
headers_main = [c.value for c in ws_main[1]]
for row in ws_main.iter_rows(min_row=2, max_row=ws_main.max_row):
for cell in row:
col_name = headers_main[cell.column - 1] if cell.column <= len(headers_main) else None
if col_name in ("Destroyed", "Basket No."):
cell.fill = new_col_fill
format_sheet(wb[expired_sheet], header_color="C00000", highlight_col="Exp Date", highlight_color="FFE0E0")
format_sheet(wb["Assigned not dispensed"], header_color="833C00", highlight_col="Subject ID", highlight_color="FFF2CC")
format_sheet(wb["Not returned"], header_color="375623", highlight_col="Max Visit Date", highlight_color="E2EFDA")
format_sheet(wb["Kits for destruction"], header_color="595959")
format_shipment_sheet(wb["Shipments"], "1F4E79", "375623", n_ship_cols)
format_sheet(wb["Site Summary"], header_color="1F4E79")
wb.save(OUTPUT_FILE)
print(f"\nSaved: {OUTPUT_FILE} ({len(df)} rows on main sheet, {wb.sheetnames})")
if __name__ == "__main__":
main()
@@ -0,0 +1,163 @@
import pandas as pd
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from datetime import date
import os
STUDY = "77242113UCO3001"
SHIPMENTS_FILE = f"xls_shipments_{STUDY}/shipments_report_{STUDY}.xlsx"
DETAILS_DIR = f"xls_shipment_details_{STUDY}"
OUTPUT_DIR = "output"
TEST_SHIPMENT = None # None = vsechny shipments
DROP_COLS = {
"Location", "Shipped Date", "Delivered Date [UTC]",
"Delivery Recipient", "Delivery Details", "Cancelled Date",
"Tracking #", "Total Medication IDs",
"Shipping Category", "Study", "Destination Location", "Destination Site",
"Medication type", "Container ID", "Quantity of Medication IDs",
"Packaged Lot description",
}
os.makedirs(OUTPUT_DIR, exist_ok=True)
def read_shipments():
df = pd.read_excel(SHIPMENTS_FILE, sheet_name=0, header=5)
df.columns = df.columns.str.strip()
df = df.dropna(how="all")
df["Shipment ID"] = df["Shipment ID"].astype(str).str.strip()
df = df.drop(columns=[c for c in DROP_COLS if c in df.columns])
return df
def read_details(shipment_id):
path = os.path.join(DETAILS_DIR, f"shipment_details_{shipment_id}.xlsx")
if not os.path.exists(path):
return None
df = pd.read_excel(path, sheet_name=0, header=5)
df.columns = df.columns.str.strip()
df = df.dropna(how="all")
df["Shipment"] = df["Shipment"].astype(str).str.strip()
return df
def build_report():
shipments = read_shipments()
if TEST_SHIPMENT:
shipments = shipments[shipments["Shipment ID"] == TEST_SHIPMENT]
shipment_cols = list(shipments.columns)
all_rows = []
for _, s_row in shipments.iterrows():
sid = s_row["Shipment ID"]
details = read_details(sid)
if details is None:
continue
extra_cols = [c for c in details.columns if c not in shipment_cols and c != "Shipment" and c not in DROP_COLS]
for _, d_row in details.iterrows():
row = {**s_row.to_dict(), **{c: d_row[c] for c in extra_cols}}
all_rows.append(row)
print(f" [{sid}] {len(details)} kitu")
result = pd.DataFrame(all_rows)
all_cols = shipment_cols + [c for c in extra_cols if c in result.columns]
result = result[all_cols]
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Shipments"
HEADER_FILL_SHIP = PatternFill("solid", fgColor="1F4E79")
HEADER_FILL_DETAIL = PatternFill("solid", fgColor="375623")
HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
DATA_FONT = Font(name="Arial", size=10)
BORDER = Border(
left=Side(style="thin", color="BFBFBF"),
right=Side(style="thin", color="BFBFBF"),
bottom=Side(style="thin", color="BFBFBF"),
)
n_ship = len(shipment_cols)
for ci, col in enumerate(all_cols, 1):
cell = ws.cell(row=1, column=ci, value=col)
cell.font = HEADER_FONT
cell.fill = HEADER_FILL_SHIP if ci <= n_ship else HEADER_FILL_DETAIL
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
cell.border = BORDER
ws.row_dimensions[1].height = 30
for ri, (_, row) in enumerate(result.iterrows(), 2):
for ci, col in enumerate(all_cols, 1):
val = row[col]
if pd.isna(val):
val = None
elif hasattr(val, "date"):
val = val.date()
cell = ws.cell(row=ri, column=ci, value=val)
cell.font = DATA_FONT
cell.border = BORDER
cell.alignment = Alignment(horizontal="center", vertical="center")
if isinstance(val, date):
cell.number_format = "DD-MMM-YYYY"
ws.auto_filter.ref = ws.dimensions
ws.freeze_panes = "A2"
for ci, col in enumerate(all_cols, 1):
vals = [col] + [str(result.iloc[r][col]) for r in range(len(result)) if pd.notna(result.iloc[r][col])]
ws.column_dimensions[get_column_letter(ci)].width = min(max((len(v) for v in vals), default=10) + 2, 35)
# --- Sheet 2: Site Summary ---
STATUS_COLS = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
pivot = result.groupby("Ship To:")["Status"].value_counts().unstack(fill_value=0)
for s in STATUS_COLS:
if s not in pivot.columns:
pivot[s] = 0
pivot = pivot[STATUS_COLS].reset_index().rename(columns={"Ship To:": "Site", "Returned by Subject": "Returned"})
pivot = pivot.sort_values("Site").reset_index(drop=True)
pivot["Total"] = pivot[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
ws2 = wb.create_sheet("Site Summary")
summary_cols = ["Site", "Available", "Assigned", "Dispensed", "Returned", "Total"]
HEADER_FILL_SUMM = PatternFill("solid", fgColor="1F4E79")
for ci, col in enumerate(summary_cols, 1):
cell = ws2.cell(row=1, column=ci, value=col)
cell.font = HEADER_FONT
cell.fill = HEADER_FILL_SUMM
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.border = BORDER
ws2.row_dimensions[1].height = 25
for ri, (_, row) in enumerate(pivot.iterrows(), 2):
for ci, col in enumerate(summary_cols, 1):
cell = ws2.cell(row=ri, column=ci, value=row[col])
cell.font = DATA_FONT
cell.border = BORDER
cell.alignment = Alignment(horizontal="center", vertical="center")
for ci, col in enumerate(summary_cols, 1):
vals = [col] + [str(pivot.iloc[r][col]) for r in range(len(pivot))]
ws2.column_dimensions[get_column_letter(ci)].width = min(max(len(v) for v in vals) + 4, 35)
ws2.freeze_panes = "A2"
suffix = f"_{TEST_SHIPMENT}" if TEST_SHIPMENT else ""
pattern = f"{STUDY} CZ Shipments{suffix}.xlsx"
for old in os.listdir(OUTPUT_DIR):
if old.endswith(pattern):
try:
os.remove(os.path.join(OUTPUT_DIR, old))
print(f"Smazan -> {old}")
except OSError:
print(f"Preskakuji smazani (soubor otevren?) -> {old}")
outfile = os.path.join(OUTPUT_DIR, f"{date.today()} {STUDY} CZ Shipments{suffix}.xlsx")
wb.save(outfile)
print(f"\nUlozeno -> {outfile}")
build_report()
@@ -0,0 +1,76 @@
from playwright.sync_api import sync_playwright
import os
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
# STUDY = "42847922MDD3003"
STUDY = "77242113UCO3001"
OUTPUT_DIR = f"xls_ip_destruction_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def run(page, study):
output_dir = f"xls_ip_destruction_{study}"
os.makedirs(output_dir, exist_ok=True)
page.goto(f"{BASE_URL}/report/ip_destruction_form")
page.wait_for_load_state("networkidle", timeout=120000)
page.locator('input[placeholder="search"], input[type="text"]').first.click()
page.wait_for_timeout(1000)
baskets = [b.strip() for b in page.locator('mat-option').all_inner_texts()
if b.strip() and b.strip() != "No results found"]
print(f" Nalezeno {len(baskets)} kosiku: {baskets}")
page.keyboard.press("Escape")
page.wait_for_timeout(500)
if not baskets:
print(" Zadne destruction kosite — preskakuji.")
return
for basket in baskets:
filename = os.path.join(output_dir, f"ip_destruction_basket_{basket}.xlsx")
if os.path.exists(filename):
print(f" [{basket}] Preskakuji — existuje.")
continue
print(f" [{basket}] Stahuji...")
input_field = page.locator('input[placeholder="search"], input[type="text"]').first
input_field.click()
input_field.fill(basket)
page.wait_for_timeout(500)
page.locator('mat-option').first.dispatch_event('click')
page.wait_for_load_state("networkidle", timeout=120000)
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
print(f" [{basket}] OK")
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
print(" Destruction hotovo.")
if __name__ == "__main__":
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator('#login__submit').click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=STUDY).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
run(page, STUDY)
browser.close()
+83
View File
@@ -0,0 +1,83 @@
from playwright.sync_api import sync_playwright
import os
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
# STUDY = "42847922MDD3003"
STUDY = "77242113UCO3001"
SITES = {
"42847922MDD3003": [
"S10-CZ10002",
"S10-CZ10004",
"S10-CZ10005",
"S10-CZ10008",
"S10-CZ10011",
"S10-CZ10012",
],
"77242113UCO3001": [
"DD5-CZ10001",
"DD5-CZ10003",
"DD5-CZ10006",
"DD5-CZ10009",
"DD5-CZ10010",
"DD5-CZ10012",
"DD5-CZ10013",
"DD5-CZ10015",
"DD5-CZ10016",
"DD5-CZ10020",
"DD5-CZ10021",
"DD5-CZ10022",
],
}
OUTPUT_DIR = f"xls_reports_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def run(page, study):
output_dir = f"xls_reports_{study}"
os.makedirs(output_dir, exist_ok=True)
page.goto(f"{BASE_URL}/report/onsite_inventory_detail")
page.wait_for_load_state("networkidle", timeout=120000)
for site_id in SITES[study]:
print(f" [{site_id}] Stahuji...")
page.locator('input[placeholder="search"], input[type="text"]').first.click()
page.get_by_role("option", name=site_id).click()
page.wait_for_load_state("networkidle", timeout=120000)
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(os.path.join(output_dir, f"onsite_inventory_detail_{site_id}.xlsx"))
print(f" [{site_id}] OK")
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
print(" Inventory hotovo.")
if __name__ == "__main__":
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator('#login__submit').click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=STUDY).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
run(page, STUDY)
browser.close()
@@ -0,0 +1,95 @@
from playwright.sync_api import sync_playwright
import os
import pandas as pd
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
STUDY = "42847922MDD3003"
#STUDY = "77242113UCO3001"
OUTPUT_DIR = f"xls_shipment_details_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def get_cz_shipment_ids(study):
path = f"xls_shipments_{study}/shipments_report_{study}.xlsx"
if not os.path.exists(path):
return None
df = pd.read_excel(path, header=5)
df.columns = df.columns.str.strip()
df = df.dropna(how="all")
df["Shipment ID"] = df["Shipment ID"].astype(str).str.strip()
cz = df[df["Location"].str.contains("Czech", na=False, case=False)]
return cz["Shipment ID"].tolist()
def run(page, study):
output_dir = f"xls_shipment_details_{study}"
os.makedirs(output_dir, exist_ok=True)
page.goto(f"{BASE_URL}/report/shipment_details_report")
page.wait_for_load_state("networkidle", timeout=120000)
cz_ids = get_cz_shipment_ids(study)
if cz_ids is not None:
shipments = cz_ids
print(f" Filtrovano ze shipments reportu: {len(shipments)} CZ shipmentu")
else:
page.locator('input[placeholder="search"], input[type="text"]').first.click()
page.wait_for_timeout(1000)
shipments = [s.strip() for s in page.locator('mat-option').all_inner_texts()
if s.strip() and s.strip() != "No results found"]
print(f" Nalezeno {len(shipments)} shipmentu z dropdownu")
page.keyboard.press("Escape")
page.wait_for_timeout(500)
if not shipments:
print(" Zadne shipments — preskakuji.")
return
for shipment in shipments:
filename = os.path.join(output_dir, f"shipment_details_{shipment}.xlsx")
if os.path.exists(filename):
print(f" [{shipment}] Preskakuji — existuje.")
continue
print(f" [{shipment}] Stahuji...")
input_field = page.locator('input[placeholder="search"], input[type="text"]').first
input_field.click()
input_field.fill(shipment)
page.wait_for_timeout(500)
page.locator('mat-option').first.dispatch_event('click')
page.wait_for_load_state("networkidle", timeout=120000)
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
print(f" [{shipment}] OK")
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
print(" Shipment details hotovo.")
if __name__ == "__main__":
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator('#login__submit').click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=STUDY).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
run(page, STUDY)
browser.close()
@@ -0,0 +1,47 @@
from playwright.sync_api import sync_playwright
import os
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
# STUDY = "42847922MDD3003"
STUDY = "77242113UCO3001"
OUTPUT_DIR = f"xls_shipments_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def run(page, study):
output_dir = f"xls_shipments_{study}"
os.makedirs(output_dir, exist_ok=True)
page.goto(f"{BASE_URL}/report/shipments_report")
page.wait_for_load_state("networkidle", timeout=120000)
filename = os.path.join(output_dir, f"shipments_report_{study}.xlsx")
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
print(f" Shipments report OK -> {filename}")
if __name__ == "__main__":
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator('#login__submit').click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=STUDY).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
run(page, STUDY)
browser.close()
+441
View File
@@ -0,0 +1,441 @@
"""
Importuje drugs data z IWRS Excel reportů do MySQL.
Tabulky:
iwrs_shipments — zásilky (jen CZ, verzováno import_id)
iwrs_shipment_items — obsah zásilek (verzováno import_id)
iwrs_inventory — lékový sklad na centrech (verzováno import_id)
iwrs_destruction — destrukce (bez verzování, přeskočí již importované košíky)
Spustit po stažení souborů (nebo přes run_all.py).
"""
import os
import glob
import re
import datetime
import numpy as np
import pandas as pd
import mysql.connector
import db_config
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
SITES = {
"77242113UCO3001": [
"DD5-CZ10001", "DD5-CZ10003", "DD5-CZ10006", "DD5-CZ10009",
"DD5-CZ10010", "DD5-CZ10012", "DD5-CZ10013", "DD5-CZ10015",
"DD5-CZ10016", "DD5-CZ10020", "DD5-CZ10021", "DD5-CZ10022",
],
"42847922MDD3003": [
"S10-CZ10002", "S10-CZ10004", "S10-CZ10005",
"S10-CZ10008", "S10-CZ10011", "S10-CZ10012",
],
}
# ── type converters ──────────────────────────────────────────────────────────
def _py(val):
if isinstance(val, np.generic):
return val.item()
return val
def to_date(val):
val = _py(val)
if val is None:
return None
if isinstance(val, float) and (val != val):
return None
try:
if pd.isna(val):
return None
except (TypeError, ValueError):
pass
if isinstance(val, pd.Timestamp):
return None if pd.isna(val) else val.date()
if isinstance(val, datetime.datetime):
return val.date()
if isinstance(val, datetime.date):
return val
s = str(val).strip()
if not s or s.lower() in ("nat", "nan", "none", ""):
return None
for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
try:
return datetime.datetime.strptime(s, fmt).date()
except ValueError:
pass
return None
def to_int(val):
val = _py(val)
try:
v = float(val)
return None if (v != v) else int(v)
except (TypeError, ValueError):
return None
def to_str(val):
val = _py(val)
if val is None:
return None
if isinstance(val, float) and (val != val):
return None
s = str(val).strip()
return None if s.lower() in ("nan", "nat", "none", "") else s
# ── DB helpers ───────────────────────────────────────────────────────────────
def get_conn():
return mysql.connector.connect(
host=db_config.DB_HOST, port=db_config.DB_PORT,
user=db_config.DB_USER, password=db_config.DB_PASSWORD,
database=db_config.DB_NAME,
)
def insert_import(cursor, study, source_label):
cursor.execute(
"INSERT INTO iwrs_import (study, imported_at, source_file, report_type) VALUES (%s, %s, %s, %s)",
(study, datetime.datetime.now(), source_label, "drugs"),
)
return cursor.lastrowid
def basket_already_imported(cursor, study, basket_id):
cursor.execute(
"SELECT 1 FROM iwrs_destruction WHERE study=%s AND basket_id=%s LIMIT 1",
(study, str(basket_id)),
)
return cursor.fetchone() is not None
# ── parsers ──────────────────────────────────────────────────────────────────
def parse_shipments_report(study):
path = os.path.join(BASE_DIR, f"xls_shipments_{study}", f"shipments_report_{study}.xlsx")
if not os.path.exists(path):
print(f" CHYBÍ: {path}")
return []
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Shipment ID" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
return []
df = pd.read_excel(path, header=header_row)
df = df.dropna(how="all")
# pouze CZ zásilky
df = df[df["Location"].astype(str).str.contains("Czech", na=False, case=False)]
col = df.columns.tolist()
rows = []
for _, r in df.iterrows():
rows.append({
"shipment_id": to_str(r["Shipment ID"]),
"status": to_str(r["IRT Shipment Status"]),
"type": to_str(r["Type"]),
"ship_from": to_str(r["Shipment From"]),
"ship_to_site": to_str(r["Ship To:"]),
"location": to_str(r["Location"]),
"request_date": to_date(r["Request Date"]),
"shipped_date": to_date(r["Shipped Date"]),
"received_date": to_date(r["Received Date"]) if "Received Date" in col else None,
"received_by": to_str(r["Received by"]) if "Received by" in col else None,
"delivered_date_utc": to_date(r["Delivered Date [UTC]"]) if "Delivered Date [UTC]" in col else None,
"delivery_recipient": to_str(r["Delivery Recipient"]) if "Delivery Recipient" in col else None,
"delivery_details": to_str(r["Delivery Details"]) if "Delivery Details" in col else None,
"cancelled_date": to_date(r["Cancelled Date"]) if "Cancelled Date" in col else None,
"total_medication_ids": to_int(r["Total Medication IDs"]) if "Total Medication IDs" in col else None,
"tracking_no": to_str(r["Tracking #"]) if "Tracking #" in col else None,
"shipping_category": to_str(r["Shipping Category"]) if "Shipping Category" in col else None,
"expected_arrival": to_date(r["Expected Arrival"]) if "Expected Arrival" in col else None,
})
return rows
def parse_shipment_details(study):
detail_dir = os.path.join(BASE_DIR, f"xls_shipment_details_{study}")
files = sorted(glob.glob(os.path.join(detail_dir, "shipment_details_*.xlsx")))
rows = []
for path in files:
# shipment ID z názvu souboru
m = re.search(r"shipment_details_(.+)\.xlsx", os.path.basename(path))
shipment_id = m.group(1) if m else "UNKNOWN"
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Medication ID" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
continue
df = pd.read_excel(path, header=header_row)
df = df.dropna(how="all")
col = df.columns.tolist()
for _, r in df.iterrows():
# normalizace názvů sloupců lišících se mezi studiemi
med_desc = (to_str(r.get("Medication Description"))
or to_str(r.get("Medication ID Description")))
med_type = (to_str(r.get("Medication type"))
or to_str(r.get("Medication ID type")))
rows.append({
"shipment_id": shipment_id,
"destination_location": to_str(r.get("Destination Location")),
"shipment_status": to_str(r.get("IRT Shipment Status")),
"shipment_type": to_str(r.get("Type")),
"destination_site": to_str(r.get("Destination Site")),
"investigator": to_str(r.get("Investigator")),
"medication_description": med_desc,
"medication_type": med_type,
"medication_id": to_str(r.get("Medication ID")),
"packaged_lot_no": to_str(r.get("Packaged Lot number")),
"packaged_lot_description": to_str(r.get("Packaged Lot description")),
"container_id": to_str(r.get("Container ID")),
"quantity": to_int(r.get("Quantity of Medication IDs")),
"expiration_date": to_date(r.get("Expiration Date")),
"item_status": to_str(r.get("Status")),
})
return rows
def parse_inventory(study):
inv_dir = os.path.join(BASE_DIR, f"xls_reports_{study}")
files = sorted(glob.glob(os.path.join(inv_dir, "onsite_inventory_detail_*.xlsx")))
rows = []
for path in files:
raw = pd.read_excel(path, header=None)
# extrahuj metadata ze záhlaví
site = investigator = location = None
header_row = None
for i, row in raw.iterrows():
first = str(row.iloc[0]).strip() if pd.notna(row.iloc[0]) else ""
if first.startswith("Site:"):
site = first.replace("Site:", "").strip()
elif first.startswith("Investigator:"):
investigator = first.replace("Investigator:", "").strip()
elif first.startswith("Location:"):
location = first.replace("Location:", "").strip()
# hlavička dat — první sloupec je "Medication" nebo "Medication ID"
if first in ("Medication", "Medication ID") and header_row is None:
header_row = i
if header_row is None:
continue
df = pd.read_excel(path, header=header_row)
df = df.dropna(how="all")
# normalizuj první sloupec na "medication_id"
df = df.rename(columns={df.columns[0]: "medication_id"})
col = df.columns.tolist()
for _, r in df.iterrows():
rows.append({
"site": site,
"investigator": investigator,
"location": location,
"medication_id": to_str(r["medication_id"]),
"packaged_lot_no": to_str(r.get("Packaged Lot number")),
"original_expiration_date": to_date(r.get("Original Expiration Date when Packaged Lot was Added")),
"expiration_date": to_date(r.get("Expiration date")),
"received_date": to_date(r.get("Received Date")),
"receipt_user": to_str(r.get("Shipment Receipt User")),
"subject_identifier": to_str(r.get("Subject Identifier")),
"quantity_assigned": to_int(r.get("Quantity Assigned")),
"irt_transaction": to_str(r.get("IRT Transaction")),
"date_assigned": to_date(r.get("Date Assigned")),
"assignment_user": to_str(r.get("Assignment User")),
"dispensation_status": to_str(r.get("Dispensation Status")),
"dispensing_date": to_date(r.get("Dispensing date") or r.get("Dispensing Date")),
"quantity_dispensed": to_int(r.get("Quantity Dispensed")),
"dispensing_user": to_str(r.get("Dispensing User")),
"quantity_returned": to_int(r.get("Quantity Returned")),
"date_returned": to_date(r.get("Date Returned")),
"return_user": to_str(r.get("Return User")),
})
return rows
def parse_destruction_files(study):
dest_dir = os.path.join(BASE_DIR, f"xls_ip_destruction_{study}")
files = sorted(glob.glob(os.path.join(dest_dir, "ip_destruction_basket_*.xlsx")))
baskets = []
for path in files:
raw = pd.read_excel(path, header=None)
# metadata z záhlaví
meta = {}
header_row = None
for i, row in raw.iterrows():
first = str(row.iloc[0]).strip() if pd.notna(row.iloc[0]) else ""
for key, attr in [
("Investigator Name:", "investigator"),
("Site ID:", "site_id"),
("Location:", "location"),
("Basket ID:", "basket_id"),
("Drug Destruction Created Date:", "destruction_date"),
]:
if first.startswith(key):
meta[attr] = first.replace(key, "").strip()
if first == "Medication ID Description" and header_row is None:
header_row = i
if header_row is None:
continue
df = pd.read_excel(path, header=header_row)
df = df.dropna(how="all")
items = []
for _, r in df.iterrows():
items.append({
"medication_description": to_str(r.get("Medication ID Description")),
"medication_id": to_str(r.get("Medication ID")),
"packaged_lot_description": to_str(r.get("Packaged Lot description")),
"comments": to_str(r.get("Comments")),
})
baskets.append({
"site_id": meta.get("site_id"),
"investigator": meta.get("investigator"),
"location": meta.get("location"),
"basket_id": meta.get("basket_id"),
"destruction_date": to_date(meta.get("destruction_date")),
"items": items,
})
return baskets
# ── inserters ────────────────────────────────────────────────────────────────
def insert_shipments(cursor, import_id, study, rows):
sql = """INSERT INTO iwrs_shipments
(import_id, study, shipment_id, status, type, ship_from, ship_to_site,
location, request_date, shipped_date, received_date, received_by,
delivered_date_utc, delivery_recipient, delivery_details, cancelled_date,
total_medication_ids, tracking_no, shipping_category, expected_arrival)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
for r in rows:
cursor.execute(sql, (
import_id, study, r["shipment_id"], r["status"], r["type"],
r["ship_from"], r["ship_to_site"], r["location"],
r["request_date"], r["shipped_date"], r["received_date"],
r["received_by"], r["delivered_date_utc"], r["delivery_recipient"],
r["delivery_details"], r["cancelled_date"], r["total_medication_ids"],
r["tracking_no"], r["shipping_category"], r["expected_arrival"],
))
def insert_shipment_items(cursor, import_id, study, rows):
sql = """INSERT INTO iwrs_shipment_items
(import_id, study, shipment_id, destination_location, shipment_status,
shipment_type, destination_site, investigator, medication_description,
medication_type, medication_id, packaged_lot_no, packaged_lot_description,
container_id, quantity, expiration_date, item_status)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
for r in rows:
cursor.execute(sql, (
import_id, study, r["shipment_id"], r["destination_location"],
r["shipment_status"], r["shipment_type"], r["destination_site"],
r["investigator"], r["medication_description"], r["medication_type"],
r["medication_id"], r["packaged_lot_no"], r["packaged_lot_description"],
r["container_id"], r["quantity"], r["expiration_date"], r["item_status"],
))
def insert_inventory(cursor, import_id, study, rows):
sql = """INSERT INTO iwrs_inventory
(import_id, study, site, investigator, location, medication_id,
packaged_lot_no, original_expiration_date, expiration_date, received_date,
receipt_user, subject_identifier, quantity_assigned, irt_transaction,
date_assigned, assignment_user, dispensation_status, dispensing_date,
quantity_dispensed, dispensing_user, quantity_returned, date_returned, return_user)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
for r in rows:
cursor.execute(sql, (
import_id, study, r["site"], r["investigator"], r["location"],
r["medication_id"], r["packaged_lot_no"], r["original_expiration_date"],
r["expiration_date"], r["received_date"], r["receipt_user"],
r["subject_identifier"], r["quantity_assigned"], r["irt_transaction"],
r["date_assigned"], r["assignment_user"], r["dispensation_status"],
r["dispensing_date"], r["quantity_dispensed"], r["dispensing_user"],
r["quantity_returned"], r["date_returned"], r["return_user"],
))
def insert_destruction(cursor, study, baskets):
sql = """INSERT IGNORE INTO iwrs_destruction
(study, site_id, investigator, location, basket_id, destruction_date,
medication_description, medication_id, packaged_lot_description, comments)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
skipped = 0
imported = 0
for b in baskets:
if basket_already_imported(cursor, study, b["basket_id"]):
skipped += 1
continue
for item in b["items"]:
cursor.execute(sql, (
study, b["site_id"], b["investigator"], b["location"],
b["basket_id"], b["destruction_date"],
item["medication_description"], item["medication_id"],
item["packaged_lot_description"], item["comments"],
))
imported += 1
return imported, skipped
# ── main ─────────────────────────────────────────────────────────────────────
def import_study(study):
print(f"\n Parsování dat pro {study}...")
shipments = parse_shipments_report(study)
items = parse_shipment_details(study)
inventory = parse_inventory(study)
baskets = parse_destruction_files(study)
print(f" Zásilky: {len(shipments)} | Položky zásilek: {len(items)} | Sklad: {len(inventory)} | Destrukční košíky: {len(baskets)}")
conn = get_conn()
cursor = conn.cursor()
import_id = insert_import(cursor, study, f"drugs_{study}")
print(f" import_id = {import_id}")
insert_shipments(cursor, import_id, study, shipments)
insert_shipment_items(cursor, import_id, study, items)
insert_inventory(cursor, import_id, study, inventory)
dest_imported, dest_skipped = insert_destruction(cursor, study, baskets)
conn.commit()
cursor.close()
conn.close()
print(f" Destrukce: {dest_imported} nových | {dest_skipped} košíků přeskočeno (již importováno)")
def main():
for study in STUDIES:
print(f"\n{'='*60}")
print(f"[{study}]")
print(f"{'='*60}")
try:
import_study(study)
print(f" OK")
except Exception as e:
import traceback
print(f" CHYBA: {e}")
traceback.print_exc()
print("\nHotovo.")
main()
+85
View File
@@ -0,0 +1,85 @@
import sys
import os
from playwright.sync_api import sync_playwright
import download_reports
import download_ip_destruction
import download_shipments_report
import download_shipment_details
import create_accountability_report
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
STUDIES = {
"1": "77242113UCO3001",
"2": "42847922MDD3003",
}
def pick_study():
print("Vyber studii:")
for k, v in STUDIES.items():
print(f" {k}) {v}")
while True:
choice = input("Volba (1/2): ").strip()
if choice in STUDIES:
return STUDIES[choice]
print(" Neplatna volba, zkus znovu.")
def login_and_select_study(page, study):
print(f"\n[1/5] Prihlaseni a vyber studie {study}...")
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator('#login__submit').click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=study).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
print(" OK")
def main():
os.chdir(os.path.dirname(os.path.abspath(__file__)))
study = pick_study()
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
login_and_select_study(page, study)
print(f"\n[2/5] Stahuji inventory reporty...")
download_reports.run(page, study)
print(f"\n[3/5] Stahuji IP destruction reporty...")
download_ip_destruction.run(page, study)
print(f"\n[4/5] Stahuji shipments report...")
download_shipments_report.run(page, study)
print(f"\n[5/5] Stahuji shipment details...")
download_shipment_details.run(page, study)
browser.close()
print(f"\n[6/6] Generuji accountability report...")
create_accountability_report.STUDY = study
create_accountability_report.INVENTORY_DIR = __import__("pathlib").Path(f"xls_reports_{study}")
create_accountability_report.DESTRUCTION_DIR= __import__("pathlib").Path(f"xls_ip_destruction_{study}")
create_accountability_report.SHIPMENTS_FILE = __import__("pathlib").Path(f"xls_shipments_{study}/shipments_report_{study}.xlsx")
create_accountability_report.DETAILS_DIR = __import__("pathlib").Path(f"xls_shipment_details_{study}")
create_accountability_report.OUTPUT_FILE = create_accountability_report.OUTPUT_DIR / f"{__import__('datetime').date.today().strftime('%Y-%m-%d')} {study} CZ IWRS overview.xlsx"
create_accountability_report.main()
print("\nVse hotovo!")
main()