164 lines
6.2 KiB
Python
164 lines
6.2 KiB
Python
import pandas as pd
|
|
import openpyxl
|
|
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
|
|
from openpyxl.utils import get_column_letter
|
|
from datetime import date
|
|
import os
|
|
|
|
STUDY = "77242113UCO3001"
|
|
SHIPMENTS_FILE = f"xls_shipments_{STUDY}/shipments_report_{STUDY}.xlsx"
|
|
DETAILS_DIR = f"xls_shipment_details_{STUDY}"
|
|
OUTPUT_DIR = "output"
|
|
TEST_SHIPMENT = None # None = vsechny shipments
|
|
|
|
DROP_COLS = {
|
|
"Location", "Shipped Date", "Delivered Date [UTC]",
|
|
"Delivery Recipient", "Delivery Details", "Cancelled Date",
|
|
"Tracking #", "Total Medication IDs",
|
|
"Shipping Category", "Study", "Destination Location", "Destination Site",
|
|
"Medication type", "Container ID", "Quantity of Medication IDs",
|
|
"Packaged Lot description",
|
|
}
|
|
|
|
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
|
|
|
|
def read_shipments():
|
|
df = pd.read_excel(SHIPMENTS_FILE, sheet_name=0, header=5)
|
|
df.columns = df.columns.str.strip()
|
|
df = df.dropna(how="all")
|
|
df["Shipment ID"] = df["Shipment ID"].astype(str).str.strip()
|
|
df = df.drop(columns=[c for c in DROP_COLS if c in df.columns])
|
|
return df
|
|
|
|
|
|
def read_details(shipment_id):
|
|
path = os.path.join(DETAILS_DIR, f"shipment_details_{shipment_id}.xlsx")
|
|
if not os.path.exists(path):
|
|
return None
|
|
df = pd.read_excel(path, sheet_name=0, header=5)
|
|
df.columns = df.columns.str.strip()
|
|
df = df.dropna(how="all")
|
|
df["Shipment"] = df["Shipment"].astype(str).str.strip()
|
|
return df
|
|
|
|
|
|
def build_report():
|
|
shipments = read_shipments()
|
|
if TEST_SHIPMENT:
|
|
shipments = shipments[shipments["Shipment ID"] == TEST_SHIPMENT]
|
|
|
|
shipment_cols = list(shipments.columns)
|
|
all_rows = []
|
|
|
|
for _, s_row in shipments.iterrows():
|
|
sid = s_row["Shipment ID"]
|
|
details = read_details(sid)
|
|
if details is None:
|
|
continue
|
|
extra_cols = [c for c in details.columns if c not in shipment_cols and c != "Shipment" and c not in DROP_COLS]
|
|
for _, d_row in details.iterrows():
|
|
row = {**s_row.to_dict(), **{c: d_row[c] for c in extra_cols}}
|
|
all_rows.append(row)
|
|
print(f" [{sid}] {len(details)} kitu")
|
|
|
|
result = pd.DataFrame(all_rows)
|
|
all_cols = shipment_cols + [c for c in extra_cols if c in result.columns]
|
|
result = result[all_cols]
|
|
|
|
wb = openpyxl.Workbook()
|
|
ws = wb.active
|
|
ws.title = "Shipments"
|
|
|
|
HEADER_FILL_SHIP = PatternFill("solid", fgColor="1F4E79")
|
|
HEADER_FILL_DETAIL = PatternFill("solid", fgColor="375623")
|
|
HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
|
|
DATA_FONT = Font(name="Arial", size=10)
|
|
BORDER = Border(
|
|
left=Side(style="thin", color="BFBFBF"),
|
|
right=Side(style="thin", color="BFBFBF"),
|
|
bottom=Side(style="thin", color="BFBFBF"),
|
|
)
|
|
|
|
n_ship = len(shipment_cols)
|
|
for ci, col in enumerate(all_cols, 1):
|
|
cell = ws.cell(row=1, column=ci, value=col)
|
|
cell.font = HEADER_FONT
|
|
cell.fill = HEADER_FILL_SHIP if ci <= n_ship else HEADER_FILL_DETAIL
|
|
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
|
|
cell.border = BORDER
|
|
ws.row_dimensions[1].height = 30
|
|
|
|
for ri, (_, row) in enumerate(result.iterrows(), 2):
|
|
for ci, col in enumerate(all_cols, 1):
|
|
val = row[col]
|
|
if pd.isna(val):
|
|
val = None
|
|
elif hasattr(val, "date"):
|
|
val = val.date()
|
|
cell = ws.cell(row=ri, column=ci, value=val)
|
|
cell.font = DATA_FONT
|
|
cell.border = BORDER
|
|
cell.alignment = Alignment(horizontal="center", vertical="center")
|
|
if isinstance(val, date):
|
|
cell.number_format = "DD-MMM-YYYY"
|
|
|
|
ws.auto_filter.ref = ws.dimensions
|
|
ws.freeze_panes = "A2"
|
|
|
|
for ci, col in enumerate(all_cols, 1):
|
|
vals = [col] + [str(result.iloc[r][col]) for r in range(len(result)) if pd.notna(result.iloc[r][col])]
|
|
ws.column_dimensions[get_column_letter(ci)].width = min(max((len(v) for v in vals), default=10) + 2, 35)
|
|
|
|
# --- Sheet 2: Site Summary ---
|
|
STATUS_COLS = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
|
|
pivot = result.groupby("Ship To:")["Status"].value_counts().unstack(fill_value=0)
|
|
for s in STATUS_COLS:
|
|
if s not in pivot.columns:
|
|
pivot[s] = 0
|
|
pivot = pivot[STATUS_COLS].reset_index().rename(columns={"Ship To:": "Site", "Returned by Subject": "Returned"})
|
|
pivot = pivot.sort_values("Site").reset_index(drop=True)
|
|
pivot["Total"] = pivot[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
|
|
|
|
ws2 = wb.create_sheet("Site Summary")
|
|
summary_cols = ["Site", "Available", "Assigned", "Dispensed", "Returned", "Total"]
|
|
HEADER_FILL_SUMM = PatternFill("solid", fgColor="1F4E79")
|
|
|
|
for ci, col in enumerate(summary_cols, 1):
|
|
cell = ws2.cell(row=1, column=ci, value=col)
|
|
cell.font = HEADER_FONT
|
|
cell.fill = HEADER_FILL_SUMM
|
|
cell.alignment = Alignment(horizontal="center", vertical="center")
|
|
cell.border = BORDER
|
|
ws2.row_dimensions[1].height = 25
|
|
|
|
for ri, (_, row) in enumerate(pivot.iterrows(), 2):
|
|
for ci, col in enumerate(summary_cols, 1):
|
|
cell = ws2.cell(row=ri, column=ci, value=row[col])
|
|
cell.font = DATA_FONT
|
|
cell.border = BORDER
|
|
cell.alignment = Alignment(horizontal="center", vertical="center")
|
|
|
|
for ci, col in enumerate(summary_cols, 1):
|
|
vals = [col] + [str(pivot.iloc[r][col]) for r in range(len(pivot))]
|
|
ws2.column_dimensions[get_column_letter(ci)].width = min(max(len(v) for v in vals) + 4, 35)
|
|
|
|
ws2.freeze_panes = "A2"
|
|
|
|
suffix = f"_{TEST_SHIPMENT}" if TEST_SHIPMENT else ""
|
|
pattern = f"{STUDY} CZ Shipments{suffix}.xlsx"
|
|
for old in os.listdir(OUTPUT_DIR):
|
|
if old.endswith(pattern):
|
|
try:
|
|
os.remove(os.path.join(OUTPUT_DIR, old))
|
|
print(f"Smazan -> {old}")
|
|
except OSError:
|
|
print(f"Preskakuji smazani (soubor otevren?) -> {old}")
|
|
|
|
outfile = os.path.join(OUTPUT_DIR, f"{date.today()} {STUDY} CZ Shipments{suffix}.xlsx")
|
|
wb.save(outfile)
|
|
print(f"\nUlozeno -> {outfile}")
|
|
|
|
|
|
build_report()
|