281 lines
10 KiB
Python
281 lines
10 KiB
Python
# =============================================================================
|
|
# Název: download_lab_reports_v1.0.py
|
|
# Verze: 1.0
|
|
# Datum: 2026-06-16
|
|
# Popis: Stahuje PDF Lab Reports ze xsp.labcorp.com pro studii 77242113UCO3001
|
|
# (interni cislo 36940), filtrovane na 10 ceskych center (CZ).
|
|
# Princip stejny jako download_test_results: Playwright + perzistentni
|
|
# profil (browser_profile/), jednorazovy login, stahovani pres UI
|
|
# (klik na "English" v sloupci Download -> expect_download).
|
|
#
|
|
# Lab Reports grid je AG Grid s virtualnim renderem (~50 z 298 radku
|
|
# v DOM). Skript proto scrolluje viewport po indexech (row-index),
|
|
# u kazdeho radku precte metadata + klikne na "English".
|
|
#
|
|
# Nazev PDF (mezery, ne podtrzitka):
# "77242113UCO3001 {yyyy-mm-dd odber} {Site} {Subject} {Visit} {Accession} posted {yyyy-mm-dd Posted}.pdf"
|
|
# Pri kolizi nazvu se prida " (2)", " (3)", ...
|
|
#
|
|
# Prepinace:
|
|
# --dry-run nestahuje, jen vypise metadata a vysledne nazvy souboru
|
|
# --limit N zpracuje jen prvnich N radku (test pojmenovani)
|
|
# =============================================================================
|
|
from playwright.sync_api import sync_playwright
|
|
from datetime import datetime
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import traceback
|
|
import urllib.parse
|
|
|
|
# --- argumenty --------------------------------------------------------------
|
|
def _non_negative_int(text):
    """argparse ``type=`` converter that accepts only integers >= 0.

    A negative ``--limit`` would otherwise make the row loop run zero times
    without any error, so it is rejected here as a normal usage error.

    Raises:
        argparse.ArgumentTypeError: *text* is not an integer or is negative.
    """
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"neplatne cele cislo: {text!r}") from None
    if value < 0:
        raise argparse.ArgumentTypeError(f"hodnota musi byt >= 0, zadano {value}")
    return value


# Command-line interface. Parsed at import time because ARGS is read as a
# module-level global by main().
parser = argparse.ArgumentParser(description="Stahovani Lab Reports PDF (XSP) pro 77242113UCO3001.")
parser.add_argument("--dry-run", action="store_true", help="nestahovat, jen vypsat metadata + nazvy")
parser.add_argument("--limit", type=_non_negative_int, default=0, help="zpracovat jen prvnich N radku (0 = vse)")
ARGS = parser.parse_args()
|
|
|
|
|
|
def log(msg):
    """Print *msg* to stdout prefixed with the current wall-clock time.

    The line has the form ``[HH:MM:SS] msg`` and is flushed immediately so
    progress is visible while the script is still running.
    """
    stamp = datetime.now().strftime('%H:%M:%S')
    line = f"[{stamp}] {msg}"
    print(line, flush=True)
|
|
|
|
|
|
# --- konfigurace ------------------------------------------------------------
|
|
# Credentials are read from the environment so that no secret lives in the
# source file or in version control. Set XSP_EMAIL and XSP_PASSWORD before
# running a session that needs a fresh login; they stay empty strings when
# the variables are not set.
# SECURITY: a real password used to be hard-coded here. Treat it as
# compromised and rotate it.
EMAIL = os.environ.get("XSP_EMAIL", "")
PASSWORD = os.environ.get("XSP_PASSWORD", "")

LOGIN_URL = "https://xsp.labcorp.com/"
STUDY = "36940"  # internal study number used in the portal URL
STUDY_CODE = "77242113UCO3001"  # study code used as the PDF file-name prefix
OUT_DIR = r"U:\PythonProject\Janssen\Covance\LabReports"  # download target directory
|
|
|
|
# Internal site IDs of the 10 centres, taken from the "GO TO LINK" URL
# supplied by the user. They are sent to the portal as the ``site`` filter.
SITES = [
    "930539",
    "930547",
    "930555",
    "930556",
    "930553",
    "930549",
    "930525",
    "930536",
    "930557",
    "930531",
]
|
|
|
|
# Absolute directory that contains this script.
_BASE = os.path.split(os.path.abspath(__file__))[0]
# Browser profile directory kept next to the script; it is passed to
# Playwright as the persistent user-data directory.
PROFILE_DIR = os.path.join(_BASE, "browser_profile")
|
|
|
|
|
|
def lab_reports_url():
    """Return the Lab Reports URL for STUDY, filtered to the sites in SITES.

    The site filter travels in the ``site`` query parameter as a
    percent-encoded, compact JSON array of site IDs.
    """
    # Compact JSON without spaces, e.g. ["930539","930547",...]
    compact_sites = json.dumps(SITES, separators=(",", ":"))
    encoded_sites = urllib.parse.quote(compact_sites)
    base_url = f"https://xsp.labcorp.com/sponsor/study/{STUDY}/lab-reports"
    return f"{base_url}?site={encoded_sites}"
|
|
|
|
|
|
# --- pomocne funkce nazvu souboru -------------------------------------------
|
|
# Characters that Windows does not allow in file names.
_FORBIDDEN_CHARS = re.compile(r'[\\/:*?"<>|]')


def safe(s: str) -> str:
    """Return *s* with Windows-forbidden file-name characters removed.

    Inner spaces are kept; leading and trailing whitespace is stripped.
    A falsy value (``None`` or ``""``) yields an empty string.
    """
    text = s if s else ""
    cleaned = _FORBIDDEN_CHARS.sub("", text)
    return cleaned.strip()
|
|
|
|
|
|
# Three-letter English month abbreviation -> two-digit month number.
_MONTHS = {
    "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04", "May": "05",
    "Jun": "06", "Jul": "07", "Aug": "08", "Sep": "09", "Oct": "10",
    "Nov": "11", "Dec": "12",
}

# Leading "<month name> <day>, <year>". Only the first three letters of the
# month are captured; whitespace after the comma is optional.
_LEADING_DATE = re.compile(r"\s*([A-Za-z]{3})\w*\s+(\d{1,2}),\s*(\d{4})")


def fmt_date(s: str) -> str:
    """Convert a leading 'Mon D, YYYY' date to ISO 'YYYY-MM-DD'.

    Accepts e.g. 'Jun 10, 2026', 'Jun 15, 2026 7:49 PM', 'JUNE 10,2026'.
    Only the leading date is used; a trailing time is ignored. The month is
    matched case-insensitively on its first three letters, so differently
    cased portal output still normalises instead of leaking into the file
    name unformatted.

    Returns:
        The ISO date, or ``safe(s)`` when no leading date is recognised.
    """
    m = _LEADING_DATE.match(s or "")
    if m:
        # Normalise case ("JUN"/"jun" -> "Jun") before the table lookup.
        month = _MONTHS.get(m.group(1).capitalize())
        if month:
            return f"{m.group(3)}-{month}-{int(m.group(2)):02d}"
    return safe(s)
|
|
|
|
|
|
def build_basename(meta: dict) -> str:
    """Build the PDF base name (without extension) for one grid row.

    *meta* must provide the keys ``collected``, ``site``, ``subject``,
    ``visit``, ``accession`` and ``posted``. The Posted date is part of the
    name so that a reissue of the same report (same accession, different
    Posted) gets a different name; process_row() still passes the result
    through unique_path() in case two names coincide anyway.
    """
    collected = fmt_date(meta['collected'])
    site = meta['site']
    subject = meta['subject']
    visit = meta['visit']
    accession = meta['accession']
    posted = fmt_date(meta['posted'])
    raw_name = (
        f"{STUDY_CODE} {collected} {site} "
        f"{subject} {visit} {accession} "
        f"posted {posted}"
    )
    return safe(raw_name)
|
|
|
|
|
|
def unique_path(out_dir: str, base: str, ext: str = ".pdf") -> str:
    """Return a path in *out_dir* for *base* + *ext* that does not exist yet.

    The plain name is tried first; on a collision " (2)", " (3)", ... is
    appended to *base* until a free name is found. Nothing is created on
    disk.
    """
    candidate = os.path.join(out_dir, base + ext)
    counter = 1
    while os.path.exists(candidate):
        counter += 1
        candidate = os.path.join(out_dir, f"{base} ({counter}){ext}")
    return candidate
|
|
|
|
|
|
# --- JS helpery (cteni AG Gridu) --------------------------------------------
|
|
JS_GRID_INFO = r"""() => {
|
|
const c = document.querySelector('.ag-body-container');
|
|
const r = document.querySelector('.ag-body-container .ag-row');
|
|
const rh = r ? r.getBoundingClientRect().height : 25;
|
|
const ch = c ? parseFloat(c.style.height || '0') : 0;
|
|
return { rowHeight: rh || 25, total: rh ? Math.round(ch / rh) : 0 };
|
|
}"""
|
|
|
|
JS_READ_ROW = r"""(idx) => {
|
|
const dedup = s => {
|
|
s = (s || '').replace(/\s+/g, ' ').trim();
|
|
const h = s.slice(0, Math.floor(s.length / 2));
|
|
if (s === h + h) return h;
|
|
const m = s.match(/^(.*?)\s+\1$/);
|
|
if (m) return m[1];
|
|
return s;
|
|
};
|
|
const row = document.querySelector('.ag-body-container .ag-row[row-index="' + idx + '"]');
|
|
if (!row) return null;
|
|
const get = id => {
|
|
const c = row.querySelector('[col-id="' + id + '"]');
|
|
return c ? dedup(c.textContent) : '';
|
|
};
|
|
return {
|
|
subject: get('subjectId'),
|
|
accession: get('accessionNumber'),
|
|
visit: get('visit'),
|
|
collected: get('visitCollectionDate'),
|
|
site: get('siteNum'),
|
|
posted: get('postedDateTime'),
|
|
};
|
|
}"""
|
|
|
|
JS_SCROLL_TO = r"""(args) => {
|
|
const [idx, rh] = args;
|
|
const vp = document.querySelector('.ag-body-viewport');
|
|
if (!vp) return;
|
|
vp.scrollTop = Math.max(0, idx * rh - vp.clientHeight / 2);
|
|
}"""
|
|
|
|
|
|
# --- login ------------------------------------------------------------------
|
|
def login(page):
    """Sign in to the portal unless no login form is shown.

    Opens LOGIN_URL and waits up to 12 s for the "Email" field. If it does
    not appear, the session is taken to be active already and the function
    returns without logging in. Otherwise the e-mail and password are
    submitted in two steps ("Next", then "Verify").

    Args:
        page: Playwright page (sync API) used for the whole run.

    Raises:
        RuntimeError: a login form is shown but EMAIL or PASSWORD is empty.
        playwright.sync_api.TimeoutError: the password field does not become
            visible within 30 s.
    """
    # Local import keeps this block self-contained; playwright.sync_api is
    # already imported at the top of the file.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    log("LOGIN: otviram login stranku...")
    page.goto(LOGIN_URL)
    email_field = page.get_by_label("Email")
    try:
        email_field.wait_for(state="visible", timeout=12000)
    except PlaywrightTimeoutError:
        # Only a timeout means "no login form". Any other error (closed page,
        # crashed browser, ...) must surface instead of being read as an
        # active session.
        log(f"LOGIN: Email pole se neobjevilo -> session aktivni, login preskocen ({page.url})")
        return

    if not EMAIL or not PASSWORD:
        # Fail fast with a clear message rather than submitting an empty form.
        raise RuntimeError("LOGIN: chybi prihlasovaci udaje (EMAIL/PASSWORD nejsou nastaveny)")

    log("LOGIN: zadavam email...")
    email_field.fill(EMAIL)
    page.get_by_role("button", name="Next").click()

    log("LOGIN: cekam na pole pro heslo...")
    password_field = page.get_by_label("Password")
    password_field.wait_for(state="visible", timeout=30000)
    log("LOGIN: zadavam heslo...")
    password_field.fill(PASSWORD)
    page.get_by_role("button", name="Verify").click()

    log("LOGIN: cekam na presmerovani po prihlaseni...")
    try:
        # NOTE(review): the predicate is true for any URL without "code=", so
        # this may return immediately — confirm against the real redirect chain.
        page.wait_for_url(lambda url: "code=" not in url or "xsp." in url, timeout=60000)
    except PlaywrightTimeoutError:
        # Best effort: a slow redirect is not fatal, the grid load waits again.
        log("LOGIN: wait_for_url vyprsel, pokracuji.")
    page.wait_for_timeout(3000)
    log(f"LOGIN: prihlaseni hotovo ({page.url})")
|
|
|
|
|
|
# --- nacteni gridu ----------------------------------------------------------
|
|
def open_grid(page):
    """Open the Lab Reports grid and wait until its row count settles.

    Navigates to lab_reports_url(), waits up to 120 s for the first grid row,
    then polls JS_GRID_INFO every 2 s (at most 20 times) until two
    consecutive polls report the same non-zero total.

    Args:
        page: Playwright page (sync API).

    Returns:
        Tuple ``(total, row_height)`` taken from a final JS_GRID_INFO call.
    """
    target = lab_reports_url()
    log(f"GRID: navigace na Lab Reports ({len(SITES)} center)...")
    page.goto(target)
    log("GRID: cekam na radky (.ag-row)...")
    page.wait_for_selector(".ag-body-container .ag-row", timeout=120000)

    # Row-count stabilisation.
    last_total = -1
    for attempt in range(1, 21):
        snapshot = page.evaluate(JS_GRID_INFO)
        current_total = snapshot["total"]
        log(f" ...kontrola #{attempt}: total={current_total}, rowHeight={snapshot['rowHeight']}")
        if current_total > 0 and current_total == last_total:
            break
        last_total = current_total
        page.wait_for_timeout(2000)

    final = page.evaluate(JS_GRID_INFO)
    log(f"GRID: nacteno, total={final['total']} radku, rowHeight={final['rowHeight']}px.")
    return final["total"], final["rowHeight"]
|
|
|
|
|
|
# --- stazeni jednoho radku --------------------------------------------------
|
|
def process_row(page, idx, row_height, dry_run):
    """Scroll to grid row *idx*, read its metadata and download its PDF.

    Args:
        page: Playwright page (sync API) showing the Lab Reports grid.
        idx: zero-based grid row index (the row's ``row-index`` attribute).
        row_height: row height in pixels, used to compute the scroll offset.
        dry_run: when true, only log the resulting file name; do not download.

    Returns:
        The file name (without directory) chosen for the row.

    Raises:
        RuntimeError: the row metadata is missing or has no subject.
    """
    row_css = f'.ag-body-container .ag-row[row-index="{idx}"]'

    # The grid is virtualised, so the row must be scrolled into the DOM first.
    page.evaluate(JS_SCROLL_TO, [idx, row_height])
    page.wait_for_selector(row_css, timeout=15000)
    page.wait_for_timeout(150)  # short pause; presumably lets the cells settle

    meta = page.evaluate(JS_READ_ROW, idx)
    if not (meta and meta.get("subject")):
        raise RuntimeError(f"radek {idx}: nepodarilo se precist metadata")

    dest = unique_path(OUT_DIR, build_basename(meta))
    filename = os.path.basename(dest)

    if dry_run:
        log(f" [DRY] #{idx}: {filename}")
        return filename

    # The "English" link of this row triggers the PDF download.
    english_link = page.locator(f"{row_css} a.dl-link", has_text="English").first
    with page.expect_download(timeout=60000) as download_info:
        english_link.click()
    download_info.value.save_as(dest)
    log(f" #{idx}: -> {filename}")
    return filename
|
|
|
|
|
|
# --- main -------------------------------------------------------------------
|
|
def main():
    """Run the whole download session.

    Creates OUT_DIR, starts Chromium with the persistent profile in
    PROFILE_DIR, logs in if needed, opens the Lab Reports grid and processes
    its rows one by one (at most ARGS.limit rows when a limit is set). A
    failure in one row is logged and the run continues with the next row;
    the failed indexes are listed at the end.

    The browser context is closed in a ``finally`` block so the browser is
    shut down cleanly even when login or grid loading raises.
    """
    os.makedirs(OUT_DIR, exist_ok=True)
    log(f"START: studie {STUDY_CODE} ({STUDY}), vystup '{OUT_DIR}', "
        f"{'DRY-RUN' if ARGS.dry_run else 'STAHOVANI'}"
        f"{f', limit {ARGS.limit}' if ARGS.limit else ''}.")

    with sync_playwright() as p:
        context = p.chromium.launch_persistent_context(
            user_data_dir=PROFILE_DIR,
            headless=False,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--start-maximized",
                "--disable-restore-session-state",
                "--disable-session-crashed-bubble",
            ],
            no_viewport=True,
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
            accept_downloads=True,
        )
        try:
            context.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
            page = context.new_page()
            log("START: prohlizec spusten.")

            login(page)
            total, row_height = open_grid(page)
            if ARGS.limit:
                total = min(total, ARGS.limit)

            ok, failed = 0, []
            for idx in range(total):
                log(f">>> Radek {idx+1}/{total}")
                try:
                    process_row(page, idx, row_height, ARGS.dry_run)
                    ok += 1
                except Exception as e:
                    # Per-row boundary: record the failure and keep going.
                    failed.append(idx)
                    log(f"CHYBA u radku {idx}: {e!r} — pokracuji dalsim.")

            log(f"KONEC: hotovo {ok}/{total} radku.")
            if failed:
                log(f"KONEC: SELHALY indexy: {failed}")
        finally:
            # Previously skipped whenever login() or open_grid() raised.
            context.close()
|
|
|
|
|
|
def _pause_before_exit():
    """Wait for Enter so the console window stays open after the run."""
    try:
        input("\n[Enter] pro zavreni tohoto okna...")
    except EOFError:
        # stdin is closed (non-interactive run): nothing to wait for.
        pass


if __name__ == "__main__":
    # Top-level boundary: report any crash with a traceback, then always
    # pause so the output can be read before the window closes.
    try:
        main()
    except Exception as exc:
        log(f"FATAL: beh spadl: {exc!r}")
        traceback.print_exc()
    finally:
        _pause_before_exit()
|