"""Download Data Listing CSV exports from Medidata Rave Reporter.

The script drives a Chromium browser through Playwright: it logs into
iMedidata (optionally answering an OKTA MFA prompt), opens the Rave
SelectRole page, picks the Site Manager role, opens the Reporter module,
fills the report prompts (study, site group, form) and downloads one CSV
per entry in ``FORM_NAMES`` into the ``downloads`` directory next to this
file.
"""

import os
import sys
import tkinter as tk
from datetime import datetime, timedelta
from pathlib import Path
from tkinter import simpledialog

from dotenv import load_dotenv
from playwright.sync_api import (
    BrowserContext,
    Page,
    TimeoutError as PWTimeout,
    sync_playwright,
)

load_dotenv(Path(__file__).parent / ".env")

USERNAME = os.getenv("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.getenv("IMEDIDATA_PASSWORD", "")

DOWNLOAD_DIR = Path(__file__).parent / "downloads"
AUTH_FILE = Path(__file__).parent / "auth.json"
AUTH_MAX_AGE_DAYS = 7

LOGIN_URL = "https://login.imedidata.com/login"
SELECT_ROLE_URL = (
    "https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx"
    "?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d"
    "&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4"
    "&studygroup_id=107981"
)

STUDY_NAME = "42847922MDD3003"
SITE_GROUP = "CZE"
FORM_NAMES = [
    "Date of Visit",
    "Vital Signs",
    "Interim Investigator Signature",
]
REPORT_ID = 92  # _EDC Std Rpt - Data Listing (Data Stream)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def auth_valid() -> bool:
    """Return True when ``auth.json`` exists and is younger than the limit.

    Age is measured from the file's modification time and compared with
    ``AUTH_MAX_AGE_DAYS``.
    """
    if not AUTH_FILE.exists():
        return False
    age = datetime.now() - datetime.fromtimestamp(AUTH_FILE.stat().st_mtime)
    return age < timedelta(days=AUTH_MAX_AGE_DAYS)


def wait_load(page: Page, extra_ms: int = 1000) -> None:
    """Wait for the 'load' event, then pause for ``extra_ms`` milliseconds.

    Rave never reaches the ``networkidle`` state, so 'load' plus a fixed
    pause is used instead. A timeout while waiting for 'load' is ignored on
    purpose; the extra pause still runs.
    """
    try:
        page.wait_for_load_state("load", timeout=20_000)
    except PWTimeout:
        pass
    page.wait_for_timeout(extra_ms)


def dbg(page: Page, label: str) -> None:
    """Print the page's current URL, tagged with ``label``."""
    print(f"[{label}] URL: {page.url}")


# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------

def _ask_otp_popup() -> str:
    """Show a topmost GUI dialog asking for the OKTA OTP code.

    Returns the entered text with surrounding whitespace removed, or an
    empty string when the dialog was cancelled.
    """
    root = tk.Tk()
    root.withdraw()
    root.lift()
    root.attributes("-topmost", True)
    otp = simpledialog.askstring(
        "OKTA MFA",
        "Zadej OTP kód z OKTA (6 číslic):",
        parent=root,
    )
    root.destroy()
    return (otp or "").strip()


def do_login(page: Page, context: BrowserContext) -> None:
    """Log into iMedidata and persist the session to ``auth.json``.

    Fills the username/password form, handles an OKTA MFA prompt when one is
    detected, and waits for the redirect to home.imedidata.com. Exits the
    process with status 1 when no OTP is entered or the home page is not
    reached.
    """
    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)
    dbg(page, "login-page")

    # The fields are named session[username] / session[password].
    page.wait_for_selector('input[name="session[username]"]', timeout=10_000)
    page.fill('input[name="session[username]"]', USERNAME)
    page.fill('input[name="session[password]"]', PASSWORD)
    page.click('button[type="submit"]')

    # Wait for the redirect — it may go through OKTA or straight to home.
    wait_load(page, 2000)
    dbg(page, "after-signin")

    # OKTA MFA?
    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        otp = _ask_otp_popup()
        if not otp:
            print("CHYBA: OTP nebylo zadáno.")
            sys.exit(1)
        _fill_otp(page, otp)
        # Wait for the OTP to be processed and the redirect back to iMedidata.
        wait_load(page, 3000)
        dbg(page, "after-otp")

    # Wait until we land on home.imedidata.com.
    try:
        page.wait_for_url("**/home.imedidata.com**", timeout=30_000)
    except PWTimeout:
        dbg(page, "wait-home-timeout")
    dbg(page, "final-login")

    if "home.imedidata.com" not in page.url:
        print("CHYBA: Přihlášení se nezdařilo! Zkontroluj heslo nebo OKTA kód.")
        input("Zmáčkni Enter pro ukončení...")
        sys.exit(1)

    context.storage_state(path=str(AUTH_FILE))
    print("Session uložena do auth.json")


def _okta_mfa_present(page: Page) -> bool:
    """Return True when the page looks like an OKTA MFA prompt.

    True when the URL contains "okta" (case-insensitive) or when any of the
    known OTP input selectors matches an element.
    """
    if "okta" in page.url.lower():
        return True
    for sel in [
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[placeholder*="code" i]',
    ]:
        if page.query_selector(sel):
            return True
    return False


def _fill_otp(page: Page, otp: str) -> None:
    """Enter ``otp`` into the first matching OTP input and press Enter.

    When none of the known selectors matches, the code is typed through the
    keyboard into whatever currently has focus.
    """
    for sel in [
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[type="tel"]',
        'input[placeholder*="code" i]',
    ]:
        el = page.query_selector(sel)
        if el:
            el.fill(otp)
            page.keyboard.press("Enter")
            return
    # Fallback: type into the currently focused element.
    page.keyboard.type(otp)
    page.keyboard.press("Enter")


# ---------------------------------------------------------------------------
# Navigation after login
# ---------------------------------------------------------------------------

def go_to_select_role(page: Page) -> bool:
    """Navigate to the SelectRole page.

    Returns True when the resulting URL contains neither "login" nor "okta"
    (case-insensitive), i.e. we were not bounced to an authentication page.
    """
    print("Navigace na SelectRole...")
    try:
        page.goto(SELECT_ROLE_URL)
    except Exception:
        # Rave performs a server-side redirect (ERR_ABORTED) — the URL is
        # checked after the page has loaded instead.
        pass
    wait_load(page, 1500)
    dbg(page, "select-role")
    return "login" not in page.url.lower() and "okta" not in page.url.lower()


def select_role(page: Page) -> None:
    """Select the Site Manager role and click Continue.

    Returns early (after logging the URL) when no ``<select>`` element
    appears within 10 seconds.
    """
    print("Vybírám roli Site Manager...")

    # Wait for a <select> element.
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        dbg(page, "no-select-found")
        return

    # Find the <select> that has a "Site Manager" option.
    selects = page.query_selector_all("select")
    found = False
    for sel_el in selects:
        opts = sel_el.query_selector_all("option")
        for opt in opts:
            txt = (opt.inner_text() or "").strip()
            if "site manager" in txt.lower():
                sel_el.select_option(label=txt)
                found = True
                print(f" Vybráno: '{txt}'")
                break
        if found:
            break

    if not found:
        print(" VAROVÁNÍ: Option 'Site Manager' nenalezena, zkouším kliknout na text...")
        try:
            page.get_by_text("Site Manager", exact=False).first.click()
        except Exception as e:
            print(f" {e}")

    # Click Continue — try the candidate selectors in order, first hit wins.
    for btn_sel in [
        'input[value="Continue"]',
        'input[type="submit"]',
        'button:has-text("Continue")',
        'button[type="submit"]',
    ]:
        try:
            btn = page.query_selector(btn_sel)
            if btn:
                btn.click()
                break
        except Exception:
            continue

    wait_load(page, 2000)
    dbg(page, "after-role")


def navigate_to_reporter(page: Page) -> None:
    """Click the "Reporter" link.

    Raises:
        PWTimeout: re-raised when the link does not appear within 15 seconds.
    """
    print("Klikám na Reporter...")
    try:
        page.wait_for_selector('a:has-text("Reporter")', timeout=15_000)
        page.click('a:has-text("Reporter")')
        wait_load(page, 1500)
        dbg(page, "reporter")
    except PWTimeout:
        dbg(page, "reporter-not-found")
        raise


def open_report(page: Page) -> None:
    """Open the report prompts page for ``REPORT_ID``.

    Raises:
        PWTimeout: re-raised when the report link does not appear within
            15 seconds.
    """
    print(f"Klikám na report ID={REPORT_ID} (Data Listing - Data Stream)...")
    selector = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    try:
        page.wait_for_selector(selector, timeout=15_000)
        page.click(selector)
        wait_load(page, 2000)
        dbg(page, "report-opened")
    except PWTimeout:
        dbg(page, "report-not-found")
        raise


# ---------------------------------------------------------------------------
# Report parameters
# ---------------------------------------------------------------------------

def set_study_param(page: Page) -> None:
    """Expand the Study panel and tick the study checkbox."""
    print(f" Parametr Study: {STUDY_NAME}")
    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)

    # Checkbox index 0 = 42847922MDD3003 (per the original author's note:
    # verified earlier).
    page.wait_for_selector('#PromptsBox_st_FrontEndCBList_0', timeout=10_000)
    cb = page.locator('#PromptsBox_st_FrontEndCBList_0')
    if not cb.is_checked():
        cb.check()
    wait_load(page, 3000)
    dbg(page, "after-study")


def set_site_group_param(page: Page) -> None:
    """Expand Site Group, select ``SITE_GROUP`` and tick Include Sub Site Groups."""
    print(f" Parametr Site Group: {SITE_GROUP}")

    # Expand the Site Group panel.
    page.click('#PromptsBox_sg_ShowHideBtn')
    page.wait_for_timeout(1500)

    # Select the site group and fire a change event (otherwise the postback
    # does not happen).
    page.wait_for_selector('#PromptsBox_sg_List', timeout=10_000)
    page.select_option('#PromptsBox_sg_List', label=SITE_GROUP)
    page.evaluate("document.querySelector('#PromptsBox_sg_List').dispatchEvent(new Event('change', {bubbles:true}))")
    wait_load(page, 2000)

    # Include Sub Site Groups
    print(" Include Sub Site Groups: zapnuto")
    cb = page.locator('#PromptsBox_sg_CheckBox')
    if not cb.is_checked():
        cb.check()
        # NOTE(review): the change event is dispatched only when the box was
        # just ticked — confirm this nesting matches the intended behaviour.
        page.evaluate("document.querySelector('#PromptsBox_sg_CheckBox').dispatchEvent(new Event('change', {bubbles:true}))")
    wait_load(page, 2000)

    # Closing the panel confirms the selection and triggers the postback
    # for the Form prompt.
    page.click('#PromptsBox_sg_ShowHideBtn')
    wait_load(page, 3000)
    dbg(page, "after-site-group")


def set_form_param(page: Page, form_name: str) -> bool:
    """Expand the Form panel (if closed) and tick the checkbox for ``form_name``.

    The panel is SingleSelection=1, so a new selection automatically clears
    the previous one.

    Returns:
        True when a result checkbox became visible and is now ticked; False
        when no checkbox appeared within the timeout. Callers should not
        submit the report in the False case, because the previously selected
        form (if any) would still be active.
    """
    print(f" Parametr Form: {form_name}")

    # Open the panel only when it is closed (checked through style.display).
    is_closed = page.locator('#PromptsBox_fm2_div').evaluate('el => el.style.display') == 'none'
    if is_closed:
        page.click('#PromptsBox_fm2_ShowHideBtn')
        page.wait_for_timeout(2000)

    # After a previous download the panel is in "locked" mode.
    # 1. clicking the pencil clears the selection; the button turns into an eye
    # 2. clicking the eye loads the list of all forms
    if page.locator('#PromptsBox_fm2_PageModeBtn').is_visible():
        page.click('#PromptsBox_fm2_PageModeBtn')  # pencil -> eye
        page.wait_for_timeout(1000)
        page.click('#PromptsBox_fm2_PageModeBtn')  # eye -> loads the forms
        page.wait_for_timeout(2000)

    # Search for the form — the click gives focus, Enter triggers
    # ajaxSelectionGridSearchBoxOnKeypress.
    search = page.locator('#PromptsBox_fm2_SearchTxt')
    search.wait_for(state='visible', timeout=10_000)
    search.click()
    search.fill(form_name)
    search.press('Enter')

    # Wait until AJAX has rewritten the DOM with the result list.
    cb_locator = page.locator('input[id^="PromptsBox_fm2_FrontEndCBList_"]').first
    try:
        cb_locator.wait_for(state='visible', timeout=8_000)
    except PWTimeout:
        print(f" VAROVÁNÍ: '{form_name}' nenalezen nebo timeout!")
        return False

    # SingleSelection=1: clicking the new checkbox clears the previous one.
    # The locator is re-evaluated on each use, so there is no stale handle.
    if not cb_locator.is_checked():
        cb_locator.click()
    print(f" '{form_name}' zaškrtnuto")
    wait_load(page, 500)
    return True


# ---------------------------------------------------------------------------
# Submit and download
# ---------------------------------------------------------------------------

def submit_and_download(page: Page, context: BrowserContext, form_name: str) -> Path:
    """Submit the report, wait for the popup window and download the CSV.

    Args:
        page: The report prompts page holding the "Submit Report" button.
        context: Browser context on which the new popup page is awaited.
        form_name: Form name; used (with spaces removed) in the file name.

    Returns:
        Path of the saved CSV file inside ``DOWNLOAD_DIR``.
    """
    print("Odesílám report (čekám na nové okno)...")

    with context.expect_page() as new_page_info:
        page.locator('input[value="Submit Report"], button:has-text("Submit Report")').first.click()

    new_page = new_page_info.value
    new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)

    # Wait for "Download File" to show up — the page first displays "Loading".
    print(" Čekám na vygenerování reportu...")
    new_page.wait_for_selector(
        'input[value="Download File"], button:has-text("Download File")',
        timeout=300_000  # up to 5 minutes for large reports
    )
    new_page.wait_for_timeout(500)
    dbg(new_page, "download-window")

    # Download settings
    print(" Nastavuji parametry stahování...")

    # Separator: comma (default)
    sep = new_page.query_selector('input[name*="Separator"], input[name*="separator"]')
    if sep:
        sep.fill(',')

    # The form lives in an iframe — pick the first frame that holds a
    # <select> or the Download File button; fall back to the main frame.
    target_frame = new_page.main_frame
    for frame in new_page.frames:
        if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
            target_frame = frame
            print(f" Frame nalezen: {frame.url}")
            break

    # File type: .csv (application/vnd.ms-excel)
    for sel in target_frame.query_selector_all('select'):
        for opt in sel.query_selector_all('option'):
            val = opt.get_attribute('value') or ''
            txt = opt.inner_text() or ''
            if 'vnd.ms-excel' in val or 'vnd.ms-excel' in txt:
                sel.select_option(value=val)
                print(" File type: .csv (application/vnd.ms-excel)")
                break

    # Export type: attachment. Select the option's real value: the match is
    # case-insensitive, so the attribute need not be exactly 'attachment'.
    for sel in target_frame.query_selector_all('select'):
        for opt in sel.query_selector_all('option'):
            val = opt.get_attribute('value') or ''
            if 'attachment' in val.lower():
                sel.select_option(value=val)
                break

    # Save as Unicode: left unchecked (default).

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    form_slug = form_name.replace(" ", "")
    filename = f"{timestamp}_EDC_MDD3003_{form_slug}_DataListing.csv"
    output_path = DOWNLOAD_DIR / filename

    print("Stahuji CSV...")
    with new_page.expect_download(timeout=60_000) as dl_info:
        btn = target_frame.query_selector('input[value="Download File"], button:has-text("Download File")')
        if btn:
            btn.click()
        else:
            new_page.locator('input[value="Download File"], button:has-text("Download File")').first.click()

    download = dl_info.value
    download.save_as(str(output_path))
    print(f"\nHotovo! Soubor uložen: {output_path}")

    # Closing the popup is best-effort; a failure here must not lose the
    # already saved file.
    try:
        new_page.close()
        print("Stahovací okno zavřeno.")
    except Exception:
        pass

    return output_path


# ---------------------------------------------------------------------------
# Main flow
# ---------------------------------------------------------------------------

def run() -> None:
    """Run the whole flow: login, role selection, report setup, downloads.

    Exits with status 1 when ``IMEDIDATA_PASSWORD`` is not set. A saved
    session from ``auth.json`` is reused while it is recent enough; when the
    SelectRole page cannot be reached, a fresh login is performed.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)

    DOWNLOAD_DIR.mkdir(exist_ok=True)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)

        ctx_kwargs = {"accept_downloads": True}
        use_saved = auth_valid()
        if use_saved:
            print("Načítám uloženou session (auth.json)...")
            ctx_kwargs["storage_state"] = str(AUTH_FILE)

        context = browser.new_context(**ctx_kwargs)
        page = context.new_page()

        # Go to SelectRole.
        logged_in = go_to_select_role(page)

        if not logged_in:
            if use_saved:
                print("Session expirovala, mažu auth.json a přihlašuji znovu...")
                AUTH_FILE.unlink(missing_ok=True)
            do_login(page, context)
            go_to_select_role(page)

        # Step 4: role selection -> assigns the session ID
        select_role(page)

        # Step 5: Reporter
        navigate_to_reporter(page)

        # Step 6: open the report
        open_report(page)

        # Step 7: parameters (Study and Site Group once, Form in the loop)
        print("Nastavuji parametry reportu...")
        set_study_param(page)
        set_site_group_param(page)

        # Step 8: loop over the forms
        for form_name in FORM_NAMES:
            print(f"\n=== Stahuji formulář: {form_name} ===")
            if not set_form_param(page, form_name):
                # Submitting now would export whatever form is still selected
                # and save it under this form's name — skip instead.
                print(f" Přeskakuji '{form_name}': formulář se nepodařilo vybrat.")
                continue
            submit_and_download(page, context, form_name)

        browser.close()
        print("Prohlížeč zavřen.")


if __name__ == "__main__":
    run()