Files
janssen/Medidata/create_report_v1.0.py
2026-06-02 17:19:44 +02:00

211 lines
8.1 KiB
Python

"""
create_report_v1.0.py
Verze: 1.0
Datum: 2026-06-01
Popis: Excel EDC DataListing report pro studii UCO3001 z MongoDB (db: edc).
Jeden list per kolekce (DateofVisit / ConcomitantTherapy / TrialDisposition).
Sloupce: SiteNumber, SiteName, Subject, Visit, FolderSeq, RecordPos,
LastModified + dynamické fields.* z MongoDB.
Výstup: reports/YYYY-MM-DD 77242113UCO3001 EDC DataListing v1.0.xlsx
"""
import shutil
from datetime import datetime
from pathlib import Path
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from pymongo import ASCENDING, MongoClient
# ── Konfigurace ───────────────────────────────────────────────────────────────
MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "edc"
STUDY_FULL = "77242113UCO3001"
VERSION = "1.0"
OUTPUT_DIR = Path(r"U:\Dropbox\!!!Days\Downloads Z230")
TRASH_DIR = Path(__file__).parent / "reports" / "TRASH"
COLLECTIONS = [
"UCO3001.DateofVisit",
"UCO3001.ConcomitantTherapy",
"UCO3001.TrialDispositionCompletion-Discontinuation",
]
# ── Formátování ───────────────────────────────────────────────────────────────
HEADER_FILL = PatternFill("solid", fgColor="1F4E79")
HEADER_FONT = Font(bold=True, color="FFFFFF", name="Calibri", size=10)
DATA_FONT = Font(name="Calibri", size=10)
THIN = Side(style="thin", color="CCCCCC")
BORDER = Border(left=THIN, right=THIN, top=THIN, bottom=THIN)
# ── Pevné sloupce ─────────────────────────────────────────────────────────────
FIXED_COLS = [
("SiteNumber", lambda d: d.get("site", {}).get("number", "")),
("SiteName", lambda d: d.get("site", {}).get("name", "")),
("Subject", lambda d: d.get("subject", {}).get("label", "")),
("Visit", lambda d: d.get("form", {}).get("instanceName", "")),
("FolderSeq", lambda d: d.get("form", {}).get("folderSeq", "")),
("RecordPos", lambda d: d.get("form", {}).get("recordPosition", "")),
("LastModified", lambda d: _fmt(d.get("lastModified", ""))),
]
# ── Helpers ───────────────────────────────────────────────────────────────────
def _fmt(value: str) -> str:
"""ISO datetime string → 'DD-MMM-YYYY' nebo 'DD-MMM-YYYY HH:MM'."""
if not value:
return ""
try:
dt = datetime.fromisoformat(value)
if dt.hour == 0 and dt.minute == 0 and dt.second == 0:
return dt.strftime("%d-%b-%Y")
return dt.strftime("%d-%b-%Y %H:%M")
except Exception:
return value
def _fmt_field(value) -> str:
"""Naformátuje hodnotu z fields{} — datum nebo string."""
if isinstance(value, str) and "T" in value and value.endswith(("+00:00", "Z")):
return _fmt(value)
return value if value is not None else ""
COLS_LAST_CT = [
"CMTRT_ATC1", "CMTRT_ATC2", "CMTRT_ATC3", "CMTRT_ATC4",
"CMTRT_RXPREF", "CMTRT_TRADE_NAME",
"CMTRT_ATC1_CODE", "CMTRT_ATC2_CODE", "CMTRT_ATC3_CODE", "CMTRT_ATC4_CODE",
"CMTRT_RXPREF_CODE", "CMTRT_TRADE_NAME_CODE",
]
def _field_keys(docs: list, last: list | None = None) -> list:
"""Vrátí seznam unikátních klíčů z fields{} — klíče v `last` přesunuty na konec."""
seen = set()
keys = []
for doc in docs:
for k in doc.get("fields", {}).keys():
if k not in seen:
seen.add(k)
keys.append(k)
if last:
tail = [k for k in last if k in seen]
keys = [k for k in keys if k not in set(tail)] + tail
return keys
def _sheet_name(collection: str) -> str:
"""UCO3001.SomeName → SomeName (max 31 znaků pro Excel)."""
name = collection.split(".", 1)[-1]
abbreviations = {
"TrialDispositionCompletion-Discontinuation": "TrialDisposition",
}
return abbreviations.get(name, name)[:31]
# ── Zápis listu ───────────────────────────────────────────────────────────────
def write_sheet(ws, docs: list, last_cols: list | None = None) -> None:
fixed_names = [c[0] for c in FIXED_COLS]
field_keys = _field_keys(docs, last=last_cols)
all_headers = fixed_names + field_keys
# záhlaví
for col_i, header in enumerate(all_headers, 1):
cell = ws.cell(row=1, column=col_i, value=header)
cell.font = HEADER_FONT
cell.fill = HEADER_FILL
cell.border = BORDER
cell.alignment = Alignment(horizontal="center", vertical="center")
ws.row_dimensions[1].height = 18
ws.freeze_panes = "A2"
# data
for row_i, doc in enumerate(docs, 2):
fields = doc.get("fields", {})
for col_i, (_, getter) in enumerate(FIXED_COLS, 1):
cell = ws.cell(row=row_i, column=col_i, value=getter(doc))
cell.font = DATA_FONT
cell.border = BORDER
cell.alignment = Alignment(vertical="top")
for col_off, key in enumerate(field_keys):
col_i = len(FIXED_COLS) + col_off + 1
cell = ws.cell(row=row_i, column=col_i, value=_fmt_field(fields.get(key, "")))
cell.font = DATA_FONT
cell.border = BORDER
cell.alignment = Alignment(vertical="top")
# autofilter
if all_headers:
ws.auto_filter.ref = f"A1:{get_column_letter(len(all_headers))}1"
# šířky sloupců
widths = {i: len(h) for i, h in enumerate(all_headers, 1)}
for doc in docs:
fields = doc.get("fields", {})
for col_i, (_, getter) in enumerate(FIXED_COLS, 1):
widths[col_i] = max(widths[col_i], len(str(getter(doc))))
for col_off, key in enumerate(field_keys):
col_i = len(FIXED_COLS) + col_off + 1
widths[col_i] = max(widths[col_i], len(str(fields.get(key, ""))))
for col_i, w in widths.items():
ws.column_dimensions[get_column_letter(col_i)].width = min(w + 2, 55)
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
client.admin.command("ping")
db = client[DB_NAME]
wb = Workbook()
wb.remove(wb.active)
for coll_name in COLLECTIONS:
docs = list(db[coll_name].find(
{},
{"_id": 0, "sourceFile": 0, "history": 0},
sort=[
("site.number", ASCENDING),
("subject.label", ASCENDING),
("form.folderSeq", ASCENDING),
("form.recordPosition", ASCENDING),
],
))
ws = wb.create_sheet(title=_sheet_name(coll_name))
last = COLS_LAST_CT if "ConcomitantTherapy" in coll_name else None
write_sheet(ws, docs, last_cols=last)
print(f" {coll_name}: {len(docs)} zaznamu -> list '{ws.title}'")
client.close()
OUTPUT_DIR.mkdir(exist_ok=True)
TRASH_DIR.mkdir(exist_ok=True)
# přesun starých verzí do TRASH
pattern = f"* {STUDY_FULL} EDC DataListing *.xlsx"
for old in OUTPUT_DIR.glob(pattern):
dest = TRASH_DIR / old.name
shutil.move(str(old), str(dest))
print(f" Přesunuto do TRASH: {old.name}")
today = datetime.now().strftime("%Y-%m-%d")
filename = f"{today} {STUDY_FULL} EDC DataListing v{VERSION}.xlsx"
out_path = OUTPUT_DIR / filename
wb.save(str(out_path))
print(f"\nUloženo: {out_path}")
if __name__ == "__main__":
main()