This commit is contained in:
2026-05-05 14:11:50 +02:00
parent 10eba225e7
commit 5f26ff0cc5
17 changed files with 2373 additions and 0 deletions
+447
View File
@@ -0,0 +1,447 @@
"""
Covance samples report for study 42847922MDD3003.
Reads the most recent import from MySQL and generates an Excel workbook with 5 sheets:
1. Přehled — aggregate per patient + visit (Celkem / Received / Not Received counts)
2. Chybějící — detail of the Not Received samples
3. Kity — kit inventory per site: kit types with counts by expiration window
4. ZDROJ Vzorky — raw samples data
5. ZDROJ Kity — raw kit inventory data
"""
import os
import datetime
import mysql.connector
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
import db_config
# Study whose latest imports are reported.
STUDY = "42847922MDD3003"

# Reports are written to a "CreatedReports" folder next to this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CREATED_DIR = os.path.join(BASE_DIR, "CreatedReports")

# ── styles ───────────────────────────────────────────────────────────────────
# Fonts
HEADER_FONT = Font(name="Arial", size=10, bold=True, color="FFFFFF")
NORMAL_FONT = Font(name="Arial", size=10)
BOLD_FONT = Font(name="Arial", size=10, bold=True)
RED_FONT = Font(name="Arial", size=10, bold=True, color="C00000")

# Fills
HEADER_FILL = PatternFill(fill_type="solid", fgColor="1F4E79")
EVEN_FILL = PatternFill(fill_type="solid", fgColor="EBF3FB")
ODD_FILL = PatternFill(fill_type="solid", fgColor="FFFFFF")
NOTRCV_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6")
# NOTE(review): not referenced in the visible part of the file.
CANCELLED_FILL = PatternFill(fill_type="solid", fgColor="F2F2F2")

# Thin light-grey frame drawn around every table cell.
THIN = Side(style="thin", color="CCCCCC")
BORDER = Border(top=THIN, bottom=THIN, left=THIN, right=THIN)

# Alignments
CENTER = Alignment(vertical="center", horizontal="center")
LEFT = Alignment(vertical="center", horizontal="left")
def unique_path(stem):
    """Return a path in ``CREATED_DIR`` for ``<stem>.xlsx`` that does not exist yet.

    The plain name is preferred.  If it is taken, the current time (``HHMM``)
    is appended.  If that name is taken as well (e.g. two runs within the
    same minute), a running counter is added until a free name is found, so
    an earlier report is never overwritten.

    Args:
        stem: File name without directory and without the ``.xlsx`` suffix.

    Returns:
        Path (``str``) of a file that did not exist when the call was made.
    """
    path = os.path.join(CREATED_DIR, f"{stem}.xlsx")
    if not os.path.exists(path):
        return path

    tag = datetime.datetime.now().strftime("%H%M")
    path = os.path.join(CREATED_DIR, f"{stem} {tag}.xlsx")
    counter = 2
    # The time-tagged name can collide too; keep numbering until it is free.
    while os.path.exists(path):
        path = os.path.join(CREATED_DIR, f"{stem} {tag} ({counter}).xlsx")
        counter += 1
    return path
# ── data load ────────────────────────────────────────────────────────────────
def load_data():
    """Load the sample rows of the most recent ``covance_samples`` import.

    The import is selected as the highest ``import_id`` registered in
    ``iwrs_import`` for ``STUDY`` with report type ``covance_samples``.

    Returns:
        pandas.DataFrame with one row per fetched record and one column per
        selected field, ordered by investigator, patient, visit and
        container.  Empty when the query returns no rows.
    """
    sql = """
        SELECT
            investigator_no, investigator_name, patient_no,
            collection_date, protocol_visit_code,
            accession, container_no, container_barcode,
            specimen_type, sample_status,
            label_line1, label_line2
        FROM covance_samples
        WHERE import_id = (
            SELECT MAX(import_id) FROM iwrs_import
            WHERE study = %s AND report_type = 'covance_samples'
        )
        ORDER BY investigator_no, patient_no, protocol_visit_code, container_no
    """
    conn = mysql.connector.connect(
        host=db_config.DB_HOST, port=db_config.DB_PORT,
        user=db_config.DB_USER, password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
    # try/finally so cursor and connection are released even if the query fails.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, (STUDY,))
            cols = [d[0] for d in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=cols)
def load_kit_data():
    """Load the kit rows of the most recent ``covance_kit_inventory`` import.

    The import is selected as the highest ``import_id`` registered in
    ``iwrs_import`` for ``STUDY`` with report type ``covance_kit_inventory``.

    Returns:
        pandas.DataFrame with one row per fetched kit record, ordered by site,
        kit type (numeric value first, then text) and accession.  Empty when
        the query returns no rows.
    """
    sql = """
        SELECT site_code, investigator_name, kit_type, description,
               accession, shipped_date, expiration_date, days_to_expiration
        FROM covance_kit_inventory
        WHERE import_id = (
            SELECT MAX(import_id) FROM iwrs_import
            WHERE study = %s AND report_type = 'covance_kit_inventory'
        )
        ORDER BY site_code, kit_type+0, kit_type, accession
    """
    conn = mysql.connector.connect(
        host=db_config.DB_HOST, port=db_config.DB_PORT,
        user=db_config.DB_USER, password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
    # try/finally so cursor and connection are released even if the query fails.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql, (STUDY,))
            cols = [d[0] for d in cursor.description]
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=cols)
# ── helpers ──────────────────────────────────────────────────────────────────
def test_name(row):
    """Build the display name of a lab test from a sample's two label lines.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) providing the keys
            ``label_line1`` and ``label_line2``.

    Returns:
        The non-empty labels, stripped and joined by a single space.  Missing
        (NA) or blank labels are skipped; ``""`` if both are missing.
    """
    parts = []
    for key in ("label_line1", "label_line2"):
        raw = row[key]
        text = str(raw).strip() if pd.notna(raw) else ""
        if text:
            parts.append(text)
    return " ".join(parts)


# Despite its name this helper is not a test: keep pytest from collecting it
# when the module's names are imported into a test module.
test_name.__test__ = False
def write_headers(ws, headers, widths, row=2):
    """Write a styled header row and set the matching column widths.

    Args:
        ws: Worksheet to write to.
        headers: Captions, one per column starting at column A.
        widths: Column widths paired with ``headers`` (extra items of the
            longer sequence are ignored).
        row: 1-based row number of the header row.
    """
    look = (
        ("font", HEADER_FONT),
        ("fill", HEADER_FILL),
        ("alignment", CENTER),
        ("border", BORDER),
    )
    for col_no, (caption, width) in enumerate(zip(headers, widths), start=1):
        header_cell = ws.cell(row=row, column=col_no, value=caption)
        for attr, style in look:
            setattr(header_cell, attr, style)
        ws.column_dimensions[get_column_letter(col_no)].width = width
    ws.row_dimensions[row].height = 18
def write_title(ws, text, ncols):
    """Put a bold title into row 1, merged across the first ``ncols`` columns.

    Args:
        ws: Worksheet to write to.
        text: Title text.
        ncols: Number of columns (from A) the title cell spans.
    """
    last_col = get_column_letter(ncols)
    ws.merge_cells(f"A1:{last_col}1")

    title = ws["A1"]
    title.value = text
    title.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
    title.alignment = Alignment(horizontal="left", vertical="center")

    ws.row_dimensions[1].height = 22
# ── sheet 1: Přehled ─────────────────────────────────────────────────────────
def write_prehled(wb, df):
    """Write the "Přehled" overview sheet.

    One row per investigator / patient / visit / collection date with the
    number of samples in total, received and not received.  Rows that have
    at least one "Not Received" sample are highlighted.

    Args:
        wb: Workbook the sheet is added to.
        df: Sample data with the columns ``investigator_no``,
            ``investigator_name``, ``patient_no``, ``protocol_visit_code``,
            ``collection_date`` and ``sample_status``.
    """
    ws = wb.create_sheet("Přehled")
    ws.sheet_view.showGridLines = False
    today = datetime.date.today().strftime("%d-%b-%Y")
    write_title(ws, f"Covance Samples — {STUDY} ({today})", 8)
    headers = ["Site", "Investigátor", "Pacient", "Visit", "Datum odběru",
               "Celkem", "Received", "Not Received"]
    widths = [9, 22, 14, 12, 14, 8, 10, 13]
    write_headers(ws, headers, widths)

    status = df["sample_status"]
    agg = (
        df.assign(received=status.eq("Received"),
                  not_received=status.eq("Not Received"))
        # dropna=False: keep groups whose key is missing (e.g. no collection
        # date); by default groupby would silently drop those samples.
        .groupby(["investigator_no", "investigator_name",
                  "patient_no", "protocol_visit_code", "collection_date"],
                 dropna=False)
        .agg(
            celkem=("sample_status", "count"),
            received=("received", "sum"),
            not_received=("not_received", "sum"),
        )
        .reset_index()
        .sort_values(["investigator_no", "patient_no", "protocol_visit_code"])
        .reset_index(drop=True)
    )

    for r_idx, row in agg.iterrows():
        excel_row = r_idx + 3  # rows 1 and 2 hold title and headers
        has_missing = row["not_received"] > 0
        fill = NOTRCV_FILL if has_missing else (EVEN_FILL if r_idx % 2 == 0 else ODD_FILL)

        col_date = row["collection_date"]
        if pd.isna(col_date):
            date_str = None
        elif hasattr(col_date, "strftime"):
            date_str = col_date.strftime("%d-%b-%Y")
        else:
            date_str = str(col_date)

        values = [
            row["investigator_no"], row["investigator_name"], row["patient_no"],
            row["protocol_visit_code"], date_str,
            int(row["celkem"]), int(row["received"]), int(row["not_received"]),
        ]
        for c_idx, val in enumerate(values, 1):
            # Missing keys become empty cells rather than NaN values.
            if val is not None and pd.isna(val):
                val = None
            cell = ws.cell(row=excel_row, column=c_idx, value=val)
            cell.fill = fill
            cell.border = BORDER
            cell.alignment = CENTER if c_idx in (1, 4, 5, 6, 7, 8) else LEFT
            # The "Not Received" count is emphasised when non-zero.
            cell.font = RED_FONT if (c_idx == 8 and has_missing) else NORMAL_FONT
        ws.row_dimensions[excel_row].height = 16

    ws.freeze_panes = "A3"
    ws.auto_filter.ref = f"A2:H{len(agg) + 2}"
# ── sheet 2: Chybějící ────────────────────────────────────────────────────────
def write_chybejici(wb, df):
    """Write the "Chybějící" sheet listing every "Not Received" sample.

    Args:
        wb: Workbook the sheet is added to.
        df: Sample data with the columns ``investigator_no``, ``patient_no``,
            ``protocol_visit_code``, ``collection_date``, ``accession``,
            ``container_no``, ``specimen_type``, ``sample_status``,
            ``label_line1`` and ``label_line2``.
    """
    ws = wb.create_sheet("Chybějící")
    ws.sheet_view.showGridLines = False
    today = datetime.date.today().strftime("%d-%b-%Y")
    write_title(ws, f"Not Received vzorky — {STUDY} ({today})", 8)
    headers = ["Site", "Pacient", "Visit", "Datum odběru",
               "Accession", "Container", "Typ vzorku", "Test"]
    widths = [9, 14, 12, 14, 13, 10, 22, 30]
    write_headers(ws, headers, widths)

    missing = df[df["sample_status"] == "Not Received"].copy()
    # Built row by row instead of DataFrame.apply(axis=1): on an empty frame
    # apply returns a DataFrame, and assigning that to a column raises, so
    # the report would fail exactly when no sample is missing.
    missing["test"] = [test_name(rec) for _, rec in missing.iterrows()]
    missing = missing.sort_values(
        ["investigator_no", "patient_no", "protocol_visit_code", "container_no"]
    ).reset_index(drop=True)

    for r_idx, row in missing.iterrows():
        excel_row = r_idx + 3  # rows 1 and 2 hold title and headers
        fill = EVEN_FILL if r_idx % 2 == 0 else ODD_FILL

        col_date = row["collection_date"]
        if pd.isna(col_date):
            date_str = None
        elif hasattr(col_date, "strftime"):
            date_str = col_date.strftime("%d-%b-%Y")
        else:
            date_str = str(col_date)

        container = row["container_no"]
        values = [
            row["investigator_no"], row["patient_no"],
            row["protocol_visit_code"], date_str,
            row["accession"], int(container) if pd.notna(container) else None,
            row["specimen_type"], row["test"],
        ]
        for c_idx, val in enumerate(values, 1):
            # Missing values become empty cells rather than NaN values.
            if val is not None and pd.isna(val):
                val = None
            cell = ws.cell(row=excel_row, column=c_idx, value=val)
            cell.fill = fill
            cell.border = BORDER
            cell.alignment = CENTER if c_idx in (1, 3, 4, 5, 6) else LEFT
            cell.font = NORMAL_FONT
        ws.row_dimensions[excel_row].height = 16

    ws.freeze_panes = "A3"
    ws.auto_filter.ref = f"A2:H{len(missing) + 2}"
# ── sheet 3: Kity (per centrum) ──────────────────────────────────────────────
def kit_sort_key(kt):
    """Return a sort key that orders kit types naturally.

    Order of the groups:
      0. purely numeric types, by numeric value;
      1. types of the form ``T-<number>`` (prefix case-insensitive), by number;
      2. everything else, alphabetically by its string form.

    Args:
        kt: Kit type; any value is accepted, non-numeric ones (including
            ``None``) fall through to the later groups.

    Returns:
        Tuple ``(group, number, text)`` usable as a ``sort`` key.
    """
    try:
        return (0, int(kt), "")
    except (TypeError, ValueError, OverflowError):
        # TypeError: None and other non-numeric objects; OverflowError: inf.
        pass
    text = str(kt)
    if text.upper().startswith("T-"):
        try:
            return (1, int(text[2:]), "")
        except ValueError:
            pass
    return (2, 0, text)
# Styles used by write_kity: site header bar, per-site total row, and the
# highlight for kits expiring soon.
SITE_HDR_FILL = PatternFill(fill_type="solid", fgColor="2E75B6")
SITE_HDR_FONT = Font(name="Arial", size=10, bold=True, color="FFFFFF")
TOTAL_FILL = PatternFill(fill_type="solid", fgColor="D6E4F0")
SOON_FILL = PatternFill(fill_type="solid", fgColor="FCE4D6")
def _cell(ws, row, col, value, font, fill, alignment, border):
    """Write ``value`` to the cell at (``row``, ``col``), style it, and return the cell."""
    target = ws.cell(row=row, column=col, value=value)
    target.font = font
    target.fill = fill
    target.alignment = alignment
    target.border = border
    return target
def write_kity(wb, df_kits):
    """Write the "Kity" sheet: kit counts per site and kit type by expiration window.

    For every site a header bar is followed by one row per kit type known in
    the whole data set and a "Celkem" total row.  Column C counts kits that
    expire between today and today + 30 days (inclusive), column D kits that
    expire later.  Kits whose expiration date is already past or missing are
    counted in neither column.

    Args:
        wb: Workbook the sheet is added to.
        df_kits: Kit inventory with the columns ``site_code``,
            ``investigator_name``, ``kit_type``, ``description`` and
            ``expiration_date``.
    """
    ws = wb.create_sheet("Kity")
    ws.sheet_view.showGridLines = False

    today = datetime.date.today()
    cutoff = today + datetime.timedelta(days=30)
    today_str = today.strftime("%d-%b-%Y")

    # All kit types across the study, in natural order.
    kit_types = sorted(df_kits["kit_type"].dropna().unique(), key=kit_sort_key)
    kt_desc = (df_kits.drop_duplicates("kit_type")
               .set_index("kit_type")["description"].to_dict())

    # One entry per site code; a second spelling of the investigator name
    # must not render the same site (and the same kits) twice.
    sites = (df_kits[["site_code", "investigator_name"]]
             .drop_duplicates("site_code")
             .sort_values("site_code")
             .values.tolist())

    # Classify every kit once, relative to today.  Vectorised comparisons
    # treat a missing date (NaT) as "no match" instead of raising.
    expires = pd.to_datetime(df_kits["expiration_date"]).dt.normalize()
    is_soon = expires.between(pd.Timestamp(today), pd.Timestamp(cutoff))
    is_later = expires > pd.Timestamp(cutoff)

    # Columns: A = kit type, B = description, C = ≤30 days, D = >30 days.
    for letter, width in (("A", 9), ("B", 28), ("C", 14), ("D", 14)):
        ws.column_dimensions[letter].width = width
    write_title(ws, f"Kit Inventory — {STUDY} ({today_str})", 4)

    # Column captions (row 2).
    wrap_center = Alignment(horizontal="center", vertical="center", wrap_text=True)
    captions = [
        "Kit Type",
        "Popis",
        f"Expiruje ≤30 dní\n({cutoff.strftime('%d-%b-%Y')})",
        "Expiruje >30 dní",
    ]
    for col, txt in enumerate(captions, 1):
        _cell(ws, 2, col, txt, HEADER_FONT, HEADER_FILL, wrap_center, BORDER)
    ws.row_dimensions[2].height = 28

    cur_row = 3
    for site_code, investigator in sites:
        # ── site header ──────────────────────────────────────────────────────
        label = (str(site_code) if pd.isna(investigator)
                 else f"{site_code} — {investigator}")
        ws.merge_cells(f"A{cur_row}:D{cur_row}")
        c = ws.cell(row=cur_row, column=1, value=label)
        c.font = SITE_HDR_FONT
        c.fill = SITE_HDR_FILL
        c.alignment = LEFT
        c.border = BORDER
        # Style the remaining cells of the merged range as well so the bar
        # is filled and framed across its whole width.
        for col in range(2, 5):
            ws.cell(row=cur_row, column=col).fill = SITE_HDR_FILL
            ws.cell(row=cur_row, column=col).border = BORDER
        ws.row_dimensions[cur_row].height = 17
        cur_row += 1

        in_site = df_kits["site_code"] == site_code
        site_soon = 0
        site_later = 0
        for kt_idx, kt in enumerate(kit_types):
            of_type = in_site & (df_kits["kit_type"] == kt)
            soon = int((of_type & is_soon).sum())
            later = int((of_type & is_later).sum())
            site_soon += soon
            site_later += later

            desc = kt_desc.get(kt, "")
            if pd.isna(desc):
                desc = ""
            fill = EVEN_FILL if kt_idx % 2 == 0 else ODD_FILL
            _cell(ws, cur_row, 1, kt, BOLD_FONT, fill, CENTER, BORDER)
            _cell(ws, cur_row, 2, desc, NORMAL_FONT, fill, LEFT, BORDER)
            # Zero counts are left blank; non-zero "soon" counts are highlighted.
            _cell(ws, cur_row, 3, soon if soon else None,
                  RED_FONT if soon else NORMAL_FONT,
                  SOON_FILL if soon else fill, CENTER, BORDER)
            _cell(ws, cur_row, 4, later if later else None,
                  NORMAL_FONT, fill, CENTER, BORDER)
            ws.row_dimensions[cur_row].height = 16
            cur_row += 1

        # ── site total ───────────────────────────────────────────────────────
        _cell(ws, cur_row, 1, "Celkem", BOLD_FONT, TOTAL_FILL, CENTER, BORDER)
        _cell(ws, cur_row, 2, "", BOLD_FONT, TOTAL_FILL, LEFT, BORDER)
        _cell(ws, cur_row, 3, site_soon if site_soon else None,
              BOLD_FONT, TOTAL_FILL, CENTER, BORDER)
        _cell(ws, cur_row, 4, site_later if site_later else None,
              BOLD_FONT, TOTAL_FILL, CENTER, BORDER)
        ws.row_dimensions[cur_row].height = 16
        cur_row += 2  # leave an empty row between sites

    ws.freeze_panes = "A3"
# ── sheets 4 + 5: raw data (ZDROJ Vzorky / ZDROJ Kity) ───────────────────────
def _write_raw_sheet(wb, title, df, col_width):
    """Dump ``df`` unchanged into a new sheet ``title`` with a dark header row.

    Missing values are written as empty strings and date-like values as
    ``YYYY-MM-DD`` text.  Every column gets the width ``col_width``.
    """
    ws = wb.create_sheet(title)
    ws.sheet_view.showGridLines = True

    # Style objects are created once, not once per cell.
    header_font = Font(name="Arial", bold=True, size=9, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="404040")
    body_font = Font(name="Arial", size=9)

    headers = list(df.columns)
    for c, h in enumerate(headers, 1):
        cell = ws.cell(row=1, column=c, value=h)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = LEFT
        cell.border = BORDER
        ws.column_dimensions[get_column_letter(c)].width = col_width

    # Data starts in row 2; striping follows the Excel row number.
    for r_idx, record in enumerate(df.itertuples(index=False, name=None), 2):
        fill = EVEN_FILL if r_idx % 2 == 0 else ODD_FILL
        for c_idx, val in enumerate(record, 1):
            if pd.isna(val):
                val = ""
            elif hasattr(val, "strftime"):
                val = val.strftime("%Y-%m-%d")
            cell = ws.cell(row=r_idx, column=c_idx, value=val)
            cell.font = body_font
            cell.fill = fill
            cell.border = BORDER
            cell.alignment = LEFT

    ws.freeze_panes = "A2"
    # Cover header and data rows, like the filters on the other sheets.
    ws.auto_filter.ref = f"A1:{get_column_letter(len(headers))}{len(df) + 1}"


def write_zdroj_kity(wb, df_kits):
    """Write the "ZDROJ Kity" sheet with the raw kit inventory data.

    Args:
        wb: Workbook the sheet is added to.
        df_kits: Kit inventory data; all columns are written as they are.
    """
    _write_raw_sheet(wb, "ZDROJ Kity", df_kits, 20)


def write_zdroj(wb, df):
    """Write the "ZDROJ Vzorky" sheet with the raw samples data.

    Args:
        wb: Workbook the sheet is added to.
        df: Sample data; all columns are written as they are.
    """
    _write_raw_sheet(wb, "ZDROJ Vzorky", df, 18)
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Build the report: load both data sets, fill the five sheets, save the workbook."""
    os.makedirs(CREATED_DIR, exist_ok=True)

    print("Načítám data z MySQL...")
    samples = load_data()
    kits = load_kit_data()
    patient_count = samples["patient_no"].nunique()
    site_count = kits["site_code"].nunique()
    print(f" Vzorky: {len(samples)} řádků, {patient_count} pacientů")
    print(f" Kity: {len(kits)} kitů, {site_count} center")

    # Start from an empty workbook: drop the sheet openpyxl creates by default.
    wb = Workbook()
    wb.remove(wb.active)

    # Sheet order in the workbook follows this sequence.
    sheet_plan = (
        (write_prehled, samples),
        (write_chybejici, samples),
        (write_kity, kits),
        (write_zdroj, samples),
        (write_zdroj_kity, kits),
    )
    for build_sheet, frame in sheet_plan:
        build_sheet(wb, frame)

    stamp = datetime.date.today().strftime("%Y-%m-%d")
    out_path = unique_path(f"{stamp} {STUDY} Covance Samples")
    wb.save(out_path)
    print(f"Uloženo: {out_path}")
# Generate the report only when the file is executed as a script, so that
# importing the module does not hit the database or write files.
if __name__ == "__main__":
    main()