"""Download the Rave 'Data Listing (Data Stream)' report as a CSV file.

The script drives a Chromium browser through Playwright: it logs in to
iMedidata (re-using a saved session from ``auth.json`` when that file is
recent enough), selects the Site Manager role, opens the Reporter module,
fills in the report prompts (study, site group, form) and saves the
generated CSV into the ``downloads`` directory next to this file.
"""

import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable, Optional

from dotenv import load_dotenv
from playwright.sync_api import (
    BrowserContext,
    Frame,
    Page,
    TimeoutError as PWTimeout,
    sync_playwright,
)

load_dotenv(Path(__file__).parent / ".env")

USERNAME = os.getenv("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.getenv("IMEDIDATA_PASSWORD", "")
DOWNLOAD_DIR = Path(__file__).parent / "downloads"
AUTH_FILE = Path(__file__).parent / "auth.json"
AUTH_MAX_AGE_DAYS = 7

LOGIN_URL = "https://login.imedidata.com/login"
SELECT_ROLE_URL = (
    "https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx"
    "?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d"
    "&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4"
    "&studygroup_id=107981"
)
STUDY_NAME = "42847922MDD3003"
SITE_GROUP = "CZE"
FORM_NAME = "Date of Visit"
REPORT_ID = 92  # _EDC Std Rpt - Data Listing (Data Stream)

# Selector matching the "Download File" control in either of its two forms.
_DOWNLOAD_BTN = 'input[value="Download File"], button:has-text("Download File")'


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def auth_valid() -> bool:
    """Return True when ``auth.json`` exists and is younger than AUTH_MAX_AGE_DAYS."""
    try:
        # A single stat() call avoids the exists()/stat() race.
        mtime = AUTH_FILE.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        return False
    age = datetime.now() - datetime.fromtimestamp(mtime)
    return age < timedelta(days=AUTH_MAX_AGE_DAYS)


def wait_load(page: Page, extra_ms: int = 1000) -> None:
    """Wait for the 'load' event plus an extra pause of ``extra_ms`` milliseconds.

    Rave never reaches the 'networkidle' state, so 'load' is used instead.
    A timeout while waiting for 'load' is ignored on purpose; the extra pause
    is applied either way.
    """
    try:
        page.wait_for_load_state("load", timeout=20_000)
    except PWTimeout:
        pass
    page.wait_for_timeout(extra_ms)


def dbg(page: Page, label: str) -> None:
    """Print the current URL of ``page`` tagged with ``label``."""
    print(f"[{label}] URL: {page.url}")


# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------

def do_login(page: Page, context: BrowserContext) -> None:
    """Log in to iMedidata and store the session state in ``auth.json``.

    Prompts on stdin for an OTP code when an OKTA MFA step is detected.
    Exits the process with status 1 when the browser does not end up on
    home.imedidata.com.
    """
    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)
    dbg(page, "login-page")

    # The username and password fields are named session[username] / session[password].
    page.wait_for_selector('input[name="session[username]"]', timeout=10_000)
    page.fill('input[name="session[username]"]', USERNAME)
    page.fill('input[name="session[password]"]', PASSWORD)
    page.click('button[type="submit"]')

    # Wait for the redirect - it may go through OKTA or straight to home.
    wait_load(page, 2000)
    dbg(page, "after-signin")

    # OKTA MFA?
    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        otp = input("Zadej OTP kód z OKTA (6 číslic): ").strip()
        _fill_otp(page, otp)
        # Wait for the OTP to be processed and the redirect back to iMedidata.
        wait_load(page, 3000)
        dbg(page, "after-otp")

    # Wait until we are on home.imedidata.com.
    try:
        page.wait_for_url("**/home.imedidata.com**", timeout=30_000)
    except PWTimeout:
        dbg(page, "wait-home-timeout")

    dbg(page, "final-login")

    if "home.imedidata.com" not in page.url:
        print("CHYBA: Přihlášení se nezdařilo! Zkontroluj heslo nebo OKTA kód.")
        input("Zmáčkni Enter pro ukončení...")
        sys.exit(1)

    context.storage_state(path=str(AUTH_FILE))
    print("Session uložena do auth.json")


def _okta_mfa_present(page: Page) -> bool:
    """Return True when the URL mentions OKTA or a code-entry input is on the page."""
    if "okta" in page.url.lower():
        return True
    candidates = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[placeholder*="code" i]',
    )
    return any(page.query_selector(sel) for sel in candidates)


def _fill_otp(page: Page, otp: str) -> None:
    """Enter ``otp`` into the first matching code input and press Enter."""
    for sel in (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[type="tel"]',
        'input[placeholder*="code" i]',
    ):
        el = page.query_selector(sel)
        if el:
            el.fill(otp)
            page.keyboard.press("Enter")
            return
    # Fallback: no known input matched, so type into whatever has focus.
    page.keyboard.type(otp)
    page.keyboard.press("Enter")


# ---------------------------------------------------------------------------
# Navigation after login
# ---------------------------------------------------------------------------

def go_to_select_role(page: Page) -> bool:
    """Open the SelectRole page.

    Returns True when the resulting URL contains neither "login" nor "okta",
    i.e. the browser was not bounced to a sign-in page.
    """
    print("Navigace na SelectRole...")
    page.goto(SELECT_ROLE_URL)
    wait_load(page, 1500)
    dbg(page, "select-role")
    current = page.url.lower()
    return "login" not in current and "okta" not in current


def _select_site_manager(page: Page) -> bool:
    """Pick the first <option> whose text contains 'site manager'.

    Returns True when such an option was found and selected.
    """
    for select_el in page.query_selector_all("select"):
        for opt in select_el.query_selector_all("option"):
            txt = (opt.inner_text() or "").strip()
            if "site manager" in txt.lower():
                select_el.select_option(label=txt)
                print(f" Vybráno: '{txt}'")
                return True
    return False


def select_role(page: Page) -> None:
    """Select the Site Manager role and click Continue.

    Returns early without clicking anything when no <select> element shows
    up within 10 seconds.
    """
    print("Vybírám roli Site Manager...")

    # Wait for a select element.
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        dbg(page, "no-select-found")
        return

    if not _select_site_manager(page):
        print(" VAROVÁNÍ: Option 'Site Manager' nenalezena, zkouším kliknout na text...")
        try:
            page.get_by_text("Site Manager", exact=False).first.click()
        except Exception as e:
            # Best effort: report the failure and still try to continue.
            print(f" {e}")

    # Click Continue - try the known button variants in order.
    for btn_sel in (
        'input[value="Continue"]',
        'input[type="submit"]',
        'button:has-text("Continue")',
        'button[type="submit"]',
    ):
        try:
            btn = page.query_selector(btn_sel)
            if btn:
                btn.click()
                break
        except Exception:
            # Best effort: a failing variant must not stop the next one.
            continue
    else:
        # No variant could be clicked; make that visible instead of silent.
        print(" VAROVÁNÍ: Tlačítko 'Continue' nenalezeno.")

    wait_load(page, 2000)
    dbg(page, "after-role")


def navigate_to_reporter(page: Page) -> None:
    """Click the 'Reporter' link; re-raises PWTimeout when it does not appear."""
    print("Klikám na Reporter...")
    try:
        page.wait_for_selector('a:has-text("Reporter")', timeout=15_000)
        page.click('a:has-text("Reporter")')
        wait_load(page, 1500)
        dbg(page, "reporter")
    except PWTimeout:
        dbg(page, "reporter-not-found")
        raise


def open_report(page: Page) -> None:
    """Open the prompts page of report REPORT_ID; re-raises PWTimeout on failure."""
    print(f"Klikám na report ID={REPORT_ID} (Data Listing - Data Stream)...")
    selector = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    try:
        page.wait_for_selector(selector, timeout=15_000)
        page.click(selector)
        wait_load(page, 2000)
        dbg(page, "report-opened")
    except PWTimeout:
        dbg(page, "report-not-found")
        raise


# ---------------------------------------------------------------------------
# Report parameters
# ---------------------------------------------------------------------------

def set_study_param(page: Page) -> None:
    """Expand the Study panel and tick the first study checkbox."""
    print(f" Parametr Study: {STUDY_NAME}")

    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)

    # Checkbox index 0 = 42847922MDD3003 (verified earlier by the author).
    page.wait_for_selector('#PromptsBox_st_FrontEndCBList_0', timeout=10_000)
    cb = page.locator('#PromptsBox_st_FrontEndCBList_0')
    if not cb.is_checked():
        cb.check()
    wait_load(page, 3000)
    dbg(page, "after-study")


def set_site_group_param(page: Page) -> None:
    """Expand Site Group, choose SITE_GROUP and enable Include Sub Site Groups."""
    print(f" Parametr Site Group: {SITE_GROUP}")

    # Expand the Site Group panel.
    page.click('#PromptsBox_sg_ShowHideBtn')
    page.wait_for_timeout(1500)

    # Select the site group and fire a change event (otherwise no postback happens).
    page.wait_for_selector('#PromptsBox_sg_List', timeout=10_000)
    page.select_option('#PromptsBox_sg_List', label=SITE_GROUP)
    page.evaluate("document.querySelector('#PromptsBox_sg_List').dispatchEvent(new Event('change', {bubbles:true}))")
    wait_load(page, 2000)

    # Include Sub Site Groups
    print(" Include Sub Site Groups: zapnuto")
    cb = page.locator('#PromptsBox_sg_CheckBox')
    if not cb.is_checked():
        cb.check()
        page.evaluate("document.querySelector('#PromptsBox_sg_CheckBox').dispatchEvent(new Event('change', {bubbles:true}))")
    wait_load(page, 2000)

    # Closing the panel confirms the selection and triggers the postback for Form.
    page.click('#PromptsBox_sg_ShowHideBtn')
    wait_load(page, 3000)
    dbg(page, "after-site-group")


def set_form_param(page: Page) -> None:
    """Expand the Form panel, search for FORM_NAME and tick the first result.

    Prints a warning and returns when the search yields no checkbox.
    """
    print(f" Parametr Form: {FORM_NAME}")

    page.click('#PromptsBox_fm2_ShowHideBtn')
    page.wait_for_timeout(2000)

    # Fill in the search box and submit with Enter - the result is immediate.
    page.wait_for_selector('#PromptsBox_fm2_SearchTxt', timeout=10_000)
    page.fill('#PromptsBox_fm2_SearchTxt', FORM_NAME)
    page.locator('#PromptsBox_fm2_SearchTxt').press('Enter')
    page.wait_for_timeout(800)

    checkboxes = page.query_selector_all('input[id^="PromptsBox_fm2_FrontEndCBList_"]')
    if not checkboxes:
        print(f" VAROVÁNÍ: '{FORM_NAME}' nenalezen!")
        return

    # Tick the first (only) search result.
    first = checkboxes[0]
    if not first.is_checked():
        first.click()
    print(f" '{FORM_NAME}' zaškrtnuto")
    wait_load(page, 500)


# ---------------------------------------------------------------------------
# Submit and download
# ---------------------------------------------------------------------------

def _find_form_frame(new_page: Page) -> Frame:
    """Return the first frame holding a <select> or the Download File button.

    Falls back to the main frame when no frame matches.
    """
    for frame in new_page.frames:
        if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
            print(f" Frame nalezen: {frame.url}")
            return frame
    return new_page.main_frame


def _select_options_where(
    frame: Frame,
    matches: Callable[[str, str], bool],
    announce: Optional[str] = None,
) -> None:
    """In every <select> of ``frame``, choose the first option ``matches`` accepts.

    ``matches`` receives the option's value attribute and its inner text.
    The option is selected by its actual value. ``announce`` is printed
    after each selection when given.
    """
    for select_el in frame.query_selector_all('select'):
        for opt in select_el.query_selector_all('option'):
            val = opt.get_attribute('value') or ''
            txt = opt.inner_text() or ''
            if matches(val, txt):
                select_el.select_option(value=val)
                if announce:
                    print(announce)
                break


def submit_and_download(page: Page, context: BrowserContext) -> Path:
    """Submit the report, configure the export and save the CSV.

    Returns the path of the saved file inside DOWNLOAD_DIR.
    """
    print("Odesílám report (čekám na nové okno)...")

    with context.expect_page() as new_page_info:
        page.locator('input[value="Submit Report"], button:has-text("Submit Report")').first.click()

    new_page = new_page_info.value
    new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)

    # Wait until Download File shows up - the page first displays "Loading".
    print(" Čekám na vygenerování reportu...")
    new_page.wait_for_selector(
        _DOWNLOAD_BTN,
        timeout=300_000  # up to 5 minutes for large reports
    )
    new_page.wait_for_timeout(500)
    dbg(new_page, "download-window")

    # Download settings
    print(" Nastavuji parametry stahování...")

    # Separator: comma (default)
    sep = new_page.query_selector('input[name*="Separator"], input[name*="separator"]')
    if sep:
        sep.fill(',')

    # The form may live in an iframe - find the right frame.
    target_frame = _find_form_frame(new_page)

    # File type: .csv (application/vnd.ms-excel)
    _select_options_where(
        target_frame,
        lambda val, txt: 'vnd.ms-excel' in val or 'vnd.ms-excel' in txt,
        announce=" File type: .csv (application/vnd.ms-excel)",
    )

    # Export type: attachment. The match is case-insensitive, so the option's
    # own value is selected rather than the literal 'attachment'.
    _select_options_where(
        target_frame,
        lambda val, txt: 'attachment' in val.lower(),
    )

    # Save as Unicode: left unchecked (default)

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    filename = f"{timestamp}_EDC_MDD3003_DataListing.csv"
    output_path = DOWNLOAD_DIR / filename

    print("Stahuji CSV...")
    with new_page.expect_download(timeout=60_000) as dl_info:
        btn = target_frame.query_selector(_DOWNLOAD_BTN)
        if btn:
            btn.click()
        else:
            new_page.locator(_DOWNLOAD_BTN).first.click()

    download = dl_info.value
    download.save_as(str(output_path))
    print(f"\nHotovo! Soubor uložen: {output_path}")
    return output_path


# ---------------------------------------------------------------------------
# Main flow
# ---------------------------------------------------------------------------

def run() -> None:
    """Run the whole flow: login, role selection, report prompts, download.

    Exits with status 1 when IMEDIDATA_PASSWORD is not set.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)

    DOWNLOAD_DIR.mkdir(exist_ok=True)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)
        try:
            ctx_kwargs = {"accept_downloads": True}
            use_saved = auth_valid()
            if use_saved:
                print("Načítám uloženou session (auth.json)...")
                ctx_kwargs["storage_state"] = str(AUTH_FILE)

            context = browser.new_context(**ctx_kwargs)
            page = context.new_page()

            # Go to SelectRole; a bounce to a sign-in page means we must log in.
            if not go_to_select_role(page):
                if use_saved:
                    print("Session expirovala, mažu auth.json a přihlašuji znovu...")
                    AUTH_FILE.unlink(missing_ok=True)
                do_login(page, context)
                if not go_to_select_role(page):
                    # Surface the problem but keep going, as the flow did before.
                    print("VAROVÁNÍ: Po přihlášení se nepodařilo otevřít SelectRole.")

            # Step 4: role selection -> assigns the session ID
            select_role(page)

            # Step 5: Reporter
            navigate_to_reporter(page)

            # Step 6: open the report
            open_report(page)

            # Step 7: set the parameters
            print("Nastavuji parametry reportu...")
            set_study_param(page)
            set_site_group_param(page)
            set_form_param(page)

            # Step 8: submit and download
            submit_and_download(page, context)

            input("\nZmáčkni Enter pro zavření prohlížeče...")
        finally:
            # Close the browser on failures and sys.exit() as well.
            browser.close()


if __name__ == "__main__":
    run()