Files
janssen/Covance/create_report.py
T
2026-05-05 14:21:35 +02:00

457 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Covance samples report pro studii 42847922MDD3003.
Čte z MySQL (nejnovější import), generuje Excel s 7 listy:
1. Přehled — agregát per pacient+visit (Received / Not Received / Cancelled)
2. Chybějící — detail Not Received vzorků
3. Kity — kit inventory: centra × typy kitů s expirací
4. eQueries — přehled eQuery dotazů (Open červeně)
5. ZDROJ Vzorky — surová data samples
6. ZDROJ Kity — surová data kit inventory
7. ZDROJ eQuery — surová data eQueries
"""
import os
import datetime
import mysql.connector
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
import db_config
# Study identifier: used as the SQL filter for the import lookup and in the
# sheet titles and the output file name.
STUDY = "42847922MDD3003"
# Reports are written to the "CreatedReports" folder next to this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CREATED_DIR = os.path.join(BASE_DIR, "CreatedReports")
# ── styles ───────────────────────────────────────────────────────────────────
# Header row of the formatted sheets: white bold text on a dark blue fill.
HEADER_FILL = PatternFill("solid", fgColor="1F4E79")
HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
# Body fonts; RED_FONT highlights values that need attention.
NORMAL_FONT = Font(name="Arial", size=10)
BOLD_FONT = Font(name="Arial", bold=True, size=10)
RED_FONT = Font(name="Arial", bold=True, size=10, color="C00000")
# Thin light-grey border applied on all four sides of a cell.
THIN = Side(style="thin", color="CCCCCC")
BORDER = Border(left=THIN, right=THIN, top=THIN, bottom=THIN)
# Alternating row fills for even / odd data rows.
EVEN_FILL = PatternFill("solid", fgColor="EBF3FB")
ODD_FILL = PatternFill("solid", fgColor="FFFFFF")
# Row fill for overview rows that contain Not Received samples.
NOTRCV_FILL = PatternFill("solid", fgColor="FCE4D6")
# NOTE(review): CANCELLED_FILL and OPEN_FILL are not referenced by the code
# visible in this file; presumably meant for cancelled samples / open
# eQueries. Confirm before removing.
CANCELLED_FILL = PatternFill("solid", fgColor="F2F2F2")
OPEN_FILL = PatternFill("solid", fgColor="FFC7CE")
# Shared cell alignments (both vertically centered).
CENTER = Alignment(horizontal="center", vertical="center")
LEFT = Alignment(horizontal="left", vertical="center")
def unique_path(stem):
    """Return a path inside CREATED_DIR for ``stem`` that does not exist yet.

    The first choice is ``<stem>.xlsx``. If that file already exists, the
    current time is appended as ``<stem> HHMM.xlsx``. If that name is taken
    as well (e.g. a third run within the same minute), a numeric suffix
    ``(2)``, ``(3)``, ... is added until a free name is found, so an existing
    report is not overwritten.

    Args:
        stem: File name without directory and without the ``.xlsx`` suffix.

    Returns:
        Path (str) of a file that did not exist when this function checked.
    """
    path = os.path.join(CREATED_DIR, f"{stem}.xlsx")
    if not os.path.exists(path):
        return path

    tag = datetime.datetime.now().strftime("%H%M")
    path = os.path.join(CREATED_DIR, f"{stem} {tag}.xlsx")
    counter = 2
    # The time tag only has minute resolution, so keep probing until free.
    while os.path.exists(path):
        path = os.path.join(CREATED_DIR, f"{stem} {tag} ({counter}).xlsx")
        counter += 1
    return path
# ── data load ────────────────────────────────────────────────────────────────
def load_data():
    """Load the sample rows of the most recent ``covance_samples`` import.

    The import is selected as the highest ``import_id`` in ``iwrs_import``
    for STUDY with ``report_type = 'covance_samples'``. Rows are ordered by
    investigator, patient, visit code and container number.

    Returns:
        pandas.DataFrame with one row per ``covance_samples`` record and the
        columns named in the SELECT list (empty if the query returns no rows).

    The cursor and the connection are closed even when the query fails.
    """
    sql = """
        SELECT
            investigator_no, investigator_name, patient_no,
            collection_date, protocol_visit_code,
            accession, container_no, container_barcode,
            specimen_type, sample_status,
            label_line1, label_line2
        FROM covance_samples
        WHERE import_id = (
            SELECT MAX(import_id) FROM iwrs_import
            WHERE study = %s AND report_type = 'covance_samples'
        )
        ORDER BY investigator_no, patient_no, protocol_visit_code, container_no
    """
    conn = mysql.connector.connect(
        host=db_config.DB_HOST, port=db_config.DB_PORT,
        user=db_config.DB_USER, password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, (STUDY,))
            # Column names come from the cursor metadata, not a hard-coded list.
            cols = [d[0] for d in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=cols)
def load_kit_data():
    """Load the kit rows of the most recent ``covance_kit_inventory`` import.

    The import is selected as the highest ``import_id`` in ``iwrs_import``
    for STUDY with ``report_type = 'covance_kit_inventory'``.

    Returns:
        pandas.DataFrame with one row per ``covance_kit_inventory`` record
        and the columns named in the SELECT list (empty if the query returns
        no rows).

    The cursor and the connection are closed even when the query fails.
    """
    # NOTE(review): ``kit_type+0`` in ORDER BY presumably makes MySQL order
    # numeric kit types by value before falling back to the text order —
    # confirm against the actual kit_type values.
    sql = """
        SELECT site_code, investigator_name, kit_type, description,
            accession, shipped_date, expiration_date, days_to_expiration
        FROM covance_kit_inventory
        WHERE import_id = (
            SELECT MAX(import_id) FROM iwrs_import
            WHERE study = %s AND report_type = 'covance_kit_inventory'
        )
        ORDER BY site_code, kit_type+0, kit_type, accession
    """
    conn = mysql.connector.connect(
        host=db_config.DB_HOST, port=db_config.DB_PORT,
        user=db_config.DB_USER, password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, (STUDY,))
            cols = [d[0] for d in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=cols)
# ── helpers ──────────────────────────────────────────────────────────────────
def test_name(row):
    """Build the test label of a sample row from its two label lines.

    ``label_line1`` and ``label_line2`` are read from ``row``; missing values
    count as empty text and surrounding whitespace is stripped. When the
    second line is empty only the first line is returned, otherwise both are
    joined with a single space.
    """
    cleaned = []
    for key in ("label_line1", "label_line2"):
        raw = row[key]
        cleaned.append(str(raw).strip() if pd.notna(raw) else "")
    first, second = cleaned
    if not second:
        return first
    return f"{first} {second}".strip()
def write_headers(ws, headers, widths, row=2):
    """Write a styled header row and set the column widths.

    Args:
        ws: Worksheet to write into.
        headers: Header captions, one per column starting at column A.
        widths: Column widths, paired position by position with ``headers``.
        row: Row number that receives the header (defaults to 2).
    """
    for col_no, (caption, width) in enumerate(zip(headers, widths), start=1):
        header_cell = ws.cell(row=row, column=col_no, value=caption)
        header_cell.border = BORDER
        header_cell.alignment = CENTER
        header_cell.fill = HEADER_FILL
        header_cell.font = HEADER_FONT
        ws.column_dimensions[get_column_letter(col_no)].width = width
    ws.row_dimensions[row].height = 18
def write_title(ws, text, ncols):
    """Put a bold title into row 1, merged across the first ``ncols`` columns.

    Args:
        ws: Worksheet to write into.
        text: Title text stored in cell A1.
        ncols: Number of columns the merged title cell spans.
    """
    last_col = get_column_letter(ncols)
    ws.merge_cells(f"A1:{last_col}1")
    ws.row_dimensions[1].height = 22

    title = ws["A1"]
    title.alignment = Alignment(horizontal="left", vertical="center")
    title.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
    title.value = text
# ── sheet 1: Přehled ─────────────────────────────────────────────────────────
def write_prehled(wb, df):
    """Write the "Přehled" sheet: sample counts per patient, visit and accession.

    Samples are grouped by investigator, patient, visit code, accession and
    collection date. Each group becomes one row with the total number of
    samples and the numbers with status "Received" and "Not Received". Rows
    with at least one Not Received sample get the NOTRCV_FILL background and
    a red count in the last column.

    Args:
        wb: Workbook that receives the new sheet.
        df: Samples frame as returned by ``load_data``.
    """
    ws = wb.create_sheet("Přehled")
    ws.sheet_view.showGridLines = False
    today = datetime.date.today().strftime("%d-%b-%Y")
    write_title(ws, f"Covance Samples — {STUDY} ({today})", 9)
    headers = ["Site", "Investigátor", "Pacient", "Visit", "Accession",
               "Datum odběru", "Celkem", "Received", "Not Received"]
    widths = [9, 22, 14, 12, 13, 14, 8, 10, 13]
    write_headers(ws, headers, widths)

    group_keys = ["investigator_no", "investigator_name", "patient_no",
                  "protocol_visit_code", "accession", "collection_date"]
    agg = (
        # dropna=False: by default groupby discards rows whose key is NULL,
        # which would silently hide e.g. samples without a collection date.
        df.groupby(group_keys, dropna=False)
        .agg(
            celkem=("sample_status", "count"),
            received=("sample_status", lambda x: (x == "Received").sum()),
            not_received=("sample_status", lambda x: (x == "Not Received").sum()),
        )
        .reset_index()
        .sort_values(["investigator_no", "patient_no", "protocol_visit_code"])
        .reset_index(drop=True)
    )

    centered_cols = (1, 4, 5, 6, 7, 8, 9)
    for r_idx, row in agg.iterrows():
        excel_row = r_idx + 3  # row 1 = title, row 2 = header
        has_missing = row["not_received"] > 0
        if has_missing:
            fill = NOTRCV_FILL
        else:
            fill = EVEN_FILL if r_idx % 2 == 0 else ODD_FILL

        col_date = row["collection_date"]
        if pd.isna(col_date):
            date_str = ""  # leave the cell blank instead of printing "None"
        elif hasattr(col_date, "strftime"):
            date_str = col_date.strftime("%d-%b-%Y")
        else:
            date_str = str(col_date)

        # Missing key values are written as empty cells, not as NaN.
        keys = [
            None if pd.isna(row[name]) else row[name]
            for name in ("investigator_no", "investigator_name", "patient_no",
                         "protocol_visit_code", "accession")
        ]
        values = keys + [
            date_str,
            int(row["celkem"]), int(row["received"]), int(row["not_received"]),
        ]
        for c_idx, val in enumerate(values, 1):
            cell = ws.cell(row=excel_row, column=c_idx, value=val)
            cell.fill = fill
            cell.border = BORDER
            cell.alignment = CENTER if c_idx in centered_cols else LEFT
            # Only the Not Received count is highlighted in red.
            cell.font = RED_FONT if (c_idx == 9 and has_missing) else NORMAL_FONT
        ws.row_dimensions[excel_row].height = 16

    ws.freeze_panes = "A3"
    ws.auto_filter.ref = f"A2:I{len(agg) + 2}"
# ── sheet 2: Chybějící ────────────────────────────────────────────────────────
def write_chybejici(wb, df):
    """Write the "Chybějící" sheet: one row per sample with status "Not Received".

    Rows are sorted by investigator, patient, visit code and container
    number. The last column holds the test label built by ``test_name``.

    Args:
        wb: Workbook that receives the new sheet.
        df: Samples frame as returned by ``load_data``.
    """
    ws = wb.create_sheet("Chybějící")
    ws.sheet_view.showGridLines = False
    today = datetime.date.today().strftime("%d-%b-%Y")
    write_title(ws, f"Not Received vzorky — {STUDY} ({today})", 8)
    headers = ["Site", "Pacient", "Visit", "Datum odběru",
               "Accession", "Container", "Typ vzorku", "Test"]
    widths = [9, 14, 12, 14, 13, 10, 22, 30]
    write_headers(ws, headers, widths)

    missing = df[df["sample_status"] == "Not Received"].copy()
    # Built row by row instead of DataFrame.apply(axis=1): the result shape of
    # apply on an empty frame depends on pandas' inference, whereas an empty
    # list always yields an empty column when nothing is missing.
    missing["test"] = [test_name(sample) for _, sample in missing.iterrows()]
    missing = missing.sort_values(
        ["investigator_no", "patient_no", "protocol_visit_code", "container_no"]
    ).reset_index(drop=True)

    centered_cols = (1, 3, 4, 5, 6)
    for r_idx, row in missing.iterrows():
        excel_row = r_idx + 3  # row 1 = title, row 2 = header
        fill = EVEN_FILL if r_idx % 2 == 0 else ODD_FILL

        col_date = row["collection_date"]
        if pd.isna(col_date):
            date_str = ""  # leave the cell blank instead of printing "None"
        elif hasattr(col_date, "strftime"):
            date_str = col_date.strftime("%d-%b-%Y")
        else:
            date_str = str(col_date)

        container = int(row["container_no"]) if pd.notna(row["container_no"]) else ""
        values = [
            row["investigator_no"], row["patient_no"],
            row["protocol_visit_code"], date_str,
            row["accession"], container,
            row["specimen_type"], row["test"],
        ]
        for c_idx, val in enumerate(values, 1):
            cell = ws.cell(row=excel_row, column=c_idx, value=val)
            cell.fill = fill
            cell.border = BORDER
            cell.alignment = CENTER if c_idx in centered_cols else LEFT
            cell.font = NORMAL_FONT
        ws.row_dimensions[excel_row].height = 16

    ws.freeze_panes = "A3"
    ws.auto_filter.ref = f"A2:H{len(missing) + 2}"
# ── sheet 3: Kity (per centrum) ──────────────────────────────────────────────
def kit_sort_key(kt):
    """Return a sort key that orders kit types in three groups.

    Group 0: values that convert to an integer, ordered by that integer.
    Group 1: values starting with "T-" (case-insensitive) followed by an
             integer, ordered by that integer.
    Group 2: everything else, ordered by its text.
    """
    text = str(kt)
    candidates = [(0, kt)]
    if text.upper().startswith("T-"):
        candidates.append((1, text[2:]))

    for group, raw in candidates:
        try:
            number = int(raw)
        except ValueError:
            continue
        return (group, number, "")
    return (2, 0, text)
# Styles used only by the "Kity" sheet.
# Site header row: white bold text on a blue fill.
SITE_HDR_FILL = PatternFill("solid", fgColor="2E75B6")
SITE_HDR_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
# Fill of the per-site total row.
TOTAL_FILL = PatternFill("solid", fgColor="D6E4F0")
# Fill of a non-zero "expires within 30 days" count.
SOON_FILL = PatternFill("solid", fgColor="FCE4D6")
def _cell(ws, row, col, value, font, fill, alignment, border):
    """Write ``value`` to (row, col) of ``ws``, apply the styles, return the cell."""
    target = ws.cell(row=row, column=col, value=value)
    target.font = font
    target.fill = fill
    target.alignment = alignment
    target.border = border
    return target
def write_kity(wb, df_kits):
    """Write the "Kity" sheet: kit counts per site and kit type by expiry window.

    For every site a header row is followed by one row per kit type (the
    full list of kit types found in ``df_kits``, so every site shows the same
    rows) and a total row. Column C counts kits expiring from today up to
    today + 30 days, column D kits expiring later, column E their sum.
    Zero counts are left blank.

    NOTE(review): kits whose expiration date is before today, or missing, are
    counted in neither column and therefore not in "Celkem" — confirm that
    this is intended.

    Args:
        wb: Workbook that receives the new sheet.
        df_kits: Kit inventory frame as returned by ``load_kit_data``.
    """
    ws = wb.create_sheet("Kity")
    ws.sheet_view.showGridLines = False
    today = datetime.date.today()
    cutoff = today + datetime.timedelta(days=30)
    today_str = today.strftime("%d-%b-%Y")

    # Kit types across the whole study, in display order.
    kit_types = sorted(df_kits["kit_type"].dropna().unique(), key=kit_sort_key)
    # Description of the first row found for each kit type.
    kt_desc = (
        df_kits.drop_duplicates("kit_type")
        .set_index("kit_type")["description"]
        .to_dict()
    )
    # Sites in site-code order.
    sites = (
        df_kits[["site_code", "investigator_name"]]
        .drop_duplicates()
        .sort_values("site_code")
        .values.tolist()
    )

    # Columns: A=kit type, B=description, C=<=30 days, D=>30 days, E=total.
    for letter, width in (("A", 9), ("B", 28), ("C", 14), ("D", 14), ("E", 10)):
        ws.column_dimensions[letter].width = width
    write_title(ws, f"Kit Inventory — {STUDY} ({today_str})", 5)

    # Sub-header (row 2): no fixed height because one caption wraps to two lines.
    wrapped_center = Alignment(horizontal="center", vertical="center", wrap_text=True)
    for col, txt in [(1, "Kit Type"), (2, "Popis"),
                     (3, f"Expiruje ≤30 dní\n({cutoff.strftime('%d-%b-%Y')})"),
                     (4, "Expiruje >30 dní"),
                     (5, "Celkem")]:
        c = ws.cell(row=2, column=col, value=txt)
        c.font = HEADER_FONT
        c.fill = HEADER_FILL
        c.alignment = wrapped_center
        c.border = BORDER

    cur_row = 3
    for site_code, investigator in sites:
        # ── site header ──────────────────────────────────────────────────────
        # Separate code and name; show the code alone when the name is missing.
        if pd.isna(investigator) or not str(investigator).strip():
            site_label = f"{site_code}"
        else:
            site_label = f"{site_code} — {investigator}"
        ws.merge_cells(f"A{cur_row}:E{cur_row}")
        c = ws.cell(row=cur_row, column=1, value=site_label)
        c.font = SITE_HDR_FONT
        c.fill = SITE_HDR_FILL
        c.alignment = LEFT
        c.border = BORDER
        for col in range(2, 6):
            merged = ws.cell(row=cur_row, column=col)
            merged.fill = SITE_HDR_FILL
            merged.border = BORDER
        ws.row_dimensions[cur_row].height = 17
        cur_row += 1

        # Kits of this site.
        site_df = df_kits[df_kits["site_code"] == site_code].copy()
        site_df["exp_date"] = pd.to_datetime(site_df["expiration_date"]).dt.date

        site_soon = 0
        site_later = 0
        for kt_idx, kt in enumerate(kit_types):
            kt_df = site_df[site_df["kit_type"] == kt]
            # Missing expiration dates are skipped so they never enter a count.
            exp_dates = [d for d in kt_df["exp_date"] if not pd.isna(d)]
            soon = sum(1 for d in exp_dates if today <= d <= cutoff)
            later = sum(1 for d in exp_dates if d > cutoff)
            site_soon += soon
            site_later += later
            total = soon + later

            fill = EVEN_FILL if kt_idx % 2 == 0 else ODD_FILL
            _cell(ws, cur_row, 1, kt, BOLD_FONT, fill, CENTER, BORDER)
            _cell(ws, cur_row, 2, kt_desc.get(kt, ""), NORMAL_FONT, fill, LEFT, BORDER)
            _cell(ws, cur_row, 3, soon if soon else None,
                  RED_FONT if soon else NORMAL_FONT,
                  SOON_FILL if soon else fill, CENTER, BORDER)
            _cell(ws, cur_row, 4, later if later else None,
                  NORMAL_FONT, fill, CENTER, BORDER)
            _cell(ws, cur_row, 5, total if total else None,
                  BOLD_FONT, fill, CENTER, BORDER)
            ws.row_dimensions[cur_row].height = 16
            cur_row += 1

        # ── site total ───────────────────────────────────────────────────────
        site_total = site_soon + site_later
        _cell(ws, cur_row, 1, "Celkem", BOLD_FONT, TOTAL_FILL, CENTER, BORDER)
        _cell(ws, cur_row, 2, "", BOLD_FONT, TOTAL_FILL, LEFT, BORDER)
        _cell(ws, cur_row, 3, site_soon if site_soon else None,
              BOLD_FONT, TOTAL_FILL, CENTER, BORDER)
        _cell(ws, cur_row, 4, site_later if site_later else None,
              BOLD_FONT, TOTAL_FILL, CENTER, BORDER)
        _cell(ws, cur_row, 5, site_total if site_total else None,
              BOLD_FONT, TOTAL_FILL, CENTER, BORDER)
        ws.row_dimensions[cur_row].height = 16
        cur_row += 2  # blank row between sites

    ws.freeze_panes = "A3"
# ── sheet 5: ZDROJ Kity (raw kit inventory; sheet 4, ZDROJ Vzorky, is defined further below) ──
def write_zdroj_kity(wb, df_kits):
    """Write the "ZDROJ Kity" sheet: the kit inventory frame dumped as-is.

    Row 1 holds the column names of ``df_kits``; every following row is one
    record. Missing values are written as empty text and values that have a
    ``strftime`` method are written as ``YYYY-MM-DD`` text.

    Args:
        wb: Workbook that receives the new sheet.
        df_kits: Kit inventory frame as returned by ``load_kit_data``.
    """
    ws = wb.create_sheet("ZDROJ Kity")
    ws.sheet_view.showGridLines = True
    headers = list(df_kits.columns)

    # Style objects are created once instead of once per cell.
    header_font = Font(name="Arial", bold=True, size=9, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="404040")
    body_font = Font(name="Arial", size=9)

    for c, h in enumerate(headers, 1):
        cell = ws.cell(row=1, column=c, value=h)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = LEFT
        cell.border = BORDER
        ws.column_dimensions[get_column_letter(c)].width = 20

    last_row = 1
    for r_idx, (_, row) in enumerate(df_kits.iterrows(), 2):
        fill = EVEN_FILL if r_idx % 2 == 0 else ODD_FILL
        for c_idx, col in enumerate(headers, 1):
            val = row[col]
            if pd.isna(val):
                val = ""
            elif hasattr(val, "strftime"):
                val = val.strftime("%Y-%m-%d")
            cell = ws.cell(row=r_idx, column=c_idx, value=val)
            cell.font = body_font
            cell.fill = fill
            cell.border = BORDER
            cell.alignment = LEFT
        last_row = r_idx

    ws.freeze_panes = "A2"
    if headers:
        # Cover the data rows as well, like the filters on the formatted
        # sheets, instead of only the header row.
        ws.auto_filter.ref = f"A1:{get_column_letter(len(headers))}{last_row}"
# ── sheet 4: ZDROJ ───────────────────────────────────────────────────────────
def write_zdroj(wb, df):
    """Write the "ZDROJ Vzorky" sheet: the samples frame dumped as-is.

    Row 1 holds the column names of ``df``; every following row is one
    record. Missing values are written as empty text and values that have a
    ``strftime`` method are written as ``YYYY-MM-DD`` text.

    Args:
        wb: Workbook that receives the new sheet.
        df: Samples frame as returned by ``load_data``.
    """
    ws = wb.create_sheet("ZDROJ Vzorky")
    ws.sheet_view.showGridLines = True
    headers = list(df.columns)

    # Style objects are created once instead of once per cell.
    header_font = Font(name="Arial", bold=True, size=9, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="404040")
    body_font = Font(name="Arial", size=9)

    for c, h in enumerate(headers, 1):
        cell = ws.cell(row=1, column=c, value=h)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = LEFT
        cell.border = BORDER
        ws.column_dimensions[get_column_letter(c)].width = 18

    last_row = 1
    for r_idx, (_, row) in enumerate(df.iterrows(), 2):
        fill = EVEN_FILL if r_idx % 2 == 0 else ODD_FILL
        for c_idx, col in enumerate(headers, 1):
            val = row[col]
            if pd.isna(val):
                val = ""
            elif hasattr(val, "strftime"):
                val = val.strftime("%Y-%m-%d")
            cell = ws.cell(row=r_idx, column=c_idx, value=val)
            cell.font = body_font
            cell.fill = fill
            cell.border = BORDER
            cell.alignment = LEFT
        last_row = r_idx

    ws.freeze_panes = "A2"
    if headers:
        # Cover the data rows as well, like the filters on the formatted
        # sheets, instead of only the header row.
        ws.auto_filter.ref = f"A1:{get_column_letter(len(headers))}{last_row}"
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Load both datasets from MySQL, build the workbook and save it.

    The output directory is created if needed. The workbook receives the
    sheets Přehled, Chybějící, Kity, ZDROJ Vzorky and ZDROJ Kity, in that
    order, and is saved under a dated file name obtained from ``unique_path``.
    """
    os.makedirs(CREATED_DIR, exist_ok=True)

    print("Načítám data z MySQL...")
    samples = load_data()
    kits = load_kit_data()
    patient_count = samples['patient_no'].nunique()
    print(f" Vzorky: {len(samples)} řádků, {patient_count} pacientů")
    site_count = kits['site_code'].nunique()
    print(f" Kity: {len(kits)} kitů, {site_count} center")

    workbook = Workbook()
    # Drop the default empty sheet so only the report sheets remain.
    workbook.remove(workbook.active)
    sheet_writers = (
        (write_prehled, samples),
        (write_chybejici, samples),
        (write_kity, kits),
        (write_zdroj, samples),
        (write_zdroj_kity, kits),
    )
    for writer, frame in sheet_writers:
        writer(workbook, frame)

    date_prefix = datetime.date.today().strftime("%Y-%m-%d")
    out_path = unique_path(f"{date_prefix} {STUDY} Covance Samples")
    workbook.save(out_path)
    print(f"Uloženo: {out_path}")


# NOTE(review): this call also runs when the module is imported; consider an
# ``if __name__ == "__main__":`` guard once no caller relies on that.
main()