"""
download_uco3001.py

SUPERSEDED by the script download_edc_datalistings.py.

Originally: downloads Data Listing reports (ReportID=92) for study UCO3001.
"""

import os
import sys
from datetime import datetime, timedelta
from pathlib import Path

from dotenv import load_dotenv
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout

import tkinter as tk
from tkinter import simpledialog

load_dotenv(Path(__file__).parent / ".env")

USERNAME = os.getenv("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.getenv("IMEDIDATA_PASSWORD", "")
DOWNLOAD_DIR = Path(__file__).parent / "downloads"
AUTH_FILE = Path(__file__).parent / "auth.json"
AUTH_MAX_AGE_DAYS = 7

LOGIN_URL = "https://login.imedidata.com/login"
SELECT_ROLE_URL = (
    "https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx"
    "?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d"
    "&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4"
    "&studygroup_id=107981"
)

STUDY_SEARCH = "77242113UCO3001"  # matched as a substring of the study name
REPORT_ID = 92  # _EDC Std Rpt - Data Listing (Data Stream)

# Selectors probed to detect an MFA prompt.
_MFA_DETECT_SELECTORS = [
    'input[name="answer"]',
    'input[name*="otp"]',
    'input[name*="code"]',
    'input[placeholder*="code" i]',
]

# Selectors probed, in order, to find the field the OTP is typed into.
_OTP_INPUT_SELECTORS = [
    'input[name="answer"]',
    'input[name*="otp"]',
    'input[name*="code"]',
    'input[type="tel"]',
    'input[placeholder*="code" i]',
]

# Selector for the button that starts the file download in the report window.
_DOWNLOAD_BTN_SELECTOR = 'input[value="Download File"], button:has-text("Download File")'


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def auth_valid():
    """Return True when AUTH_FILE exists and is younger than AUTH_MAX_AGE_DAYS.

    Age is measured from the file's modification time.
    """
    if not AUTH_FILE.exists():
        return False
    age = datetime.now() - datetime.fromtimestamp(AUTH_FILE.stat().st_mtime)
    return age < timedelta(days=AUTH_MAX_AGE_DAYS)


def wait_load(page, extra_ms=1000):
    """Wait for the page "load" state (max 20 s), then pause ``extra_ms`` ms.

    A load-state timeout is deliberately ignored: the fixed pause that follows
    is the fallback.
    """
    try:
        page.wait_for_load_state("load", timeout=20_000)
    except PWTimeout:
        pass
    page.wait_for_timeout(extra_ms)


def dbg(page, label):
    """Print the current URL of ``page`` tagged with ``label``."""
    print(f"[{label}] URL: {page.url}")


# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------

def _ask_otp_popup():
    """Ask for the OTP code in a topmost Tk dialog.

    Returns:
        The entered text with surrounding whitespace stripped, or "" when the
        dialog was cancelled.
    """
    root = tk.Tk()
    try:
        root.withdraw()
        root.lift()
        root.attributes("-topmost", True)
        otp = simpledialog.askstring(
            "OKTA MFA", "Zadej OTP kód z OKTA (6 číslic):", parent=root
        )
    finally:
        # Destroy the hidden root even if the dialog raised.
        root.destroy()
    return (otp or "").strip()


def do_login(page, context):
    """Log in with USERNAME/PASSWORD, handle an MFA prompt, save the session.

    Exits the process with status 1 when no OTP is entered or when the browser
    does not end up on home.imedidata.com. On success the context's storage
    state is written to AUTH_FILE.
    """
    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)

    page.wait_for_selector('input[name="session[username]"]', timeout=10_000)
    page.fill('input[name="session[username]"]', USERNAME)
    page.fill('input[name="session[password]"]', PASSWORD)
    page.click('button[type="submit"]')
    wait_load(page, 2000)
    dbg(page, "after-signin")

    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        otp = _ask_otp_popup()
        if not otp:
            print("CHYBA: OTP nebylo zadáno.")
            sys.exit(1)
        _fill_otp(page, otp)
        wait_load(page, 3000)

    try:
        page.wait_for_url("**/home.imedidata.com**", timeout=30_000)
    except PWTimeout:
        # Not fatal by itself; the URL check below decides.
        dbg(page, "wait-home-timeout")

    if "home.imedidata.com" not in page.url:
        print("CHYBA: Přihlášení se nezdařilo!")
        sys.exit(1)

    context.storage_state(path=str(AUTH_FILE))
    print("Session uložena do auth.json")


def _okta_mfa_present(page):
    """Return True when the URL contains "okta" or an MFA-like input exists."""
    if "okta" in page.url.lower():
        return True
    return any(page.query_selector(sel) for sel in _MFA_DETECT_SELECTORS)


def _fill_otp(page, otp):
    """Enter ``otp`` into the first matching input and press Enter.

    When none of the known selectors matches, the code is typed through the
    keyboard into whatever currently has focus.
    """
    for sel in _OTP_INPUT_SELECTORS:
        el = page.query_selector(sel)
        if el:
            el.fill(otp)
            page.keyboard.press("Enter")
            return
    page.keyboard.type(otp)
    page.keyboard.press("Enter")


# ---------------------------------------------------------------------------
# Navigation
# ---------------------------------------------------------------------------

def go_to_select_role(page):
    """Navigate to SELECT_ROLE_URL and report whether the session is usable.

    Returns:
        True when the resulting URL contains neither "login" nor "okta".
    """
    print("Navigace na SelectRole...")
    try:
        page.goto(SELECT_ROLE_URL)
    except Exception as exc:
        # Best effort: the URL check below decides the outcome, but the
        # failure is reported instead of being silently dropped.
        print(f" VAROVÁNÍ: navigace na SelectRole selhala: {exc}")
    wait_load(page, 1500)
    dbg(page, "select-role")
    current = page.url.lower()
    return "login" not in current and "okta" not in current


def select_role(page):
    """Pick the "Site Manager" option in the role dropdown(s) and continue.

    Returns without doing anything when no ``<select>`` appears within 10 s.
    """
    print("Vybírám roli Site Manager...")
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        return

    for sel_el in page.query_selector_all("select"):
        for opt in sel_el.query_selector_all("option"):
            txt = (opt.inner_text() or "").strip()
            if "site manager" in txt.lower():
                sel_el.select_option(label=txt)
                print(f" Vybráno: '{txt}'")
                break

    # Click the first submit-like control that exists.
    for btn_sel in [
        'input[value="Continue"]',
        'input[type="submit"]',
        'button:has-text("Continue")',
        'button[type="submit"]',
    ]:
        btn = page.query_selector(btn_sel)
        if btn:
            btn.click()
            break

    wait_load(page, 2000)
    dbg(page, "after-role")


def navigate_to_reporter(page):
    """Click the "Reporter" link and wait for the page to load."""
    print("Klikám na Reporter...")
    page.wait_for_selector('a:has-text("Reporter")', timeout=15_000)
    page.click('a:has-text("Reporter")')
    wait_load(page, 1500)
    dbg(page, "reporter")


def open_report(page):
    """Open the prompts page of the report identified by REPORT_ID."""
    print(f"Otevírám report ID={REPORT_ID} (Data Listing - Data Stream)...")
    selector = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    page.wait_for_selector(selector, timeout=15_000)
    page.click(selector)
    wait_load(page, 2000)
    dbg(page, "report-opened")


# ---------------------------------------------------------------------------
# Report parameters
# ---------------------------------------------------------------------------

def set_study_param(page):
    """Expand the Study panel and tick the study matching STUDY_SEARCH.

    The match is a case-insensitive substring test against the text of the
    checkbox's enclosing row/cell. When nothing matches, a warning is printed
    and the checkbox with index 0 is ticked as a fallback.
    """
    print(f" Parametr Study: hledám '{STUDY_SEARCH}'...")
    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)

    # Walk the checkboxes looking for a label that contains STUDY_SEARCH.
    page.wait_for_selector('input[id^="PromptsBox_st_FrontEndCBList_"]', timeout=10_000)
    checkboxes = page.query_selector_all('input[id^="PromptsBox_st_FrontEndCBList_"]')

    found = False
    for cb in checkboxes:
        cb_id = cb.get_attribute("id")
        # The label sits in the same or a neighbouring cell, so read the
        # innerText of the enclosing row/cell/parent through JS.
        label_text = page.evaluate(
            """id => {
                const el = document.getElementById(id);
                if (!el) return '';
                const row = el.closest('tr') || el.closest('td') || el.parentElement;
                return row ? row.innerText : '';
            }""",
            cb_id,
        )
        print(f" [{cb_id}] label: {label_text.strip()[:80]}")
        if STUDY_SEARCH.upper() in label_text.upper():
            if not page.locator(f"#{cb_id}").is_checked():
                page.locator(f"#{cb_id}").check()
            print(f" Nalezeno a zaškrtnuto: '{label_text.strip()}'")
            found = True
            break

    if not found:
        # Fallback: try index 0 and warn.
        print(f" VAROVÁNÍ: Studie '{STUDY_SEARCH}' nenalezena! Zkouším index 0...")
        cb0 = page.locator('#PromptsBox_st_FrontEndCBList_0')
        if not cb0.is_checked():
            cb0.check()

    wait_load(page, 3000)
    dbg(page, "after-study")


def set_site_group_param(page, country: str):
    """Expand Site Group, select ``country`` and tick Include Sub Site Groups.

    Args:
        country: Label of the option to select in the site group list.
    """
    print(f" Parametr Site Group: {country}")
    page.click('#PromptsBox_sg_ShowHideBtn')
    page.wait_for_timeout(1500)

    page.wait_for_selector('#PromptsBox_sg_List', timeout=10_000)
    page.select_option('#PromptsBox_sg_List', label=country)
    page.evaluate(
        "document.querySelector('#PromptsBox_sg_List').dispatchEvent(new Event('change', {bubbles:true}))"
    )
    wait_load(page, 2000)

    cb = page.locator('#PromptsBox_sg_CheckBox')
    if not cb.is_checked():
        cb.check()
        page.evaluate(
            "document.querySelector('#PromptsBox_sg_CheckBox').dispatchEvent(new Event('change', {bubbles:true}))"
        )
        wait_load(page, 2000)

    # Closing the panel confirms the choice and triggers the postback that
    # populates the Form panel.
    page.click('#PromptsBox_sg_ShowHideBtn')
    wait_load(page, 3000)
    dbg(page, "after-site-group")


def set_form_param(page, form_name: str):
    """Search for ``form_name`` in the Form panel and tick the first result.

    Prints a warning and returns when no result checkbox becomes visible
    within 8 s.

    Args:
        form_name: Text typed into the Form panel's search box.
    """
    print(f" Parametr Form: {form_name}")

    is_closed = page.locator('#PromptsBox_fm2_div').evaluate('el => el.style.display') == 'none'
    if is_closed:
        page.click('#PromptsBox_fm2_ShowHideBtn')
        page.wait_for_timeout(2000)

    if page.locator('#PromptsBox_fm2_PageModeBtn').is_visible():
        # The page-mode button is clicked twice with pauses in between.
        # NOTE(review): presumably a toggle needed to reveal the search box —
        # confirm against the live page.
        page.click('#PromptsBox_fm2_PageModeBtn')
        page.wait_for_timeout(1000)
        page.click('#PromptsBox_fm2_PageModeBtn')
        page.wait_for_timeout(2000)

    search = page.locator('#PromptsBox_fm2_SearchTxt')
    search.wait_for(state='visible', timeout=10_000)
    search.click()
    search.fill(form_name)
    page.wait_for_timeout(2000)
    search.press('Enter')
    page.wait_for_timeout(2000)

    cb_locator = page.locator('input[id^="PromptsBox_fm2_FrontEndCBList_"]').first
    try:
        cb_locator.wait_for(state='visible', timeout=8_000)
    except PWTimeout:
        print(f" VAROVÁNÍ: '{form_name}' nenalezen!")
        return

    if not cb_locator.is_checked():
        cb_locator.click()
    print(f" '{form_name}' zaškrtnuto")
    page.wait_for_timeout(2000)


# ---------------------------------------------------------------------------
# Submit and download
# ---------------------------------------------------------------------------

def submit_and_download(page, context, form_name: str, country: str | None):
    """Submit the report, wait for it to be generated and save it as CSV.

    The report opens in a new page. There the file type whose value contains
    "vnd.ms-excel" and the export type whose value contains "attachment" are
    selected before "Download File" is clicked.

    Args:
        page: Page showing the report prompts.
        context: Browser context used to catch the newly opened page.
        form_name: Form name; used (sanitised) in the output file name.
        country: Site group code for the file name; "ALL" is used when falsy.

    Returns:
        Path of the saved file inside DOWNLOAD_DIR.
    """
    print("Odesílám report...")
    with context.expect_page() as new_page_info:
        page.locator(
            'input[value="Submit Report"], button:has-text("Submit Report")'
        ).first.click()
    new_page = new_page_info.value

    try:
        new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)
        print(" Čekám na vygenerování reportu (max 5 min)...")
        new_page.wait_for_selector(_DOWNLOAD_BTN_SELECTOR, timeout=300_000)
        new_page.wait_for_timeout(500)
        dbg(new_page, "download-window")

        # Find the frame that actually holds the download controls.
        target_frame = new_page.main_frame
        for frame in new_page.frames:
            if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
                target_frame = frame
                break

        # File type: .csv (application/vnd.ms-excel)
        for sel in target_frame.query_selector_all('select'):
            for opt in sel.query_selector_all('option'):
                val = opt.get_attribute('value') or ''
                if 'vnd.ms-excel' in val:
                    sel.select_option(value=val)
                    print(" File type: .csv (application/vnd.ms-excel)")
                    break

        # Export type: attachment
        for sel in target_frame.query_selector_all('select'):
            for opt in sel.query_selector_all('option'):
                val = opt.get_attribute('value') or ''
                if 'attachment' in val.lower():
                    # Select the option's real value: the match above is a
                    # case-insensitive substring test, so the literal string
                    # 'attachment' is not guaranteed to be a valid value.
                    sel.select_option(value=val)
                    break

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
        country_slug = country if country else "ALL"
        form_slug = (
            form_name.replace(" ", "")
            .replace("/", "-")
            .replace("(", "")
            .replace(")", "")
        )
        filename = f"{timestamp}_EDC_UCO3001_{country_slug}_{form_slug}_DataListing.csv"
        output_path = DOWNLOAD_DIR / filename

        print("Stahuji CSV...")
        with new_page.expect_download(timeout=60_000) as dl_info:
            btn = target_frame.query_selector(_DOWNLOAD_BTN_SELECTOR)
            if btn:
                btn.click()
            else:
                new_page.locator(_DOWNLOAD_BTN_SELECTOR).first.click()

        dl_info.value.save_as(str(output_path))
        print(f"\nHotovo! Soubor uložen: {output_path}")
    finally:
        # Best-effort cleanup of the popup, whether or not the download worked.
        try:
            new_page.close()
        except Exception:
            pass

    return output_path


# ---------------------------------------------------------------------------
# Main function
# ---------------------------------------------------------------------------

def download_datalisting_reports_3001(form_name: str, country: str | None = None):
    """
    Download the Data Listing report for study UCO3001.

    Exits the process with status 1 when IMEDIDATA_PASSWORD is not set.

    Args:
        form_name: Form name, e.g. "Trial Disposition (Completion / Discontinuation)"
        country:   Site group code, e.g. "CZE". When None, no country filter
                   is applied (all countries).

    Returns:
        Path of the downloaded CSV file.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)

    DOWNLOAD_DIR.mkdir(exist_ok=True)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)
        try:
            ctx_kwargs = {"accept_downloads": True}
            use_saved = auth_valid()
            if use_saved:
                print("Načítám uloženou session (auth.json)...")
                ctx_kwargs["storage_state"] = str(AUTH_FILE)

            context = browser.new_context(**ctx_kwargs)
            page = context.new_page()

            logged_in = go_to_select_role(page)
            if not logged_in:
                if use_saved:
                    print("Session expirovala, přihlašuji znovu...")
                    AUTH_FILE.unlink(missing_ok=True)
                do_login(page, context)
                if not go_to_select_role(page):
                    # Keep going as before, but make the suspicious state
                    # visible in the log.
                    print(" VAROVÁNÍ: SelectRole po přihlášení stále přesměrovává na login.")

            select_role(page)
            navigate_to_reporter(page)
            open_report(page)

            print("\nNastavuji parametry reportu...")
            set_study_param(page)
            if country:
                set_site_group_param(page, country)
            else:
                print(" Parametr Site Group: přeskočen (všechny země)")

            print(f"\n=== Stahuji formulář: {form_name} ===")
            set_form_param(page, form_name)
            output = submit_and_download(page, context, form_name, country)
        finally:
            # Close the browser even when a step above raised or exited.
            browser.close()
            print("Prohlížeč zavřen.")

    return output


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Usage examples:
    #   python download_uco3001.py
    #   python download_uco3001.py CZE
    country_arg = sys.argv[1] if len(sys.argv) > 1 else None
    download_datalisting_reports_3001(
        form_name="Trial Disposition (Completion / Discontinuation)",
        country=country_arg,
    )