Files
janssen/IWRS/Drugs/Working/create_shipment_report.py
T
2026-05-05 12:19:51 +02:00

206 lines
6.7 KiB
Python

import sys
import os
import mysql.connector
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from datetime import date
import pandas as pd
# db_config.py lives in the parent directory (Drugs/), so that directory is
# put at the front of sys.path before the import below. The order matters.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
import db_config
# Study identifier passed to the database query and embedded in the output file name.
STUDY = "77242113UCO3001"
# Reports go to an "output" folder next to this script; it is created at import time.
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output")
os.makedirs(OUTPUT_DIR, exist_ok=True)
def get_conn():
    """Open a new MySQL connection using the settings in ``db_config``.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
        Closing it is the caller's responsibility.
    """
    settings = {
        "host": db_config.DB_HOST,
        "port": db_config.DB_PORT,
        "user": db_config.DB_USER,
        "password": db_config.DB_PASSWORD,
        "database": db_config.DB_NAME,
    }
    return mysql.connector.connect(**settings)
def load_data(study):
    """Load shipment rows for the most recent 'drugs' import of a study.

    First looks up the highest ``import_id`` in ``iwrs_import`` for the given
    study with ``report_type='drugs'``, then selects every shipment of that
    import joined to its shipment items.

    Args:
        study: Study identifier used to filter all three tables.

    Returns:
        A list of dicts (one per shipment item), ordered by destination site,
        shipment id and medication id. Keys are the column aliases of the
        SELECT below.

    Raises:
        RuntimeError: If no matching import exists for the study.
    """
    conn = get_conn()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            # Newest import_id for the given study.
            cursor.execute(
                "SELECT MAX(import_id) AS mid FROM iwrs_import WHERE study=%s AND report_type='drugs'",
                (study,),
            )
            row = cursor.fetchone()
            import_id = row["mid"]
            if import_id is None:
                raise RuntimeError(f"Žádná data v MySQL pro studii {study}")
            print(f" import_id = {import_id}")

            sql = """
                SELECT
                s.shipment_id,
                s.status AS irt_shipment_status,
                s.type,
                s.ship_from AS shipment_from,
                s.ship_to_site AS ship_to,
                s.request_date,
                s.received_date,
                s.received_by,
                s.expected_arrival,
                i.investigator,
                i.medication_description,
                i.medication_id,
                i.packaged_lot_no,
                i.expiration_date,
                i.item_status AS status
                FROM iwrs_shipments s
                JOIN iwrs_shipment_items i
                ON i.study = s.study
                AND i.shipment_id = s.shipment_id
                AND i.import_id = %s
                WHERE s.import_id = %s
                AND s.study = %s
                ORDER BY s.ship_to_site, s.shipment_id, i.medication_id
            """
            cursor.execute(sql, (import_id, import_id, study))
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when a query fails or no import is found.
            cursor.close()
    finally:
        # Always give the connection back; previously it leaked on any error.
        conn.close()

    print(f" Načteno řádků: {len(rows)}")
    return rows
# Column layout of the "Shipments" sheet as (row key, header label) pairs.
# Shipment-level columns come first (blue header), item-level detail columns
# follow (green header).
SHIP_COLS = list(
    {
        "shipment_id": "Shipment ID",
        "irt_shipment_status": "IRT Shipment Status",
        "type": "Type",
        "shipment_from": "Shipment From",
        "ship_to": "Ship To:",
        "request_date": "Request Date",
        "received_date": "Received Date",
        "received_by": "Received by",
        "expected_arrival": "Expected Arrival",
    }.items()
)
DETAIL_COLS = list(
    {
        "investigator": "Investigator",
        "medication_description": "Medication Description",
        "medication_id": "Medication ID",
        "packaged_lot_no": "Packaged Lot number",
        "expiration_date": "Expiration Date",
        "status": "Status",
    }.items()
)
# Full left-to-right column order, and how many leading columns are shipment-level.
ALL_COLS = [*SHIP_COLS, *DETAIL_COLS]
N_SHIP_COLS = len(SHIP_COLS)
# Header backgrounds: one solid fill for the shipment columns, another for
# the detail columns.
HEADER_FILL_SHIP = PatternFill(patternType="solid", fgColor="1F4E79")
HEADER_FILL_DETAIL = PatternFill(patternType="solid", fgColor="375623")
# White bold Arial for header cells, plain Arial for data cells.
HEADER_FONT = Font(size=10, name="Arial", color="FFFFFF", bold=True)
DATA_FONT = Font(size=10, name="Arial")
# Thin grey line on the left, right and bottom edges; no top edge is set.
THIN_BORDER = Border(
    **{edge: Side(style="thin", color="BFBFBF") for edge in ("left", "right", "bottom")}
)
def write_shipments_sheet(wb, rows):
    """Fill the workbook's active sheet with one line per shipment item.

    The sheet is renamed to "Shipments", gets a styled header row built from
    ``ALL_COLS``, an auto-filter over the used range, a frozen header row and
    column widths sized to the content (capped at 35).

    Args:
        wb: Workbook whose active sheet is written to.
        rows: Sequence of mappings containing every key listed in ``ALL_COLS``.
    """
    sheet = wb.active
    sheet.title = "Shipments"

    # Header row: the first N_SHIP_COLS columns use the shipment fill,
    # the remaining ones the detail fill.
    header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    for col_no, (_, caption) in enumerate(ALL_COLS, start=1):
        head = sheet.cell(row=1, column=col_no, value=caption)
        head.font = HEADER_FONT
        if col_no <= N_SHIP_COLS:
            head.fill = HEADER_FILL_SHIP
        else:
            head.fill = HEADER_FILL_DETAIL
        head.alignment = header_alignment
        head.border = THIN_BORDER
    sheet.row_dimensions[1].height = 30

    # Data rows start directly below the header.
    body_alignment = Alignment(horizontal="center", vertical="center")
    for row_no, record in enumerate(rows, start=2):
        for col_no, (field, _) in enumerate(ALL_COLS, start=1):
            value = record[field]
            target = sheet.cell(row=row_no, column=col_no, value=value)
            target.font = DATA_FONT
            target.border = THIN_BORDER
            target.alignment = body_alignment
            # Dates (and datetimes, which subclass date) get a readable format.
            if isinstance(value, date):
                target.number_format = "DD-MMM-YYYY"

    sheet.auto_filter.ref = sheet.dimensions
    sheet.freeze_panes = "A2"

    # Column widths: longest of the label and all non-null values, plus
    # padding, capped at 35.
    for col_no, (field, caption) in enumerate(ALL_COLS, start=1):
        longest = len(caption)
        for record in rows:
            if record[field] is not None:
                longest = max(longest, len(str(record[field])))
        sheet.column_dimensions[get_column_letter(col_no)].width = min(longest + 2, 35)
def write_summary_sheet(wb, rows):
    """Add a "Site Summary" sheet with per-site counts of item statuses.

    Rows are grouped by ``ship_to`` and the occurrences of the statuses
    "Available", "Assigned", "Dispensed" and "Returned by Subject" are
    counted; the last one is shown as "Returned". "Total" is the sum of these
    four columns only, so items in any other status are not included.

    Args:
        wb: Workbook that receives the new sheet.
        rows: Sequence of mappings with at least ``ship_to`` and ``status``
            keys. May be empty, in which case only the header row is written.
    """
    status_cols = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
    s_cols = ["Site", "Available", "Assigned", "Dispensed", "Returned", "Total"]

    if rows:
        df = pd.DataFrame(rows)
        counts = df.groupby("ship_to")["status"].value_counts().unstack(fill_value=0)
        pivot = (
            # reindex both adds statuses that never occurred (as 0) and drops
            # statuses that are not part of the summary.
            counts.reindex(columns=status_cols, fill_value=0)
            .reset_index()
            .rename(columns={"ship_to": "Site", "Returned by Subject": "Returned"})
            .sort_values("Site")
            .reset_index(drop=True)
        )
        pivot["Total"] = pivot[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
    else:
        # An empty DataFrame has no "ship_to" column, so groupby would raise
        # KeyError; fall back to an empty table with the expected columns.
        pivot = pd.DataFrame(columns=s_cols)

    ws = wb.create_sheet("Site Summary")

    # Header row.
    header_fill = PatternFill("solid", fgColor="1F4E79")
    centered = Alignment(horizontal="center", vertical="center")
    for ci, col in enumerate(s_cols, 1):
        cell = ws.cell(row=1, column=ci, value=col)
        cell.font = HEADER_FONT
        cell.fill = header_fill
        cell.alignment = centered
        cell.border = THIN_BORDER
    ws.row_dimensions[1].height = 25

    # One line per site, in the column order given by s_cols.
    for ri, record in enumerate(pivot[s_cols].itertuples(index=False, name=None), 2):
        for ci, value in enumerate(record, 1):
            cell = ws.cell(row=ri, column=ci, value=value)
            cell.font = DATA_FONT
            cell.border = THIN_BORDER
            cell.alignment = centered

    # Column widths: longest of the label and all values, plus padding,
    # capped at 35.
    for ci, col in enumerate(s_cols, 1):
        longest = max([len(col), *(len(str(v)) for v in pivot[col])])
        ws.column_dimensions[get_column_letter(ci)].width = min(longest + 4, 35)

    ws.freeze_panes = "A2"
def build_report():
    """Build the shipment report for ``STUDY`` and save it as an .xlsx file.

    Loads the rows from MySQL, writes the "Shipments" sheet and the
    "Site Summary" sheet into a new workbook, and stores it in ``OUTPUT_DIR``
    under a name made of today's date and the study identifier.
    """
    print(f"\nNačítám data z MySQL pro {STUDY}...")
    records = load_data(STUDY)

    workbook = openpyxl.Workbook()
    for fill_sheet in (write_shipments_sheet, write_summary_sheet):
        fill_sheet(workbook, records)

    filename = f"{date.today()} {STUDY} CZ Shipments.xlsx"
    outfile = os.path.join(OUTPUT_DIR, filename)
    workbook.save(outfile)
    print(f"\nUloženo -> {outfile}")
# Entry point: runs whenever this module is executed or imported, because
# there is no ``if __name__ == "__main__"`` guard.
build_report()