502 lines
16 KiB
Python
502 lines
16 KiB
Python
"""
|
||
download_edc_datalistings.py
|
||
Verze: 2.0
|
||
Datum: 2026-05-27
|
||
|
||
Univerzální stahování EDC Data Listing reportů (ReportID=92) z Medidata Rave.
|
||
|
||
Parametry:
|
||
study – vyhledávací řetězec studie (např. "77242113UCO3001")
|
||
forms – seznam názvů formulářů ke stažení
|
||
country – kód země / site group (např. "CZE"), None = všechny
|
||
|
||
Prohlížeč se otevře jednou, přihlásí se, a stáhne všechny formuláře v jedné session.
|
||
|
||
Použití:
|
||
from download_edc import download_datalisting
|
||
|
||
download_datalisting(
|
||
study="77242113UCO3001",
|
||
forms=["Date of Visit", "Concomitant Therapy"],
|
||
country="CZE",
|
||
)
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import sys
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from dotenv import load_dotenv
|
||
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
|
||
import tkinter as tk
|
||
from tkinter import simpledialog
|
||
|
||
# Directory containing this script; .env, auth.json and downloads/ live next to it.
_SCRIPT_DIR = Path(__file__).parent

# Pull IMEDIDATA_* variables from the local .env file into the environment.
load_dotenv(_SCRIPT_DIR / ".env")

# iMedidata credentials, read from the environment (an empty password is
# rejected later by download_datalisting).
USERNAME = os.environ.get("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.environ.get("IMEDIDATA_PASSWORD", "")

# Where downloaded CSV files are written.
DOWNLOAD_DIR = _SCRIPT_DIR / "downloads"

# Saved Playwright storage state and how long it is considered fresh.
AUTH_FILE = _SCRIPT_DIR / "auth.json"
AUTH_MAX_AGE_DAYS = 7

LOGIN_URL = "https://login.imedidata.com/login"

# Rave role-selection page, including the fixed division / study-group query.
SELECT_ROLE_URL = "".join([
    "https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx",
    "?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d",
    "&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4",
    "&studygroup_id=107981",
])

# Report opened via PromptsPage.aspx?ReportID=<REPORT_ID>.
REPORT_ID = 92
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def auth_valid():
    """Return True when AUTH_FILE exists and was modified less than
    AUTH_MAX_AGE_DAYS days ago, otherwise False."""
    if AUTH_FILE.exists():
        saved_at = datetime.fromtimestamp(AUTH_FILE.stat().st_mtime)
        max_age = timedelta(days=AUTH_MAX_AGE_DAYS)
        return datetime.now() - saved_at < max_age
    return False
|
||
|
||
|
||
def wait_load(page, extra_ms=1000):
    """Wait for the page's "load" state, then pause for *extra_ms* milliseconds.

    A timeout while waiting for the load state (20 s) is ignored, so the
    fixed pause always runs.
    """
    load_timeout_ms = 20_000
    try:
        page.wait_for_load_state("load", timeout=load_timeout_ms)
    except PWTimeout:
        # Best effort: continue even if the load event never arrived.
        pass
    page.wait_for_timeout(extra_ms)
|
||
|
||
|
||
def dbg(page, label):
    """Print the page's current URL and save a full-page screenshot.

    The screenshot is written to ``debug_shots/<label>.png`` next to this
    file. Any failure while creating the directory or taking the screenshot
    is reported on stdout and otherwise ignored.
    """
    print(f"[{label}] URL: {page.url}")
    try:
        shot_dir = Path(__file__).parent / "debug_shots"
        shot_dir.mkdir(exist_ok=True)
        target = shot_dir / f"{label}.png"
        page.screenshot(path=str(target), full_page=True)
        print(f"[{label}] screenshot: {target}")
    except Exception as err:
        # Debug output must never break the actual workflow.
        print(f"[{label}] screenshot failed: {err}")
|
||
|
||
|
||
def extract_study_label(study_search: str) -> str:
    """Return the trailing "<UPPERCASE LETTERS><digits>" part of *study_search*.

    Example: "77242113UCO3001" -> "UCO3001". When the string does not end
    with such a suffix, it is returned unchanged.
    """
    suffix = re.search(r'[A-Z]+\d+$', study_search)
    if suffix is None:
        return study_search
    return suffix.group(0)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Login
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _ask_otp_popup():
    """Ask the user for the OKTA one-time code in a topmost Tk dialog.

    Returns:
        The entered code with surrounding whitespace stripped, or an empty
        string when the dialog returned nothing (e.g. it was cancelled).
    """
    root = tk.Tk()
    try:
        # Hide the empty root window; only the dialog should be visible,
        # and it should appear above the browser window.
        root.withdraw()
        root.lift()
        root.attributes("-topmost", True)
        otp = simpledialog.askstring("OKTA MFA", "Zadej OTP kód z OKTA (6 číslic):", parent=root)
    finally:
        # Always release the Tk root, even if the dialog raised.
        root.destroy()
    return (otp or "").strip()
|
||
|
||
|
||
def do_login(page, context):
    """Log in to iMedidata with USERNAME/PASSWORD and persist the session.

    Fills the login form, handles an OKTA MFA prompt (asking the user for
    the OTP in a popup) when one is detected, waits for the iMedidata home
    page and finally saves the browser context's storage state to AUTH_FILE.

    Exits the process with status 1 when no OTP is entered or when the home
    page is not reached.
    """
    user_field = 'input[name="session[username]"]'
    pass_field = 'input[name="session[password]"]'
    home_host = "home.imedidata.com"

    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)

    page.wait_for_selector(user_field, timeout=10_000)
    page.fill(user_field, USERNAME)
    page.fill(pass_field, PASSWORD)
    page.click('button[type="submit"]')
    wait_load(page, 2000)
    dbg(page, "after-signin")

    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        code = _ask_otp_popup()
        if not code:
            print("CHYBA: OTP nebylo zadáno.")
            sys.exit(1)
        _fill_otp(page, code)
        wait_load(page, 3000)

    try:
        page.wait_for_url(f"**/{home_host}**", timeout=30_000)
    except PWTimeout:
        # Keep a screenshot; the URL check below decides success or failure.
        dbg(page, "wait-home-timeout")

    if home_host not in page.url:
        print("CHYBA: Přihlášení se nezdařilo!")
        sys.exit(1)

    context.storage_state(path=str(AUTH_FILE))
    print("Session uložena do auth.json")
|
||
|
||
|
||
def _okta_mfa_present(page):
    """Return True when the page looks like an OKTA MFA prompt.

    That is the case when the URL contains "okta" (case-insensitive) or
    when any of the known OTP input selectors matches an element.
    """
    otp_inputs = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[placeholder*="code" i]',
    )
    if "okta" in page.url.lower():
        return True
    # any() stops at the first selector that matches.
    return any(page.query_selector(css) for css in otp_inputs)
|
||
|
||
|
||
def _fill_otp(page, otp):
    """Enter *otp* into the first matching OTP input and press Enter.

    When none of the known selectors matches, the code is typed via the
    keyboard into whatever currently has focus before pressing Enter.
    """
    otp_inputs = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[type="tel"]',
        'input[placeholder*="code" i]',
    )
    # map() is lazy, so selectors are only queried until the first hit.
    field = next((hit for hit in map(page.query_selector, otp_inputs) if hit), None)
    if field is None:
        page.keyboard.type(otp)
    else:
        field.fill(otp)
    page.keyboard.press("Enter")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Navigation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def go_to_select_role(page):
    """Open the Rave SelectRole page and report whether the session is logged in.

    Returns:
        True when the resulting URL contains neither "login" nor "okta"
        (case-insensitive), False otherwise.
    """
    print("Navigace na SelectRole...")
    try:
        page.goto(SELECT_ROLE_URL)
    except Exception as exc:
        # Best effort: the final URL below decides the outcome, but the
        # reason for a failed navigation should not be lost.
        print(f" VAROVÁNÍ: navigace na SelectRole vyhodila výjimku: {exc}")
    wait_load(page, 1500)
    dbg(page, "select-role")

    current_url = page.url.lower()
    return "login" not in current_url and "okta" not in current_url
|
||
|
||
|
||
def select_role(page):
    """Pick the "Site Manager" role on the SelectRole page and continue.

    Returns immediately when no <select> appears within 10 s. Otherwise, in
    each <select>, the first option whose text contains "site manager"
    (case-insensitive) is selected. The form is then submitted by clicking
    the first candidate button that triggers a navigation, with a
    JavaScript form submit as the fallback.
    """
    print("Vybírám roli Site Manager...")
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        return

    for dropdown in page.query_selector_all("select"):
        # Lazy generator: option texts are read only until the first match.
        option_texts = (
            (opt.inner_text() or "").strip()
            for opt in dropdown.query_selector_all("option")
        )
        role_label = next(
            (text for text in option_texts if "site manager" in text.lower()),
            None,
        )
        if role_label is not None:
            dropdown.select_option(label=role_label)
            print(f" Vybráno: '{role_label}'")

    submit_candidates = (
        'input[value="Continue"]',
        'input[type="submit"]',
        'button:has-text("Continue")',
        'button[type="submit"]',
    )
    navigated = False
    for btn_sel in submit_candidates:
        try:
            button = page.query_selector(btn_sel)
        except Exception:
            continue
        if not button:
            continue
        try:
            with page.expect_navigation(timeout=15_000):
                button.click()
        except PWTimeout:
            print(f" Click on {btn_sel} nezpůsobil navigaci, zkouším další...")
            continue
        navigated = True
        break

    if not navigated:
        print(" Fallback: submituji formulář přes JS...")
        try:
            with page.expect_navigation(timeout=15_000):
                page.evaluate("document.forms[0] && document.forms[0].submit()")
        except PWTimeout:
            print(" JS submit fallback také neprošel.")

    wait_load(page, 1500)
    dbg(page, "after-role")
|
||
|
||
|
||
def navigate_to_reporter(page):
    """Wait (up to 15 s) for the "Reporter" link, click it and let the page settle."""
    reporter_link = 'a:has-text("Reporter")'

    print("Klikám na Reporter...")
    page.wait_for_selector(reporter_link, timeout=15_000)
    page.click(reporter_link)
    wait_load(page, 1500)
    dbg(page, "reporter")
|
||
|
||
|
||
def open_report(page):
    """Open the prompts page of report REPORT_ID by clicking its link.

    The link is located by its exact href
    ``PromptsPage.aspx?ReportID=<REPORT_ID>`` and awaited for up to 15 s.
    """
    print(f"Otevírám report ID={REPORT_ID} (Data Listing - Data Stream)...")
    report_link = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    page.wait_for_selector(report_link, timeout=15_000)
    page.click(report_link)
    wait_load(page, 2000)
    dbg(page, "report-opened")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Report parameters
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def set_study_param(page, study_search: str):
    """Tick the Study checkbox whose row text contains *study_search*.

    Expands the Study prompt, walks the study checkboxes in document order
    and checks the first one whose surrounding row/cell text contains
    *study_search* (case-insensitive). When nothing matches, a warning is
    printed and the checkbox with index 0 is checked instead.
    """
    checkbox_css = 'input[id^="PromptsBox_st_FrontEndCBList_"]'
    # Returns the text of the checkbox's enclosing <tr>, <td> or parent.
    row_text_js = """id => {
        const el = document.getElementById(id);
        if (!el) return '';
        const row = el.closest('tr') || el.closest('td') || el.parentElement;
        return row ? row.innerText : '';
    }"""

    print(f" Parametr Study: hledám '{study_search}'...")

    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)
    page.wait_for_selector(checkbox_css, timeout=10_000)

    wanted = study_search.upper()
    for box in page.query_selector_all(checkbox_css):
        box_id = box.get_attribute("id")
        row_text = page.evaluate(row_text_js, box_id)
        print(f" [{box_id}] label: {row_text.strip()[:80]}")
        if wanted not in row_text.upper():
            continue
        target = page.locator(f"#{box_id}")
        if not target.is_checked():
            target.check()
        print(f" Nalezeno a zaškrtnuto: '{row_text.strip()}'")
        break
    else:
        # NOTE(review): this fallback selects whatever study is listed first,
        # which may not be the requested one — confirm this is intended.
        print(f" VAROVÁNÍ: Studie '{study_search}' nenalezena! Zkouším index 0...")
        first_box = page.locator('#PromptsBox_st_FrontEndCBList_0')
        if not first_box.is_checked():
            first_box.check()

    wait_load(page, 3000)
    dbg(page, "after-study")
|
||
|
||
|
||
def set_site_group_param(page, country: str):
    """Select *country* in the Site Group prompt and enable that prompt.

    Expands the Site Group box, selects the list option labelled *country*,
    makes sure the prompt's checkbox is checked, and collapses the box
    again. A bubbling ``change`` event is dispatched manually after each
    modification.
    """
    toggle_css = '#PromptsBox_sg_ShowHideBtn'
    list_css = '#PromptsBox_sg_List'
    checkbox_css = '#PromptsBox_sg_CheckBox'

    def fire_change(css):
        # Dispatch a bubbling "change" event on the element matching *css*.
        page.evaluate(
            f"document.querySelector('{css}').dispatchEvent(new Event('change', {{bubbles:true}}))"
        )

    print(f" Parametr Site Group: {country}")

    page.click(toggle_css)
    page.wait_for_timeout(1500)

    page.wait_for_selector(list_css, timeout=10_000)
    page.select_option(list_css, label=country)
    fire_change(list_css)
    wait_load(page, 2000)

    enable_box = page.locator(checkbox_css)
    if not enable_box.is_checked():
        enable_box.check()
        fire_change(checkbox_css)
        wait_load(page, 2000)

    page.click(toggle_css)
    wait_load(page, 3000)
    dbg(page, "after-site-group")
|
||
|
||
|
||
def set_form_param(page, form_name: str):
    """Search the Form prompt for *form_name* and tick the first result.

    Opens the Form box if its container is hidden, clicks the page-mode
    button twice when it is visible, types *form_name* into the search
    field and submits it with Enter. The first form checkbox that becomes
    visible is then checked. When no checkbox shows up within 8 s, a
    warning is printed and nothing is selected.
    """
    mode_btn_css = '#PromptsBox_fm2_PageModeBtn'

    print(f" Parametr Form: {form_name}")

    panel_display = page.locator('#PromptsBox_fm2_div').evaluate('el => el.style.display')
    if panel_display == 'none':
        page.click('#PromptsBox_fm2_ShowHideBtn')
        page.wait_for_timeout(2000)

    if page.locator(mode_btn_css).is_visible():
        # Two clicks, pausing 1 s after the first and 2 s after the second.
        for pause_ms in (1000, 2000):
            page.click(mode_btn_css)
            page.wait_for_timeout(pause_ms)

    search_box = page.locator('#PromptsBox_fm2_SearchTxt')
    search_box.wait_for(state='visible', timeout=10_000)
    search_box.click()
    search_box.fill(form_name)
    page.wait_for_timeout(2000)
    search_box.press('Enter')
    page.wait_for_timeout(2000)

    first_hit = page.locator('input[id^="PromptsBox_fm2_FrontEndCBList_"]').first
    try:
        first_hit.wait_for(state='visible', timeout=8_000)
    except PWTimeout:
        print(f" VAROVÁNÍ: '{form_name}' nenalezen!")
        return

    if not first_hit.is_checked():
        first_hit.click()
    print(f" '{form_name}' zaškrtnuto")
    page.wait_for_timeout(2000)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Submit and download
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _select_option_by_value_fragment(frame, fragment: str) -> bool:
    """In every <select> of *frame*, select the first option whose value
    contains *fragment* (case-insensitive). Return True if any was selected."""
    wanted = fragment.lower()
    picked = False
    for dropdown in frame.query_selector_all('select'):
        for opt in dropdown.query_selector_all('option'):
            value = opt.get_attribute('value') or ''
            if wanted in value.lower():
                # Select the option's real value, not the search fragment.
                dropdown.select_option(value=value)
                picked = True
                break
    return picked


def submit_and_download(page, context, form_name: str, country: str | None, study_label: str):
    """Submit the report, wait for it to be generated and save it as CSV.

    Clicks "Submit Report", which opens a new page, waits up to 5 minutes
    for the "Download File" button, chooses the Excel/CSV file type and the
    "attachment" option where such options exist, and stores the download in
    DOWNLOAD_DIR.

    Args:
        page: Prompts page with all report parameters already set.
        context: Browser context used to catch the newly opened page.
        form_name: Form name; used (with spaces/parentheses removed and
            "/" replaced by "-") in the output file name.
        country: Site group code for the file name; None becomes "ALL".
        study_label: Study label for the file name.

    Returns:
        Path of the saved CSV file.

    The report page is closed even when generation or download fails.
    """
    download_btn_css = 'input[value="Download File"], button:has-text("Download File")'

    print("Odesílám report...")
    with context.expect_page() as new_page_info:
        page.locator('input[value="Submit Report"], button:has-text("Submit Report")').first.click()
    new_page = new_page_info.value

    try:
        new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)

        print(" Čekám na vygenerování reportu (max 5 min)...")
        new_page.wait_for_selector(download_btn_css, timeout=300_000)
        new_page.wait_for_timeout(500)
        dbg(new_page, "download-window")

        # The controls may live in a child frame; use the first frame that
        # holds a <select> or the download button, else the main frame.
        target_frame = new_page.main_frame
        for frame in new_page.frames:
            if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
                target_frame = frame
                break

        if _select_option_by_value_fragment(target_frame, 'vnd.ms-excel'):
            print(" File type: .csv (application/vnd.ms-excel)")
        _select_option_by_value_fragment(target_frame, 'attachment')

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
        country_slug = country if country else "ALL"
        form_slug = form_name.translate(str.maketrans({" ": "", "/": "-", "(": "", ")": ""}))
        filename = f"{timestamp}_EDC_{study_label}_{country_slug}_{form_slug}_DataListing.csv"
        output_path = DOWNLOAD_DIR / filename

        print("Stahuji CSV...")
        with new_page.expect_download(timeout=60_000) as dl_info:
            download_btn = target_frame.query_selector(download_btn_css)
            if download_btn:
                download_btn.click()
            else:
                new_page.locator(download_btn_css).first.click()

        dl_info.value.save_as(str(output_path))
        print(f" Uloženo: {output_path}")
    finally:
        # Closing the popup is best-effort; it may already be gone.
        try:
            new_page.close()
        except Exception as exc:
            print(f" VAROVÁNÍ: okno reportu se nepodařilo zavřít: {exc}")

    return output_path
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main function
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def download_datalisting(study: str, forms: list[str], country: str | None = None):
    """Download EDC Data Listing reports for the given study.

    One browser session is used for all forms: a saved session (auth.json)
    is reused when it is recent enough, otherwise a fresh login is done.
    For every form the report prompts are (re)filled, the report is
    submitted and the resulting CSV is saved.

    Args:
        study: Study search string, e.g. "77242113UCO3001".
        forms: Names of the forms to download.
        country: Site group code, e.g. "CZE". None = all countries.

    Returns:
        List of paths of the downloaded files, in the order of *forms*;
        an empty list when *forms* is empty.

    Exits the process with status 1 when IMEDIDATA_PASSWORD is not set.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)

    if not forms:
        print("Žádné formuláře ke stažení.")
        return []

    DOWNLOAD_DIR.mkdir(exist_ok=True)
    study_label = extract_study_label(study)
    results = []

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=False,
            slow_mo=200,
            args=["--start-maximized"],
        )
        try:
            ctx_kwargs = {"accept_downloads": True, "no_viewport": True}

            use_saved = auth_valid()
            if use_saved:
                print("Načítám uloženou session (auth.json)...")
                ctx_kwargs["storage_state"] = str(AUTH_FILE)

            context = browser.new_context(**ctx_kwargs)
            page = context.new_page()

            logged_in = go_to_select_role(page)

            if not logged_in:
                if use_saved:
                    print("Session expirovala, přihlašuji znovu...")
                    AUTH_FILE.unlink(missing_ok=True)
                do_login(page, context)
                if not go_to_select_role(page):
                    # Surface the problem here; the following steps would
                    # otherwise fail later with an unrelated-looking timeout.
                    print("VAROVÁNÍ: Po přihlášení se stále nedaří otevřít SelectRole.")

            select_role(page)
            navigate_to_reporter(page)
            open_report(page)

            # Remember the prompts page so it can be reloaded for each form.
            prompts_url = page.url

            print("\nNastavuji parametry reportu...")
            set_study_param(page, study)

            if country:
                set_site_group_param(page, country)
            else:
                print(" Parametr Site Group: přeskočen (všechny země)")

            for i, form_name in enumerate(forms):
                print(f"\n=== [{i+1}/{len(forms)}] Stahuji formulář: {form_name} ===")

                if i > 0:
                    # Start from a fresh prompts page and re-apply the
                    # study / site group parameters.
                    print("Navigace zpět na report...")
                    page.goto(prompts_url)
                    wait_load(page, 2000)
                    set_study_param(page, study)
                    if country:
                        set_site_group_param(page, country)

                set_form_param(page, form_name)
                output = submit_and_download(page, context, form_name, country, study_label)
                results.append(output)
        finally:
            # Close the browser on failures too.
            browser.close()

        print(f"\nHotovo! Staženo {len(results)} formulářů. Prohlížeč zavřen.")

    return results
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI
|
||
# ---------------------------------------------------------------------------
|
||
|
||
if __name__ == "__main__":
    # Optional first CLI argument: site group / country code (e.g. "CZE").
    cli_args = sys.argv[1:]
    download_datalisting(
        study="77242113UCO3001",
        forms=["Trial Disposition (Completion / Discontinuation)"],
        country=cli_args[0] if cli_args else None,
    )
|