"""Build the country-level IWRS medication overview workbook for one study.

The script reads the per-site on-site inventory reports and the IP
destruction reports for ``STUDY``, merges them into one table and writes an
Excel workbook with the full overview plus four filtered sheets (expired
kits, assigned-not-dispensed kits, kits not returned, kits ready for
destruction).
"""

from datetime import date
from pathlib import Path

import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter

# Alternative study; swap with the line below to build its overview instead.
# STUDY = "42847922MDD3003"
STUDY = "77242113UCO3001"

INVENTORY_DIR = Path(f"xls_reports_{STUDY}")
DESTRUCTION_DIR = Path(f"xls_ip_destruction_{STUDY}")
OUTPUT_DIR = Path("output")
OUTPUT_FILE = OUTPUT_DIR / f"{date.today().strftime('%Y-%m-%d')} {STUDY} CZ IWRS overview.xlsx"

# ── Shared constants ──────────────────────────────────────────────────────────

# Report column name -> short column name used in the output workbook.
COLUMN_RENAMES = {
    "Site": "Site",
    "Medication ID": "Med ID",
    "Packaged Lot number": "Lot No.",
    "Original Expiration Date when Packaged Lot was Added": "Orig Exp Date",
    "Expiration date": "Exp Date",
    "Received Date": "Rcv Date",
    "Shipment Receipt User": "Rcpt User",
    "Subject Identifier": "Subject ID",
    "Quantity Assigned": "Qty Asgn",
    "IRT Transaction": "IRT Tx",
    "Date Assigned": "Date Asgn",
    "Assignment User": "Asgn User",
    "Dispensation Status": "Disp Status",
    "Dispensing Date": "Disp Date",
    "Dispensing date": "Disp Date",
    "Quantity Dispensed": "Qty Disp",
    "Dispensing User": "Disp User",
    "Quantity Returned": "Qty Ret",
    "Date Returned": "Date Ret",
    "Return User": "Ret User",
    "DestroyedOn": "Destroyed",
    "Basket number": "Basket No.",
}

# Output columns that are parsed as dates and shown with a date number format.
DATE_COLUMNS = {
    "Orig Exp Date",
    "Exp Date",
    "Rcv Date",
    "Date Asgn",
    "Disp Date",
    "Date Ret",
    "Destroyed",
    "Max Visit Date",
}

# Output column name -> Excel column width; unlisted columns get width 14.
COLUMN_WIDTHS = {
    "Site": 14,
    "Med ID": 10,
    "Lot No.": 12,
    "Orig Exp Date": 16,
    "Exp Date": 14,
    "Rcv Date": 14,
    "Rcpt User": 22,
    "Subject ID": 14,
    "Qty Asgn": 9,
    "IRT Tx": 8,
    "Date Asgn": 14,
    "Asgn User": 20,
    "Disp Status": 16,
    "Disp Date": 14,
    "Qty Disp": 9,
    "Disp User": 20,
    "Qty Ret": 10,
    "Date Ret": 14,
    "Ret User": 18,
    "Destroyed": 14,
    "Basket No.": 12,
    "Max Visit Date": 16,
}


# ── Helpers ───────────────────────────────────────────────────────────────────

def _value_after(cell, prefix):
    """Return the stripped text following *prefix* in *cell*.

    Returns ``None`` when the cell is empty or does not start with *prefix*.
    Only the leading occurrence of *prefix* is removed.
    """
    if pd.isna(cell):
        return None
    text = str(cell)
    if not text.startswith(prefix):
        return None
    return text[len(prefix):].strip()


def _not_dispensed_mask(df):
    """Return a boolean Series that is True where "Disp Status" is NOT DISPENSED.

    The comparison is case-insensitive. Values are converted with
    ``astype(str)`` first so a column that is entirely empty (and therefore
    not of string dtype) does not make the ``.str`` accessor fail; empty
    entries simply compare as False.
    """
    return df["Disp Status"].astype(str).str.upper() == "NOT DISPENSED"


def read_inventory(path: Path):
    """Read one on-site inventory report.

    The table header is located by searching the first column for
    "Medication ID" (MDD3003) or "Medication" (UCO3001); the latter is
    renamed to "Medication ID" so both layouts yield the same column.

    Args:
        path: Path of the inventory workbook.

    Returns:
        A ``(data, meta)`` tuple. ``data`` is the kit table as a DataFrame,
        or ``None`` when the sheet is empty or has no table header. ``meta``
        holds ``"site"`` when a first-column cell starts with "Site:" (the
        last such cell wins).
    """
    df = pd.read_excel(path, header=None)
    meta = {}
    if df.empty:
        # Nothing to scan: an empty sheet has no column 0 to index.
        print(f" {path.name}: no data (skipping)")
        return None, meta

    first_col = df[0]
    for cell in first_col:
        site = _value_after(cell, "Site:")
        if site is not None:
            meta["site"] = site

    # Support both "Medication ID" (MDD3003) and "Medication" (UCO3001)
    mask = first_col.isin(["Medication ID", "Medication"])
    if not mask.any():
        print(f" {path.name}: no data (skipping)")
        return None, meta

    header_row = df[mask].index[0]
    data = pd.read_excel(path, header=header_row)
    data = data.rename(columns={"Medication": "Medication ID"})
    return data, meta


def read_destruction_lookup() -> dict:
    """Collect destroyed kits from every workbook in ``DESTRUCTION_DIR``.

    For each report, the basket ID and the destruction creation date are
    taken from the first-column cells within the first 15 rows that start
    with "Basket ID:" / "Drug Destruction Created Date:". The kit table
    starts at the row whose first cell is "Medication ID Description".

    Returns:
        A dict mapping medication ID (int) to ``(basket_id, destroyed_on)``,
        both taken as text from the report (or ``None`` if not found).

    Raises:
        ValueError: If a non-empty report has no
            "Medication ID Description" header row.
    """
    lookup = {}
    for path in DESTRUCTION_DIR.glob("*.xlsx"):
        df = pd.read_excel(path, header=None)
        if df.empty:
            print(f" {path.name}: empty destruction report (skipping)")
            continue

        basket_id = None
        destroyed_on = None
        # head() copes with reports shorter than 15 rows.
        for cell in df[0].head(15):
            found = _value_after(cell, "Basket ID:")
            if found is not None:
                basket_id = found
            found = _value_after(cell, "Drug Destruction Created Date:")
            if found is not None:
                destroyed_on = found

        header_rows = df.index[df[0] == "Medication ID Description"]
        if len(header_rows) == 0:
            raise ValueError(
                f"{path.name}: 'Medication ID Description' header row not found"
            )

        data = pd.read_excel(path, header=header_rows[0])
        for med_id in data["Medication ID"].dropna():
            lookup[int(med_id)] = (basket_id, destroyed_on)
    return lookup


def format_sheet(ws, header_color, highlight_col=None, highlight_color=None):
    """Apply the common header/body styling to a worksheet in place.

    Styles the first row as a bold white-on-colour header, gives every data
    cell a thin border, Arial 10 font and centred alignment, applies a date
    number format to columns listed in ``DATE_COLUMNS``, sets column widths
    from ``COLUMN_WIDTHS``, enables the auto-filter and freezes the header.

    Args:
        ws: openpyxl worksheet whose first row contains the column names.
        header_color: RGB hex string used as the header fill colour.
        highlight_col: Column name, or a collection of column names, whose
            data cells get ``highlight_color`` as fill. ``None`` highlights
            nothing.
        highlight_color: RGB hex string for the highlight fill; without it
            no highlighting is applied.
    """
    thin = Side(style="thin", color="000000")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    header_fill = PatternFill("solid", start_color=header_color)
    header_font = Font(bold=True, color="FFFFFF", name="Arial", size=10)
    header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=False)
    row_font = Font(name="Arial", size=10)
    row_alignment = Alignment(horizontal="center")
    hi_fill = PatternFill("solid", start_color=highlight_color) if highlight_color else None

    # Accept a single column name as well as any collection of names.
    if highlight_col is None:
        highlighted = set()
    elif isinstance(highlight_col, str):
        highlighted = {highlight_col}
    else:
        highlighted = set(highlight_col)

    header_cells = ws[1]
    headers = [cell.value for cell in header_cells]

    for cell in header_cells:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment
        cell.border = border
        width = COLUMN_WIDTHS.get(cell.value, 14)
        ws.column_dimensions[get_column_letter(cell.column)].width = width

    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        for cell in row:
            col_name = headers[cell.column - 1] if cell.column <= len(headers) else None
            cell.font = row_font
            cell.border = border
            cell.alignment = row_alignment
            if col_name in DATE_COLUMNS:
                cell.number_format = "DD-MMM-YYYY"
            if hi_fill and col_name in highlighted:
                cell.fill = hi_fill

    ws.auto_filter.ref = ws.dimensions
    ws.freeze_panes = "A2"


# ── Build DataFrames ──────────────────────────────────────────────────────────

def build_main(lookup):
    """Combine all inventory reports into the main overview table.

    Each report in ``INVENTORY_DIR`` matching
    ``onsite_inventory_detail_*.xlsx`` is read, annotated with the
    destruction date and basket number from *lookup*, and prefixed with a
    "Site" column (the report's "Site:" value, else the file stem). The
    combined table is renamed via ``COLUMN_RENAMES``, its date columns are
    parsed day-first (unparseable values become NaT) and it is sorted by
    site, receipt date and medication ID.

    Args:
        lookup: Mapping of medication ID (int) to
            ``(basket_id, destroyed_on)`` as returned by
            ``read_destruction_lookup``.

    Returns:
        The combined, renamed and sorted DataFrame.

    Raises:
        ValueError: If no inventory report yielded any data.
    """
    not_destroyed = (None, None)
    frames = []
    for path in sorted(INVENTORY_DIR.glob("onsite_inventory_detail_*.xlsx")):
        df, meta = read_inventory(path)
        if df is None:
            continue
        # One lookup per kit; the pair is then split into the two columns.
        pairs = [
            lookup.get(int(med_id), not_destroyed) if pd.notna(med_id) else not_destroyed
            for med_id in df["Medication ID"]
        ]
        df["DestroyedOn"] = [destroyed_on for _, destroyed_on in pairs]
        df["Basket number"] = [basket_id for basket_id, _ in pairs]
        df.insert(0, "Site", meta.get("site", path.stem))
        frames.append(df)
        print(f" {path.name}: {len(df)} kits")

    if not frames:
        # pd.concat would fail here with an unhelpful message.
        raise ValueError(f"No inventory data found in {INVENTORY_DIR}")

    combined = pd.concat(frames, ignore_index=True)
    combined.rename(columns=COLUMN_RENAMES, inplace=True)
    for col in DATE_COLUMNS:
        if col in combined.columns:
            combined[col] = pd.to_datetime(combined[col], dayfirst=True, errors="coerce")
    combined.sort_values(["Site", "Rcv Date", "Med ID"], inplace=True, ignore_index=True)
    return combined


def build_expired(df):
    """Select kits that have expired, are unassigned and are not in a basket.

    Args:
        df: Main overview table from ``build_main``.

    Returns:
        A ``(filtered, sheet_name)`` tuple: the rows with no basket number,
        no subject and an expiry date before today, and a sheet name that
        embeds today's date.
    """
    today = date.today()
    mask = (
        df["Basket No."].isna()
        & df["Subject ID"].isna()
        & (df["Exp Date"] < pd.Timestamp(today))
    )
    filtered = df[mask].copy().reset_index(drop=True)
    sheet_name = f"Expired as of {today.strftime('%d-%b-%Y')}"
    print(f" Expired: {len(filtered)}")
    return filtered, sheet_name


def build_assigned_not_dispensed(df):
    """Select kits that have a subject but no dispensing date.

    Args:
        df: Main overview table from ``build_main``.

    Returns:
        The matching rows with a fresh index.
    """
    mask = df["Subject ID"].notna() & df["Disp Date"].isna()
    filtered = df[mask].copy().reset_index(drop=True)
    print(f" Assigned not dispensed: {len(filtered)}")
    return filtered


def build_not_returned(df):
    """Select assigned kits with no return date from before the latest assignment.

    A kit qualifies when it has a subject, no return date, a status other
    than NOT DISPENSED, and an assignment date earlier than the latest
    assignment date of the same subject (added as "Max Visit Date").

    Args:
        df: Main overview table from ``build_main``.

    Returns:
        The matching rows without the return and destruction columns.
    """
    candidates = df[
        df["Date Ret"].isna()
        & df["Subject ID"].notna()
        & ~_not_dispensed_mask(df)
    ].copy()
    max_asgn = df.groupby("Subject ID")["Date Asgn"].max().rename("Max Visit Date")
    candidates = candidates.join(max_asgn, on="Subject ID")
    filtered = candidates[candidates["Date Asgn"] < candidates["Max Visit Date"]].copy()
    # errors="ignore": a report layout may lack some of these columns.
    filtered = filtered.drop(
        columns=["Qty Ret", "Date Ret", "Ret User", "Destroyed", "Basket No."],
        errors="ignore",
    )
    filtered = filtered.reset_index(drop=True)
    print(f" Not returned: {len(filtered)}")
    return filtered


def build_kits_for_destruction(df):
    """Select kits with no basket that were returned or never dispensed.

    Args:
        df: Main overview table from ``build_main``.

    Returns:
        The matching rows sorted by site and return date, without the
        destruction columns.
    """
    mask = (
        df["Basket No."].isna()
        & (df["Date Ret"].notna() | _not_dispensed_mask(df))
    )
    filtered = df[mask].copy().sort_values(["Site", "Date Ret"], ascending=[True, True])
    filtered = filtered.drop(columns=["Destroyed", "Basket No."], errors="ignore")
    filtered = filtered.reset_index(drop=True)
    print(f" Kits for destruction: {len(filtered)}")
    return filtered


# ── Main ──────────────────────────────────────────────────────────────────────

def main():
    """Build all tables, write them to ``OUTPUT_FILE`` and format the sheets."""
    lookup = read_destruction_lookup()
    print(f"Loaded {len(lookup)} kits from destruction reports")

    df = build_main(lookup)
    expired_df, expired_sheet = build_expired(df)
    assigned_df = build_assigned_not_dispensed(df)
    not_returned_df = build_not_returned(df)
    destruction_df = build_kits_for_destruction(df)

    # Prepare output dir and remove any previous overview file. This happens
    # only after the data was built, so a failed run keeps the last report.
    OUTPUT_DIR.mkdir(exist_ok=True)
    for old in OUTPUT_DIR.glob(f"*{STUDY} CZ IWRS overview.xlsx"):
        old.unlink()
        print(f"Removed old file: {old.name}")

    # Write all sheets
    with pd.ExcelWriter(OUTPUT_FILE, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="CountryMedicationOverview")
        expired_df.to_excel(writer, index=False, sheet_name=expired_sheet)
        assigned_df.to_excel(writer, index=False, sheet_name="Assigned not dispensed")
        not_returned_df.to_excel(writer, index=False, sheet_name="Not returned")
        destruction_df.to_excel(writer, index=False, sheet_name="Kits for destruction")

    # Format all sheets
    wb = load_workbook(OUTPUT_FILE)

    # Main sheet — dark blue, green highlight for Destroyed/Basket No.
    format_sheet(wb["CountryMedicationOverview"], header_color="1F4E79",
                 highlight_col=("Destroyed", "Basket No."), highlight_color="E2EFDA")
    format_sheet(wb[expired_sheet], header_color="C00000",
                 highlight_col="Exp Date", highlight_color="FFE0E0")
    format_sheet(wb["Assigned not dispensed"], header_color="833C00",
                 highlight_col="Subject ID", highlight_color="FFF2CC")
    format_sheet(wb["Not returned"], header_color="375623",
                 highlight_col="Max Visit Date", highlight_color="E2EFDA")
    format_sheet(wb["Kits for destruction"], header_color="595959")

    wb.save(OUTPUT_FILE)
    print(f"\nSaved: {OUTPUT_FILE} ({len(df)} rows on main sheet, {wb.sheetnames})")


if __name__ == "__main__":
    main()