Files
janssen/Medidata/download_edc_datalistings.py

502 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
download_edc_datalistings.py
Version: 2.0
Date: 2026-05-27

Generic download of EDC Data Listing reports (ReportID=92) from Medidata Rave.

Parameters:
    study    study search string (e.g. "77242113UCO3001")
    forms    list of form names to download
    country  country code / site group (e.g. "CZE"); None = all countries

The browser is opened once, logs in, and downloads all forms in a single session.

Usage:
    from download_edc_datalistings import download_datalisting
    download_datalisting(
        study="77242113UCO3001",
        forms=["Date of Visit", "Concomitant Therapy"],
        country="CZE",
    )
"""
import os
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
import tkinter as tk
from tkinter import simpledialog
# Directory that holds this script; the .env file, the downloads folder and
# the saved browser session all live next to it.
_MODULE_DIR = Path(__file__).parent

# Load credentials from the .env file beside the script before reading them.
load_dotenv(_MODULE_DIR / ".env")

USERNAME = os.environ.get("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.environ.get("IMEDIDATA_PASSWORD", "")

DOWNLOAD_DIR = _MODULE_DIR / "downloads"
AUTH_FILE = _MODULE_DIR / "auth.json"
# A saved session older than this many days is treated as stale by auth_valid().
AUTH_MAX_AGE_DAYS = 7

LOGIN_URL = "https://login.imedidata.com/login"
SELECT_ROLE_URL = (
    "https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx"
    "?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d"
    "&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4"
    "&studygroup_id=107981"
)

# Report opened by open_report() via PromptsPage.aspx?ReportID=<REPORT_ID>.
REPORT_ID = 92
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def auth_valid():
    """Report whether a sufficiently recent saved session file exists.

    Returns:
        True when AUTH_FILE exists and its modification time lies less than
        AUTH_MAX_AGE_DAYS days in the past; False otherwise.
    """
    if AUTH_FILE.exists():
        saved_at = datetime.fromtimestamp(AUTH_FILE.stat().st_mtime)
        return datetime.now() - saved_at < timedelta(days=AUTH_MAX_AGE_DAYS)
    return False
def wait_load(page, extra_ms=1000):
    """Wait for the page's "load" state, then pause for a fixed extra delay.

    Args:
        page: Playwright page to wait on.
        extra_ms: Extra pause in milliseconds applied after the load wait.

    A load wait that times out (20 s) is ignored; the extra pause still runs.
    """
    load_timeout_ms = 20_000
    try:
        page.wait_for_load_state("load", timeout=load_timeout_ms)
    except PWTimeout:
        # Best effort: a slow page must not abort the run.
        pass
    page.wait_for_timeout(extra_ms)
def dbg(page, label):
    """Print the page URL and save a full-page screenshot for debugging.

    Args:
        page: Playwright page to inspect.
        label: Tag used in the log prefix and as the screenshot file name
            (``debug_shots/<label>.png`` next to this script).

    Any failure while creating the folder or taking the screenshot is
    reported on stdout and otherwise ignored.
    """
    print(f"[{label}] URL: {page.url}")
    try:
        # Path comes from the module-level import; no local re-import needed.
        shots_dir = Path(__file__).parent / "debug_shots"
        shots_dir.mkdir(exist_ok=True)
        path = shots_dir / f"{label}.png"
        page.screenshot(path=str(path), full_page=True)
        print(f"[{label}] screenshot: {path}")
    except Exception as e:
        print(f"[{label}] screenshot failed: {e}")
def extract_study_label(study_search: str) -> str:
    """Return the short study label found at the end of a study search string.

    The label is the trailing run of letters followed by digits, e.g.
    "77242113UCO3001" -> "UCO3001". Surrounding whitespace is ignored and
    letters of either case are accepted, so the result stays usable in a
    file name for any input that the (case-insensitive) study lookup accepts.

    Args:
        study_search: Study search string as passed by the caller.

    Returns:
        The trailing "<letters><digits>" part with its original casing, or
        the whitespace-stripped input when no such suffix exists.
    """
    cleaned = study_search.strip()
    match = re.search(r'[A-Za-z]+\d+$', cleaned)
    return match.group(0) if match else cleaned
# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------
def _ask_otp_popup():
    """Ask the user for the OKTA one-time code in a topmost dialog.

    Returns:
        The entered code with surrounding whitespace removed, or an empty
        string when the dialog was cancelled or left blank.
    """
    root = tk.Tk()
    try:
        # Hide the empty root window; only the dialog itself should show.
        root.withdraw()
        root.lift()
        root.attributes("-topmost", True)
        otp = simpledialog.askstring("OKTA MFA", "Zadej OTP kód z OKTA (6 číslic):", parent=root)
    finally:
        # Always tear the Tk root down, even if the dialog raised.
        root.destroy()
    return (otp or "").strip()
def do_login(page, context):
    """Log in to iMedidata with USERNAME/PASSWORD and save the session.

    Handles an optional OKTA MFA step by asking the user for a one-time code.
    On success the browser storage state is written to AUTH_FILE.

    Args:
        page: Playwright page used for the login flow.
        context: Browser context whose storage state is saved.

    Exits the process with status 1 when no OTP is entered or when the
    browser does not end up on home.imedidata.com.
    """
    username_input = 'input[name="session[username]"]'
    password_input = 'input[name="session[password]"]'

    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)

    page.wait_for_selector(username_input, timeout=10_000)
    page.fill(username_input, USERNAME)
    page.fill(password_input, PASSWORD)
    page.click('button[type="submit"]')
    wait_load(page, 2000)
    dbg(page, "after-signin")

    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        code = _ask_otp_popup()
        if not code:
            print("CHYBA: OTP nebylo zadáno.")
            sys.exit(1)
        _fill_otp(page, code)
        wait_load(page, 3000)

    try:
        page.wait_for_url("**/home.imedidata.com**", timeout=30_000)
    except PWTimeout:
        # Keep going: the URL check below makes the final decision.
        dbg(page, "wait-home-timeout")

    landed_home = "home.imedidata.com" in page.url
    if not landed_home:
        print("CHYBA: Přihlášení se nezdařilo!")
        sys.exit(1)

    context.storage_state(path=str(AUTH_FILE))
    print("Session uložena do auth.json")
def _okta_mfa_present(page):
    """Tell whether the page appears to be asking for an OKTA MFA code.

    Returns:
        True when the URL contains "okta" (case-insensitive) or when any of
        the known code-input selectors matches an element; False otherwise.
    """
    if "okta" in page.url.lower():
        return True
    code_inputs = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[placeholder*="code" i]',
    )
    # Generator keeps the short-circuit: stop at the first matching selector.
    return any(page.query_selector(sel) for sel in code_inputs)
def _fill_otp(page, otp):
    """Enter the one-time code and submit it with Enter.

    The code goes into the first element matched by the known code-input
    selectors; when none matches it is typed via the keyboard instead.

    Args:
        page: Playwright page showing the MFA prompt.
        otp: One-time code to submit.
    """
    code_inputs = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[type="tel"]',
        'input[placeholder*="code" i]',
    )
    # map() is lazy, so selectors are queried in order until one matches.
    field = next((el for el in map(page.query_selector, code_inputs) if el), None)
    if field:
        field.fill(otp)
    else:
        page.keyboard.type(otp)
    page.keyboard.press("Enter")
# ---------------------------------------------------------------------------
# Navigation
# ---------------------------------------------------------------------------
def go_to_select_role(page):
    """Open the Rave SelectRole page and report whether the session is valid.

    Args:
        page: Playwright page to navigate.

    Returns:
        True when the URL after navigation contains neither "login" nor
        "okta" (case-insensitive); False otherwise.
    """
    print("Navigace na SelectRole...")
    try:
        page.goto(SELECT_ROLE_URL)
    except Exception as exc:
        # Best effort: the URL check below decides the outcome, but the
        # error is reported instead of being silently dropped.
        print(f" VAROVÁNÍ: navigace na SelectRole selhala: {exc}")
    wait_load(page, 1500)
    dbg(page, "select-role")
    current_url = page.url.lower()
    return "login" not in current_url and "okta" not in current_url
def select_role(page):
    """Pick the "Site Manager" role and submit the role-selection form.

    Every <select> on the page is scanned for an option whose text contains
    "site manager" (case-insensitive). The form is then submitted by clicking
    the first candidate button that triggers a navigation; when none does, the
    first form is submitted through JavaScript.

    Returns without doing anything when no <select> appears within 10 s.
    """
    print("Vybírám roli Site Manager...")
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        return

    for dropdown in page.query_selector_all("select"):
        for option in dropdown.query_selector_all("option"):
            txt = (option.inner_text() or "").strip()
            if "site manager" not in txt.lower():
                continue
            dropdown.select_option(label=txt)
            print(f" Vybráno: '{txt}'")
            break

    submit_candidates = (
        'input[value="Continue"]',
        'input[type="submit"]',
        'button:has-text("Continue")',
        'button[type="submit"]',
    )
    clicked = False
    for btn_sel in submit_candidates:
        try:
            btn = page.query_selector(btn_sel)
        except Exception:
            continue
        if not btn:
            continue
        try:
            with page.expect_navigation(timeout=15_000):
                btn.click()
        except PWTimeout:
            print(f" Click on {btn_sel} nezpůsobil navigaci, zkouším další...")
        else:
            # Only a click that actually navigated counts as success.
            clicked = True
            break

    if not clicked:
        print(" Fallback: submituji formulář přes JS...")
        try:
            with page.expect_navigation(timeout=15_000):
                page.evaluate("document.forms[0] && document.forms[0].submit()")
        except PWTimeout:
            print(" JS submit fallback také neprošel.")

    wait_load(page, 1500)
    dbg(page, "after-role")
def navigate_to_reporter(page):
    """Click the "Reporter" link once it appears and wait for the page to load.

    Args:
        page: Playwright page on which the link is expected within 15 s.
    """
    reporter_link = 'a:has-text("Reporter")'
    print("Klikám na Reporter...")
    page.wait_for_selector(reporter_link, timeout=15_000)
    page.click(reporter_link)
    wait_load(page, 1500)
    dbg(page, "reporter")
def open_report(page):
    """Open the prompts page of the report identified by REPORT_ID.

    Args:
        page: Playwright page listing the reports; the link to
            ``PromptsPage.aspx?ReportID=<REPORT_ID>`` must appear within 15 s.
    """
    print(f"Otevírám report ID={REPORT_ID} (Data Listing - Data Stream)...")
    report_link = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    page.wait_for_selector(report_link, timeout=15_000)
    page.click(report_link)
    wait_load(page, 2000)
    dbg(page, "report-opened")
# ---------------------------------------------------------------------------
# Report parameters
# ---------------------------------------------------------------------------
def set_study_param(page, study_search: str):
    """Tick the study checkbox whose row text contains ``study_search``.

    The comparison is case-insensitive and stops at the first match. When no
    row matches, a warning is printed and the checkbox with index 0 is ticked
    instead.

    Args:
        page: Playwright page showing the report prompts.
        study_search: Text to look for in the checkbox rows.
    """
    checkbox_css = 'input[id^="PromptsBox_st_FrontEndCBList_"]'

    print(f" Parametr Study: hledám '{study_search}'...")
    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)
    page.wait_for_selector(checkbox_css, timeout=10_000)

    wanted = study_search.upper()
    for cb in page.query_selector_all(checkbox_css):
        cb_id = cb.get_attribute("id")
        # Read the text of the closest row/cell/parent around the checkbox.
        label_text = page.evaluate(
            """id => {
const el = document.getElementById(id);
if (!el) return '';
const row = el.closest('tr') || el.closest('td') || el.parentElement;
return row ? row.innerText : '';
}""",
            cb_id
        )
        print(f" [{cb_id}] label: {label_text.strip()[:80]}")
        if wanted not in label_text.upper():
            continue
        box = page.locator(f"#{cb_id}")
        if not box.is_checked():
            box.check()
        print(f" Nalezeno a zaškrtnuto: '{label_text.strip()}'")
        break
    else:
        # No row matched: fall back to the first checkbox.
        print(f" VAROVÁNÍ: Studie '{study_search}' nenalezena! Zkouším index 0...")
        cb0 = page.locator('#PromptsBox_st_FrontEndCBList_0')
        if not cb0.is_checked():
            cb0.check()

    wait_load(page, 3000)
    dbg(page, "after-study")
def set_site_group_param(page, country: str):
    """Select a site group in the report prompts and tick its checkbox.

    Args:
        page: Playwright page showing the report prompts.
        country: Label of the option to choose in the site-group list,
            e.g. "CZE".

    NOTE(review): this copy of the file lost its indentation. The second
    change-event dispatch and the wait after it are assumed to run
    unconditionally (not only when the checkbox was unticked) — confirm
    against the original file.
    """
    print(f" Parametr Site Group: {country}")
    # Toggle the Site Group prompt section (presumably expands it).
    page.click('#PromptsBox_sg_ShowHideBtn')
    page.wait_for_timeout(1500)
    page.wait_for_selector('#PromptsBox_sg_List', timeout=10_000)
    page.select_option('#PromptsBox_sg_List', label=country)
    # Fire a bubbling "change" event on the list by hand after the selection.
    page.evaluate(
        "document.querySelector('#PromptsBox_sg_List').dispatchEvent(new Event('change', {bubbles:true}))"
    )
    wait_load(page, 2000)
    cb = page.locator('#PromptsBox_sg_CheckBox')
    if not cb.is_checked():
        cb.check()
    # Same manual "change" event for the checkbox.
    page.evaluate(
        "document.querySelector('#PromptsBox_sg_CheckBox').dispatchEvent(new Event('change', {bubbles:true}))"
    )
    wait_load(page, 2000)
    # Toggle the section again (presumably collapses it).
    page.click('#PromptsBox_sg_ShowHideBtn')
    wait_load(page, 3000)
    dbg(page, "after-site-group")
def set_form_param(page, form_name: str):
    """Search for a form in the report prompts and tick the first result.

    Args:
        page: Playwright page showing the report prompts.
        form_name: Text typed into the form search box.

    Prints a warning and returns early when no form checkbox becomes visible
    within 8 s after the search.

    NOTE(review): this copy of the file lost its indentation. The waits are
    assumed to belong to the ``if`` that precedes them, and the final
    confirmation message is assumed to print whether or not the box was
    already ticked — confirm against the original file.
    """
    print(f" Parametr Form: {form_name}")
    # The section counts as closed when its inline style is display:none.
    is_closed = page.locator('#PromptsBox_fm2_div').evaluate('el => el.style.display') == 'none'
    if is_closed:
        page.click('#PromptsBox_fm2_ShowHideBtn')
        page.wait_for_timeout(2000)
    if page.locator('#PromptsBox_fm2_PageModeBtn').is_visible():
        # The page-mode button is clicked twice with a pause in between;
        # the reason is not visible here (presumably it resets the list).
        page.click('#PromptsBox_fm2_PageModeBtn')
        page.wait_for_timeout(1000)
        page.click('#PromptsBox_fm2_PageModeBtn')
        page.wait_for_timeout(2000)
    search = page.locator('#PromptsBox_fm2_SearchTxt')
    search.wait_for(state='visible', timeout=10_000)
    search.click()
    search.fill(form_name)
    page.wait_for_timeout(2000)
    search.press('Enter')
    page.wait_for_timeout(2000)
    # Only the first checkbox in the filtered list is considered.
    cb_locator = page.locator('input[id^="PromptsBox_fm2_FrontEndCBList_"]').first
    try:
        cb_locator.wait_for(state='visible', timeout=8_000)
    except PWTimeout:
        print(f" VAROVÁNÍ: '{form_name}' nenalezen!")
        return
    if not cb_locator.is_checked():
        cb_locator.click()
    print(f" '{form_name}' zaškrtnuto")
    page.wait_for_timeout(2000)
# ---------------------------------------------------------------------------
# Submit and download
# ---------------------------------------------------------------------------
def submit_and_download(page, context, form_name: str, country: str | None, study_label: str):
    """Submit the report, wait for it to be generated and save it as CSV.

    Clicking "Submit Report" opens a new page. Once its "Download File"
    button appears (up to 5 minutes), the file type option containing
    "vnd.ms-excel" and the disposition option containing "attachment" are
    selected where present, and the download is saved into DOWNLOAD_DIR.

    Args:
        page: Playwright page showing the filled-in report prompts.
        context: Browser context in which the result page opens.
        form_name: Form name, used (sanitized) in the output file name.
        country: Site-group code for the file name; None is written as "ALL".
        study_label: Study label for the file name.

    Returns:
        Path of the saved CSV file.
    """
    download_btn_css = 'input[value="Download File"], button:has-text("Download File")'

    print("Odesílám report...")
    with context.expect_page() as new_page_info:
        page.locator('input[value="Submit Report"], button:has-text("Submit Report")').first.click()
    new_page = new_page_info.value

    try:
        new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)
        print(" Čekám na vygenerování reportu (max 5 min)...")
        new_page.wait_for_selector(download_btn_css, timeout=300_000)
        new_page.wait_for_timeout(500)
        dbg(new_page, "download-window")

        # The controls may live in a sub-frame; use the first frame that has them.
        target_frame = new_page.main_frame
        for frame in new_page.frames:
            if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
                target_frame = frame
                break

        # File type: pick the option whose value mentions vnd.ms-excel.
        for sel in target_frame.query_selector_all('select'):
            for opt in sel.query_selector_all('option'):
                val = opt.get_attribute('value') or ''
                if 'vnd.ms-excel' in val:
                    sel.select_option(value=val)
                    print(" File type: .csv (application/vnd.ms-excel)")
                    break

        # Disposition: select the option that actually matched. The match is
        # a case-insensitive substring test, so the real value may differ
        # from the literal 'attachment'.
        for sel in target_frame.query_selector_all('select'):
            for opt in sel.query_selector_all('option'):
                val = opt.get_attribute('value') or ''
                if 'attachment' in val.lower():
                    sel.select_option(value=val)
                    break

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
        country_slug = country if country else "ALL"
        form_slug = form_name.replace(" ", "").replace("/", "-").replace("(", "").replace(")", "")
        filename = f"{timestamp}_EDC_{study_label}_{country_slug}_{form_slug}_DataListing.csv"
        output_path = DOWNLOAD_DIR / filename

        print("Stahuji CSV...")
        with new_page.expect_download(timeout=60_000) as dl_info:
            btn = target_frame.query_selector(download_btn_css)
            if btn:
                btn.click()
            else:
                new_page.locator(download_btn_css).first.click()
        dl_info.value.save_as(str(output_path))
        print(f" Uloženo: {output_path}")
        return output_path
    finally:
        # Close the result page on success and on failure alike; a failure
        # to close is deliberately ignored (best effort).
        try:
            new_page.close()
        except Exception:
            pass
# ---------------------------------------------------------------------------
# Main function
# ---------------------------------------------------------------------------
def download_datalisting(study: str, forms: list[str], country: str | None = None):
    """Download EDC Data Listing reports for the given study.

    A single browser session is used: it reuses the saved login when that is
    still valid, logs in otherwise, and then downloads one report per form.

    Args:
        study: Study search string, e.g. "77242113UCO3001".
        forms: Names of the forms to download.
        country: Site-group code, e.g. "CZE". None means all countries.

    Returns:
        List of paths of the downloaded files; empty when ``forms`` is empty.

    Exits the process with status 1 when PASSWORD is not set.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)
    if not forms:
        print("Žádné formuláře ke stažení.")
        return []

    DOWNLOAD_DIR.mkdir(exist_ok=True)
    study_label = extract_study_label(study)
    results = []

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=False,
            slow_mo=200,
            args=["--start-maximized"],
        )

        ctx_kwargs = {"accept_downloads": True, "no_viewport": True}
        use_saved = auth_valid()
        if use_saved:
            print("Načítám uloženou session (auth.json)...")
            ctx_kwargs["storage_state"] = str(AUTH_FILE)
        context = browser.new_context(**ctx_kwargs)
        page = context.new_page()

        # Landing on a login/OKTA page means the session is not usable.
        if not go_to_select_role(page):
            if use_saved:
                print("Session expirovala, přihlašuji znovu...")
                AUTH_FILE.unlink(missing_ok=True)
            do_login(page, context)
            go_to_select_role(page)

        select_role(page)
        navigate_to_reporter(page)
        open_report(page)
        prompts_url = page.url

        print("\nNastavuji parametry reportu...")
        set_study_param(page, study)
        if country:
            set_site_group_param(page, country)
        else:
            print(" Parametr Site Group: přeskočen (všechny země)")

        for i, form_name in enumerate(forms):
            print(f"\n=== [{i+1}/{len(forms)}] Stahuji formulář: {form_name} ===")
            if i > 0:
                # Later forms start from a fresh prompts page, so the study
                # and site-group parameters have to be set again.
                print("Navigace zpět na report...")
                page.goto(prompts_url)
                wait_load(page, 2000)
                set_study_param(page, study)
                if country:
                    set_site_group_param(page, country)
            set_form_param(page, form_name)
            results.append(
                submit_and_download(page, context, form_name, country, study_label)
            )

        browser.close()

    print(f"\nHotovo! Staženo {len(results)} formulářů. Prohlížeč zavřen.")
    return results
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Optional first CLI argument: country / site-group code.
    # When omitted, None is passed (all countries).
    cli_args = sys.argv[1:]
    country_arg = cli_args[0] if cli_args else None
    download_datalisting(
        study="77242113UCO3001",
        forms=["Trial Disposition (Completion / Discontinuation)"],
        country=country_arg,
    )