Initial commit — clean history (removed large test files, browser profiles, Medidata/Clario downloads)

This commit is contained in:
2026-06-01 15:36:31 +02:00
commit bb604e593e
1304 changed files with 116480 additions and 0 deletions
+489
View File
@@ -0,0 +1,489 @@
"""
download_report.py
NAHRAZENO skriptem download_edc_datalistings.py
Původně: stahování Data Listing reportů pro studii MDD3003 (CZE).
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
import tkinter as tk
from tkinter import simpledialog
# Load environment variables from a .env file located next to this script.
load_dotenv(Path(__file__).parent / ".env")
# Credentials come from the environment; the password defaults to an empty string.
USERNAME = os.getenv("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.getenv("IMEDIDATA_PASSWORD", "")
# Paths relative to this script: download target folder and saved browser session.
DOWNLOAD_DIR = Path(__file__).parent / "downloads"
AUTH_FILE = Path(__file__).parent / "auth.json"
# A saved session file older than this many days is treated as invalid (see auth_valid).
AUTH_MAX_AGE_DAYS = 7
LOGIN_URL = "https://login.imedidata.com/login"
# Adjacent string literals are concatenated into a single URL.
SELECT_ROLE_URL = (
    "https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx"
    "?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d"
    "&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4"
    "&studygroup_id=107981"
)
STUDY_NAME = "42847922MDD3003"
SITE_GROUP = "CZE"
# Forms that run() downloads one after another.
FORM_NAMES = [
    "Date of Visit",
    "Vital Signs",
    "Interim Investigator Signature",
]
REPORT_ID = 92  # _EDC Std Rpt - Data Listing (Data Stream)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def auth_valid():
    """Return True when the saved session file exists and is recent enough.

    The file's modification time is compared with the current time; the
    session counts as valid while its age is below AUTH_MAX_AGE_DAYS.
    """
    if AUTH_FILE.exists():
        modified_at = datetime.fromtimestamp(AUTH_FILE.stat().st_mtime)
        return datetime.now() - modified_at < timedelta(days=AUTH_MAX_AGE_DAYS)
    return False
def wait_load(page, extra_ms=1000):
    """Wait for the page's 'load' event, then pause for ``extra_ms`` milliseconds.

    The original note states that Rave never reaches network idle, which is
    why the 'load' state plus a fixed pause is used instead.
    """
    load_timeout_ms = 20_000
    try:
        page.wait_for_load_state("load", timeout=load_timeout_ms)
    except PWTimeout:
        # Best effort: a slow load is tolerated and the fixed pause still runs.
        pass
    page.wait_for_timeout(extra_ms)
def dbg(page, label):
    """Print the page's current URL, prefixed with a bracketed label."""
    current_url = page.url
    print(f"[{label}] URL: {current_url}")
# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------
def _ask_otp_popup():
    """Show a GUI dialog asking for the OKTA OTP code.

    Returns:
        The entered text with surrounding whitespace removed, or an empty
        string when nothing was entered or the dialog was cancelled.
    """
    root = tk.Tk()
    try:
        # Hide the empty root window and keep the dialog above other windows.
        root.withdraw()
        root.lift()
        root.attributes("-topmost", True)
        otp = simpledialog.askstring(
            "OKTA MFA",
            "Zadej OTP kód z OKTA (6 číslic):",
            parent=root,
        )
    finally:
        # Always tear down the hidden root, even if the dialog raised.
        root.destroy()
    return (otp or "").strip()
def do_login(page, context):
    """Log in to iMedidata, handle an optional OKTA MFA step and save the session.

    On success the browser context's storage state is written to AUTH_FILE.
    The process exits with status 1 when no OTP is entered or when the
    browser does not end up on home.imedidata.com.
    """
    # The login form fields are named session[username] / session[password].
    username_css = 'input[name="session[username]"]'
    password_css = 'input[name="session[password]"]'

    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)
    dbg(page, "login-page")

    page.wait_for_selector(username_css, timeout=10_000)
    page.fill(username_css, USERNAME)
    page.fill(password_css, PASSWORD)
    page.click('button[type="submit"]')

    # The redirect may pass through OKTA or go straight to the home page.
    wait_load(page, 2000)
    dbg(page, "after-signin")

    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        otp = _ask_otp_popup()
        if not otp:
            print("CHYBA: OTP nebylo zadáno.")
            sys.exit(1)
        _fill_otp(page, otp)
        # Give the OTP time to be processed and the redirect back to happen.
        wait_load(page, 3000)
        dbg(page, "after-otp")

    # Wait until the browser reaches home.imedidata.com.
    try:
        page.wait_for_url("**/home.imedidata.com**", timeout=30_000)
    except PWTimeout:
        dbg(page, "wait-home-timeout")
    dbg(page, "final-login")

    if "home.imedidata.com" in page.url:
        context.storage_state(path=str(AUTH_FILE))
        print("Session uložena do auth.json")
        return

    print("CHYBA: Přihlášení se nezdařilo! Zkontroluj heslo nebo OKTA kód.")
    input("Zmáčkni Enter pro ukončení...")
    sys.exit(1)
def _okta_mfa_present(page):
    """Report whether the current page looks like an OKTA MFA prompt.

    True when the URL contains "okta" (case-insensitive) or when any of the
    known OTP input selectors matches an element on the page.
    """
    if "okta" in page.url.lower():
        return True
    otp_input_selectors = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[placeholder*="code" i]',
    )
    return any(page.query_selector(css) for css in otp_input_selectors)
def _fill_otp(page, otp):
    """Enter the OTP into the first matching input and press Enter.

    When none of the known selectors matches, the code is typed through the
    keyboard instead (it goes to whatever element currently has focus).
    """
    candidate_selectors = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[type="tel"]',
        'input[placeholder*="code" i]',
    )
    for css in candidate_selectors:
        field = page.query_selector(css)
        if field:
            field.fill(otp)
            break
    else:
        # Fallback: no known OTP field found, type the code blindly.
        page.keyboard.type(otp)
    page.keyboard.press("Enter")
# ---------------------------------------------------------------------------
# Navigace po přihlášení
# ---------------------------------------------------------------------------
def go_to_select_role(page):
    """Open the SelectRole page and return True if we actually landed there.

    The result is False when the final URL contains "login" or "okta"
    (case-insensitive), i.e. the browser was sent to a sign-in page.
    """
    print("Navigace na SelectRole...")
    try:
        page.goto(SELECT_ROLE_URL)
    except Exception:
        # Rave performs a server-side redirect (ERR_ABORTED); the URL is
        # checked after the page has loaded instead.
        pass
    wait_load(page, 1500)
    dbg(page, "select-role")
    landed_url = page.url.lower()
    return not ("login" in landed_url or "okta" in landed_url)
def select_role(page):
    """Choose the 'Site Manager' role and click Continue.

    Returns early (after a debug print) when no select element appears
    within 10 seconds.
    """
    print("Vybírám roli Site Manager...")
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        dbg(page, "no-select-found")
        return

    def _pick_site_manager():
        # Scan every select for an option mentioning "site manager" and
        # choose the first hit; reports whether one was found.
        for dropdown in page.query_selector_all("select"):
            for option in dropdown.query_selector_all("option"):
                label = (option.inner_text() or "").strip()
                if "site manager" in label.lower():
                    dropdown.select_option(label=label)
                    print(f" Vybráno: '{label}'")
                    return True
        return False

    if not _pick_site_manager():
        print(" VAROVÁNÍ: Option 'Site Manager' nenalezena, zkouším kliknout na text...")
        try:
            page.get_by_text("Site Manager", exact=False).first.click()
        except Exception as e:
            print(f" {e}")

    # Click Continue: try the known button selectors in order.
    continue_selectors = (
        'input[value="Continue"]',
        'input[type="submit"]',
        'button:has-text("Continue")',
        'button[type="submit"]',
    )
    for css in continue_selectors:
        try:
            button = page.query_selector(css)
            if button:
                button.click()
                break
        except Exception:
            continue

    wait_load(page, 2000)
    dbg(page, "after-role")
def navigate_to_reporter(page):
    """Click the 'Reporter' link and wait for the page to load.

    Raises:
        PWTimeout: re-raised after a debug print when a Playwright timeout
            occurs.
    """
    reporter_link = 'a:has-text("Reporter")'
    print("Klikám na Reporter...")
    try:
        page.wait_for_selector(reporter_link, timeout=15_000)
        page.click(reporter_link)
        wait_load(page, 1500)
        dbg(page, "reporter")
    except PWTimeout:
        dbg(page, "reporter-not-found")
        raise
def open_report(page):
    """Open the report whose prompts-page link carries REPORT_ID.

    Raises:
        PWTimeout: re-raised after a debug print when a Playwright timeout
            occurs.
    """
    print(f"Klikám na report ID={REPORT_ID} (Data Listing - Data Stream)...")
    report_link = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    try:
        page.wait_for_selector(report_link, timeout=15_000)
        page.click(report_link)
        wait_load(page, 2000)
        dbg(page, "report-opened")
    except PWTimeout:
        dbg(page, "report-not-found")
        raise
# ---------------------------------------------------------------------------
# Parametry reportu
# ---------------------------------------------------------------------------
def set_study_param(page):
    """Expand the Study panel and tick the study checkbox.

    NOTE(review): the checkbox is addressed by fixed index 0, which the
    original comment says was verified earlier to be 42847922MDD3003;
    STUDY_NAME is only printed here, not used for the lookup.
    """
    study_checkbox_css = '#PromptsBox_st_FrontEndCBList_0'

    print(f" Parametr Study: {STUDY_NAME}")
    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)

    page.wait_for_selector(study_checkbox_css, timeout=10_000)
    study_checkbox = page.locator(study_checkbox_css)
    if not study_checkbox.is_checked():
        study_checkbox.check()

    wait_load(page, 3000)
    dbg(page, "after-study")
def set_site_group_param(page):
    """Expand Site Group, select SITE_GROUP and tick Include Sub Site Groups."""
    toggle_css = '#PromptsBox_sg_ShowHideBtn'
    list_css = '#PromptsBox_sg_List'
    include_sub_css = '#PromptsBox_sg_CheckBox'

    def _fire_change(css):
        # Dispatch a bubbling 'change' event on the element; per the original
        # note the postback does not arrive otherwise.
        page.evaluate(
            "document.querySelector('" + css + "').dispatchEvent(new Event('change', {bubbles:true}))"
        )

    print(f" Parametr Site Group: {SITE_GROUP}")
    # Expand the Site Group panel.
    page.click(toggle_css)
    page.wait_for_timeout(1500)

    # Pick the site group from the list.
    page.wait_for_selector(list_css, timeout=10_000)
    page.select_option(list_css, label=SITE_GROUP)
    _fire_change(list_css)
    wait_load(page, 2000)

    # Include Sub Site Groups.
    print(" Include Sub Site Groups: zapnuto")
    include_sub = page.locator(include_sub_css)
    if not include_sub.is_checked():
        include_sub.check()
        _fire_change(include_sub_css)
        wait_load(page, 2000)

    # Closing the panel confirms the selection and triggers the postback for Form.
    page.click(toggle_css)
    wait_load(page, 3000)
    dbg(page, "after-site-group")
def set_form_param(page, form_name):
    """Open the Form panel if needed, search for ``form_name`` and tick it.

    Per the original note the panel is SingleSelection=1, so ticking a new
    form automatically unticks the previous one. Prints a warning and
    returns when no result checkbox becomes visible within 8 seconds.
    """
    mode_button_css = '#PromptsBox_fm2_PageModeBtn'

    print(f" Parametr Form: {form_name}")

    # Open the panel only when it is closed (checked through style.display).
    panel_display = page.locator('#PromptsBox_fm2_div').evaluate('el => el.style.display')
    if panel_display == 'none':
        page.click('#PromptsBox_fm2_ShowHideBtn')
        page.wait_for_timeout(2000)

    # After a previous download the panel is in "locked" mode:
    # first click (pencil) clears the selection and the button becomes an eye,
    # second click (eye) loads the list of all forms.
    if page.locator(mode_button_css).is_visible():
        for pause_ms in (1000, 2000):
            page.click(mode_button_css)
            page.wait_for_timeout(pause_ms)

    # Search for the form: the click gives focus, Enter starts the search.
    search_box = page.locator('#PromptsBox_fm2_SearchTxt')
    search_box.wait_for(state='visible', timeout=10_000)
    search_box.click()
    search_box.fill(form_name)
    search_box.press('Enter')

    # Wait until the result list shows at least one checkbox.
    first_result = page.locator('input[id^="PromptsBox_fm2_FrontEndCBList_"]').first
    try:
        first_result.wait_for(state='visible', timeout=8_000)
    except PWTimeout:
        print(f" VAROVÁNÍ: '{form_name}' nenalezen nebo timeout!")
        return

    # A locator is re-evaluated on use, so there is no stale element handle.
    if not first_result.is_checked():
        first_result.click()
    print(f" '{form_name}' zaškrtnuto")
    wait_load(page, 500)
# ---------------------------------------------------------------------------
# Submit a download
# ---------------------------------------------------------------------------
def submit_and_download(page, context, form_name):
    """Submit the report, wait for it to be generated and save it as CSV.

    Args:
        page: Page showing the report prompts (with the Submit Report button).
        context: Browser context; the report opens in a new page of it.
        form_name: Form name, used to build the output file name.

    Returns:
        Path of the saved CSV file inside DOWNLOAD_DIR.
    """
    download_btn_css = 'input[value="Download File"], button:has-text("Download File")'

    def _select_matching(frame, accepts):
        # In every select element of ``frame`` choose the first option for
        # which ``accepts(value, text)`` is true, using that option's own
        # value. Reports whether anything was selected.
        picked = False
        for dropdown in frame.query_selector_all('select'):
            for option in dropdown.query_selector_all('option'):
                value = option.get_attribute('value') or ''
                text = option.inner_text() or ''
                if accepts(value, text):
                    dropdown.select_option(value=value)
                    picked = True
                    break
        return picked

    print("Odesílám report (čekám na nové okno)...")
    with context.expect_page() as new_page_info:
        page.locator('input[value="Submit Report"], button:has-text("Submit Report")').first.click()
    new_page = new_page_info.value
    new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)

    # The page first shows "Loading"; wait until Download File appears.
    print(" Čekám na vygenerování reportu...")
    new_page.wait_for_selector(download_btn_css, timeout=300_000)  # up to 5 minutes
    new_page.wait_for_timeout(500)
    dbg(new_page, "download-window")

    print(" Nastavuji parametry stahování...")
    # Separator: comma.
    sep = new_page.query_selector('input[name*="Separator"], input[name*="separator"]')
    if sep:
        sep.fill(',')

    # The form may live in an iframe: use the first frame that holds a select
    # or the Download File button, falling back to the main frame.
    target_frame = new_page.main_frame
    for frame in new_page.frames:
        if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
            target_frame = frame
            print(f" Frame nalezen: {frame.url}")
            break

    # File type: .csv (application/vnd.ms-excel).
    if _select_matching(
        target_frame,
        lambda value, text: 'vnd.ms-excel' in value or 'vnd.ms-excel' in text,
    ):
        print(" File type: .csv (application/vnd.ms-excel)")

    # Export type: attachment. The matched option's actual value is selected;
    # the match is case-insensitive, so a hard-coded 'attachment' could miss it.
    _select_matching(target_frame, lambda value, text: 'attachment' in value.lower())

    # "Save as Unicode" is left at its default (unchecked).
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    # Strip spaces and replace "/" so the name never turns into a nested path.
    form_slug = form_name.replace(" ", "").replace("/", "-")
    filename = f"{timestamp}_EDC_MDD3003_{form_slug}_DataListing.csv"
    output_path = DOWNLOAD_DIR / filename

    print("Stahuji CSV...")
    with new_page.expect_download(timeout=60_000) as dl_info:
        btn = target_frame.query_selector(download_btn_css)
        if btn:
            btn.click()
        else:
            new_page.locator(download_btn_css).first.click()
    download = dl_info.value
    download.save_as(str(output_path))
    print(f"\nHotovo! Soubor uložen: {output_path}")

    try:
        new_page.close()
        print("Stahovací okno zavřeno.")
    except Exception:
        # Closing the download window is best effort only.
        pass
    return output_path
# ---------------------------------------------------------------------------
# Hlavní flow
# ---------------------------------------------------------------------------
def run():
    """Run the whole flow: log in, open the report and download every form.

    Exits with status 1 when PASSWORD is empty.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)
    DOWNLOAD_DIR.mkdir(exist_ok=True)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)

        # Reuse the saved session when it is still fresh enough.
        saved_session = auth_valid()
        if saved_session:
            print("Načítám uloženou session (auth.json)...")
            context = browser.new_context(accept_downloads=True, storage_state=str(AUTH_FILE))
        else:
            context = browser.new_context(accept_downloads=True)
        page = context.new_page()

        # Go to SelectRole; log in first when we were bounced to sign-in.
        if not go_to_select_role(page):
            if saved_session:
                print("Session expirovala, mažu auth.json a přihlašuji znovu...")
                AUTH_FILE.unlink(missing_ok=True)
            do_login(page, context)
            go_to_select_role(page)

        # Step 4: role selection (assigns the session ID).
        select_role(page)
        # Step 5: Reporter.
        navigate_to_reporter(page)
        # Step 6: open the report.
        open_report(page)

        # Step 7: Study and Site Group are set once, Form inside the loop.
        print("Nastavuji parametry reportu...")
        set_study_param(page)
        set_site_group_param(page)

        # Step 8: one download per form.
        for current_form in FORM_NAMES:
            print(f"\n=== Stahuji formulář: {current_form} ===")
            set_form_param(page, current_form)
            submit_and_download(page, context, current_form)

        browser.close()
        print("Prohlížeč zavřen.")
# Start the download flow only when this file is executed as a script.
if __name__ == "__main__":
    run()
+440
View File
@@ -0,0 +1,440 @@
"""
download_uco3001.py
NAHRAZENO skriptem download_edc_datalistings.py
Původně: stahování Data Listing reportů (ReportID=92) pro studii UCO3001.
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
import tkinter as tk
from tkinter import simpledialog
# Load environment variables from a .env file located next to this script.
load_dotenv(Path(__file__).parent / ".env")
# Credentials come from the environment; the password defaults to an empty string.
USERNAME = os.getenv("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.getenv("IMEDIDATA_PASSWORD", "")
# Paths relative to this script: download target folder and saved browser session.
DOWNLOAD_DIR = Path(__file__).parent / "downloads"
AUTH_FILE = Path(__file__).parent / "auth.json"
# A saved session file older than this many days is treated as invalid (see auth_valid).
AUTH_MAX_AGE_DAYS = 7
LOGIN_URL = "https://login.imedidata.com/login"
# Adjacent string literals are concatenated into a single URL.
SELECT_ROLE_URL = (
    "https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx"
    "?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d"
    "&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4"
    "&studygroup_id=107981"
)
STUDY_SEARCH = "77242113UCO3001"  # matched as a substring of the study name
REPORT_ID = 92  # _EDC Std Rpt - Data Listing (Data Stream)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def auth_valid():
    """Return True when the saved session file exists and is recent enough.

    The file's modification time is compared with the current time; the
    session counts as valid while its age is below AUTH_MAX_AGE_DAYS.
    """
    if AUTH_FILE.exists():
        modified_at = datetime.fromtimestamp(AUTH_FILE.stat().st_mtime)
        return datetime.now() - modified_at < timedelta(days=AUTH_MAX_AGE_DAYS)
    return False
def wait_load(page, extra_ms=1000):
    """Wait for the page's 'load' state, then pause for ``extra_ms`` milliseconds.

    A timeout while waiting for the load state is ignored.
    """
    load_timeout_ms = 20_000
    try:
        page.wait_for_load_state("load", timeout=load_timeout_ms)
    except PWTimeout:
        # Best effort: a slow load is tolerated and the fixed pause still runs.
        pass
    page.wait_for_timeout(extra_ms)
def dbg(page, label):
    """Print the page's current URL, prefixed with a bracketed label."""
    current_url = page.url
    print(f"[{label}] URL: {current_url}")
# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------
def _ask_otp_popup():
    """Show a GUI dialog asking for the OKTA OTP code.

    Returns:
        The entered text with surrounding whitespace removed, or an empty
        string when nothing was entered or the dialog was cancelled.
    """
    root = tk.Tk()
    try:
        # Hide the empty root window and keep the dialog above other windows.
        root.withdraw()
        root.lift()
        root.attributes("-topmost", True)
        otp = simpledialog.askstring("OKTA MFA", "Zadej OTP kód z OKTA (6 číslic):", parent=root)
    finally:
        # Always tear down the hidden root, even if the dialog raised.
        root.destroy()
    return (otp or "").strip()
def do_login(page, context):
    """Log in to iMedidata, handle an optional OKTA MFA step and save the session.

    On success the browser context's storage state is written to AUTH_FILE.
    The process exits with status 1 when no OTP is entered or when the
    browser does not end up on home.imedidata.com.
    """
    username_css = 'input[name="session[username]"]'
    password_css = 'input[name="session[password]"]'

    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)

    page.wait_for_selector(username_css, timeout=10_000)
    page.fill(username_css, USERNAME)
    page.fill(password_css, PASSWORD)
    page.click('button[type="submit"]')
    wait_load(page, 2000)
    dbg(page, "after-signin")

    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        otp = _ask_otp_popup()
        if not otp:
            print("CHYBA: OTP nebylo zadáno.")
            sys.exit(1)
        _fill_otp(page, otp)
        wait_load(page, 3000)

    # Wait until the browser reaches home.imedidata.com.
    try:
        page.wait_for_url("**/home.imedidata.com**", timeout=30_000)
    except PWTimeout:
        dbg(page, "wait-home-timeout")

    if "home.imedidata.com" in page.url:
        context.storage_state(path=str(AUTH_FILE))
        print("Session uložena do auth.json")
        return

    print("CHYBA: Přihlášení se nezdařilo!")
    sys.exit(1)
def _okta_mfa_present(page):
    """Report whether the current page looks like an OKTA MFA prompt.

    True when the URL contains "okta" (case-insensitive) or when any of the
    known OTP input selectors matches an element on the page.
    """
    if "okta" in page.url.lower():
        return True
    otp_input_selectors = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[placeholder*="code" i]',
    )
    return any(page.query_selector(css) for css in otp_input_selectors)
def _fill_otp(page, otp):
    """Enter the OTP into the first matching input and press Enter.

    When none of the known selectors matches, the code is typed through the
    keyboard instead (it goes to whatever element currently has focus).
    """
    candidate_selectors = (
        'input[name="answer"]',
        'input[name*="otp"]',
        'input[name*="code"]',
        'input[type="tel"]',
        'input[placeholder*="code" i]',
    )
    for css in candidate_selectors:
        field = page.query_selector(css)
        if field:
            field.fill(otp)
            break
    else:
        # Fallback: no known OTP field found, type the code blindly.
        page.keyboard.type(otp)
    page.keyboard.press("Enter")
# ---------------------------------------------------------------------------
# Navigace
# ---------------------------------------------------------------------------
def go_to_select_role(page):
    """Open the SelectRole page and return True if we actually landed there.

    The result is False when the final URL contains "login" or "okta"
    (case-insensitive), i.e. the browser was sent to a sign-in page.
    """
    print("Navigace na SelectRole...")
    try:
        page.goto(SELECT_ROLE_URL)
    except Exception:
        # Navigation errors are ignored; the landing URL is inspected instead.
        pass
    wait_load(page, 1500)
    dbg(page, "select-role")
    landed_url = page.url.lower()
    return not ("login" in landed_url or "okta" in landed_url)
def select_role(page):
    """Choose the 'Site Manager' role and click the Continue button.

    The first option whose text contains "site manager" (case-insensitive)
    is selected and the search stops there, so element handles collected
    before the selection are not touched again. A warning is printed when
    no such option exists; the Continue click is attempted regardless.
    Returns early when no select element appears within 10 seconds.
    """
    print("Vybírám roli Site Manager...")
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        # Leave a trace instead of failing silently.
        dbg(page, "no-select-found")
        return

    role_selected = False
    for sel_el in page.query_selector_all("select"):
        for opt in sel_el.query_selector_all("option"):
            txt = (opt.inner_text() or "").strip()
            if "site manager" in txt.lower():
                sel_el.select_option(label=txt)
                print(f" Vybráno: '{txt}'")
                role_selected = True
                break
        if role_selected:
            # Stop scanning the remaining selects once the role is chosen.
            break
    if not role_selected:
        print(" VAROVÁNÍ: Option 'Site Manager' nenalezena.")

    # Click the first Continue/submit control that exists.
    for btn_sel in ['input[value="Continue"]', 'input[type="submit"]',
                    'button:has-text("Continue")', 'button[type="submit"]']:
        btn = page.query_selector(btn_sel)
        if btn:
            btn.click()
            break
    wait_load(page, 2000)
    dbg(page, "after-role")
def navigate_to_reporter(page):
    """Click the 'Reporter' link and wait for the page to load."""
    reporter_link = 'a:has-text("Reporter")'
    print("Klikám na Reporter...")
    page.wait_for_selector(reporter_link, timeout=15_000)
    page.click(reporter_link)
    wait_load(page, 1500)
    dbg(page, "reporter")
def open_report(page):
    """Open the report whose prompts-page link carries REPORT_ID."""
    print(f"Otevírám report ID={REPORT_ID} (Data Listing - Data Stream)...")
    report_link = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    page.wait_for_selector(report_link, timeout=15_000)
    page.click(report_link)
    wait_load(page, 2000)
    dbg(page, "report-opened")
# ---------------------------------------------------------------------------
# Parametry reportu
# ---------------------------------------------------------------------------
def set_study_param(page):
    """Expand the Study panel and tick the study whose label contains STUDY_SEARCH.

    The comparison is case-insensitive. When no label matches, a warning is
    printed and the checkbox with index 0 is ticked as a fallback.
    """
    checkbox_css = 'input[id^="PromptsBox_st_FrontEndCBList_"]'
    # Returns the text of the checkbox's enclosing row (tr, else td, else parent).
    row_text_js = (
        "id => {\n"
        "const el = document.getElementById(id);\n"
        "if (!el) return '';\n"
        "const row = el.closest('tr') || el.closest('td') || el.parentElement;\n"
        "return row ? row.innerText : '';\n"
        "}"
    )

    print(f" Parametr Study: hledám '{STUDY_SEARCH}'...")
    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)
    page.wait_for_selector(checkbox_css, timeout=10_000)

    wanted = STUDY_SEARCH.upper()
    for handle in page.query_selector_all(checkbox_css):
        cb_id = handle.get_attribute("id")
        label_text = page.evaluate(row_text_js, cb_id)
        print(f" [{cb_id}] label: {label_text.strip()[:80]}")
        if wanted not in label_text.upper():
            continue
        match = page.locator(f"#{cb_id}")
        if not match.is_checked():
            match.check()
        print(f" Nalezeno a zaškrtnuto: '{label_text.strip()}'")
        break
    else:
        # Fallback: warn and try index 0.
        print(f" VAROVÁNÍ: Studie '{STUDY_SEARCH}' nenalezena! Zkouším index 0...")
        first_checkbox = page.locator('#PromptsBox_st_FrontEndCBList_0')
        if not first_checkbox.is_checked():
            first_checkbox.check()

    wait_load(page, 3000)
    dbg(page, "after-study")
def set_site_group_param(page, country: str):
    """Expand Site Group, select ``country`` and tick Include Sub Site Groups."""
    toggle_css = '#PromptsBox_sg_ShowHideBtn'
    list_css = '#PromptsBox_sg_List'
    include_sub_css = '#PromptsBox_sg_CheckBox'

    def _fire_change(css):
        # Dispatch a bubbling 'change' event on the element matched by css.
        page.evaluate(
            "document.querySelector('" + css + "').dispatchEvent(new Event('change', {bubbles:true}))"
        )

    print(f" Parametr Site Group: {country}")
    page.click(toggle_css)
    page.wait_for_timeout(1500)

    page.wait_for_selector(list_css, timeout=10_000)
    page.select_option(list_css, label=country)
    _fire_change(list_css)
    wait_load(page, 2000)

    include_sub = page.locator(include_sub_css)
    if not include_sub.is_checked():
        include_sub.check()
        _fire_change(include_sub_css)
        wait_load(page, 2000)

    # Closing the panel confirms the choice and triggers the postback for Form.
    page.click(toggle_css)
    wait_load(page, 3000)
    dbg(page, "after-site-group")
def set_form_param(page, form_name: str):
    """Select a form in the Form panel by searching for ``form_name``.

    Prints a warning and returns when no result checkbox becomes visible
    within 8 seconds.
    """
    mode_button_css = '#PromptsBox_fm2_PageModeBtn'

    print(f" Parametr Form: {form_name}")

    # Open the panel only when its inline style says it is hidden.
    panel_display = page.locator('#PromptsBox_fm2_div').evaluate('el => el.style.display')
    if panel_display == 'none':
        page.click('#PromptsBox_fm2_ShowHideBtn')
        page.wait_for_timeout(2000)

    # When the mode button is visible it is clicked twice, with pauses.
    if page.locator(mode_button_css).is_visible():
        for pause_ms in (1000, 2000):
            page.click(mode_button_css)
            page.wait_for_timeout(pause_ms)

    search_box = page.locator('#PromptsBox_fm2_SearchTxt')
    search_box.wait_for(state='visible', timeout=10_000)
    search_box.click()
    search_box.fill(form_name)
    page.wait_for_timeout(2000)
    search_box.press('Enter')
    page.wait_for_timeout(2000)

    first_result = page.locator('input[id^="PromptsBox_fm2_FrontEndCBList_"]').first
    try:
        first_result.wait_for(state='visible', timeout=8_000)
    except PWTimeout:
        print(f" VAROVÁNÍ: '{form_name}' nenalezen!")
        return

    if not first_result.is_checked():
        first_result.click()
    print(f" '{form_name}' zaškrtnuto")
    page.wait_for_timeout(2000)
# ---------------------------------------------------------------------------
# Submit a download
# ---------------------------------------------------------------------------
def submit_and_download(page, context, form_name: str, country: str | None):
    """Submit the report, wait for it to be generated and save it as CSV.

    Args:
        page: Page showing the report prompts (with the Submit Report button).
        context: Browser context; the report opens in a new page of it.
        form_name: Form name, used to build the output file name.
        country: Site group code for the file name; "ALL" is used when falsy.

    Returns:
        Path of the saved CSV file inside DOWNLOAD_DIR.
    """
    download_btn_css = 'input[value="Download File"], button:has-text("Download File")'

    def _select_matching(frame, accepts):
        # In every select element of ``frame`` choose the first option whose
        # value satisfies ``accepts``, using that option's own value.
        # Reports whether anything was selected.
        picked = False
        for dropdown in frame.query_selector_all('select'):
            for option in dropdown.query_selector_all('option'):
                value = option.get_attribute('value') or ''
                if accepts(value):
                    dropdown.select_option(value=value)
                    picked = True
                    break
        return picked

    print("Odesílám report...")
    with context.expect_page() as new_page_info:
        page.locator('input[value="Submit Report"], button:has-text("Submit Report")').first.click()
    new_page = new_page_info.value
    new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)

    print(" Čekám na vygenerování reportu (max 5 min)...")
    new_page.wait_for_selector(download_btn_css, timeout=300_000)
    new_page.wait_for_timeout(500)
    dbg(new_page, "download-window")

    # Use the first frame that holds a select or the Download File button,
    # falling back to the main frame.
    target_frame = new_page.main_frame
    for frame in new_page.frames:
        if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
            target_frame = frame
            break

    # File type: .csv (application/vnd.ms-excel).
    if _select_matching(target_frame, lambda value: 'vnd.ms-excel' in value):
        print(" File type: .csv (application/vnd.ms-excel)")

    # Export type: attachment. The matched option's actual value is selected;
    # the match is case-insensitive, so a hard-coded 'attachment' could miss it.
    _select_matching(target_frame, lambda value: 'attachment' in value.lower())

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    country_slug = country if country else "ALL"
    # Remove characters that do not belong in a file name.
    form_slug = form_name.replace(" ", "").replace("/", "-").replace("(", "").replace(")", "")
    filename = f"{timestamp}_EDC_UCO3001_{country_slug}_{form_slug}_DataListing.csv"
    output_path = DOWNLOAD_DIR / filename

    print("Stahuji CSV...")
    with new_page.expect_download(timeout=60_000) as dl_info:
        btn = target_frame.query_selector(download_btn_css)
        if btn:
            btn.click()
        else:
            new_page.locator(download_btn_css).first.click()
    dl_info.value.save_as(str(output_path))
    print(f"\nHotovo! Soubor uložen: {output_path}")

    try:
        new_page.close()
    except Exception:
        # Closing the download window is best effort only.
        pass
    return output_path
# ---------------------------------------------------------------------------
# Hlavní funkce
# ---------------------------------------------------------------------------
def download_datalisting_reports_3001(form_name: str, country: str | None = None):
    """
    Download one Data Listing report for study UCO3001.

    Args:
        form_name: Form name, e.g. "Trial Disposition (Completion / Discontinuation)"
        country: Site group code, e.g. "CZE". When None, the site group
            filter is not set (all countries).

    Returns:
        Path of the saved CSV file. Exits with status 1 when PASSWORD is empty.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)
    DOWNLOAD_DIR.mkdir(exist_ok=True)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)

        # Reuse the saved session when it is still fresh enough.
        saved_session = auth_valid()
        if saved_session:
            print("Načítám uloženou session (auth.json)...")
            context = browser.new_context(accept_downloads=True, storage_state=str(AUTH_FILE))
        else:
            context = browser.new_context(accept_downloads=True)
        page = context.new_page()

        # Log in first when the SelectRole navigation bounced to sign-in.
        if not go_to_select_role(page):
            if saved_session:
                print("Session expirovala, přihlašuji znovu...")
                AUTH_FILE.unlink(missing_ok=True)
            do_login(page, context)
            go_to_select_role(page)

        select_role(page)
        navigate_to_reporter(page)
        open_report(page)

        print("\nNastavuji parametry reportu...")
        set_study_param(page)
        if not country:
            print(" Parametr Site Group: přeskočen (všechny země)")
        else:
            set_site_group_param(page, country)

        print(f"\n=== Stahuji formulář: {form_name} ===")
        set_form_param(page, form_name)
        saved_path = submit_and_download(page, context, form_name, country)

        browser.close()
        print("Prohlížeč zavřen.")
    return saved_path
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Usage examples:
    # python download_uco3001.py
    # python download_uco3001.py CZE
    # The optional first CLI argument is the site group code; without it no
    # site group filter is applied.
    country_arg = sys.argv[1] if len(sys.argv) > 1 else None
    download_datalisting_reports_3001(
        form_name="Trial Disposition (Completion / Discontinuation)",
        country=country_arg,
    )
+451
View File
@@ -0,0 +1,451 @@
"""
Import EDC CSV reportů do MongoDB.
Použití:
python edc_import.py report.csv
python edc_import.py reports/*.csv
python edc_import.py report.csv --host mongodb://192.168.1.100:27017 --db klinicka_studie
"""
import argparse
import csv
import glob
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from pymongo import MongoClient, ASCENDING
from pymongo.errors import PyMongoError
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Log at INFO level to both a UTF-8 file (edc_import.log) and standard output.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[
        logging.FileHandler("edc_import.log", encoding="utf-8"),
        # stdout's file descriptor is re-opened as a UTF-8 text stream;
        # closefd=False leaves the underlying descriptor open when this
        # wrapper is closed. NOTE(review): presumably done so non-ASCII
        # messages print on consoles with another default encoding.
        logging.StreamHandler(open(sys.stdout.fileno(), mode="w", encoding="utf-8", closefd=False)),
    ],
)
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Mapování pevných CSV sloupců na MongoDB cesty
# ---------------------------------------------------------------------------
# Maps fixed CSV column names to dotted MongoDB document paths.
FIXED_FIELDS = {
    "StudyName": "study",
    "SiteGroupName": "site.group",
    "SiteID": "site.id",
    "SiteNumber": "site.number",
    "Site": "site.name",
    "SubjectID": "subject.id",
    "Subject": "subject.label",
    "CRFVersionID": "form.crfVersionId",
    "InstanceID": "form.instanceId",
    "InstanceName": "form.instanceName",
    "FolderSeq": "form.folderSeq",
    "Page": "form.page",
    "RecordID": "form.recordId",
    "RecordPosition": "form.recordPosition",
    "LastModifiedDate": "lastModified",
    "PrintDateTime": "importedAt",
}
# Columns that go into _meta (other administrative columns).
META_FIELDS = {"RunUser", "VersionNumber", "FilterField"}
# Fields that are converted to int.
INT_FIELDS = {"Elapsed days"}
# Date formats tried when parsing, in this order.
DATE_FORMATS = [
    "%d %b %Y %H:%M:%S",  # 20 MAY 2026 12:06:18
    "%d %b %Y %H:%M:%S:%f",  # 10 Aug 2025 18:13:22:080 (EDC query dates)
    "%Y%m%d %H:%M:%S.%f",  # 20250810 18:13:22.080 (sortable query dates)
    "%Y-%m-%d %H:%M:%S",  # 2026-05-20 12:06:28
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%d/%m/%Y %H:%M:%S",
    "%m/%d/%Y %H:%M:%S",
    "%m/%d/%Y %I:%M:%S %p",  # 5/20/2026 1:23:27 PM
]
# ---------------------------------------------------------------------------
# QueryDetails — detekce a mapování
# ---------------------------------------------------------------------------
# Column whose presence in the CSV header identifies a QueryDetails report.
QUERY_DETAIL_MARKER = "QueryID(ReQry)"
# QueryDetails columns that are stored under the document's _meta key.
QUERY_META_FIELDS = {
    "StudyParameter", "SiteGroupParameter", "SiteNumberParameter", "SiteParameter",
    "SubjectParameter", "SubjectStatusParameter", "FolderParameter", "FormParameter",
    "FieldParameter", "MarkingGroupParameter", "QueryStatusParameter",
    "IncludeInactivePagesParameter", "PageSDVParameter", "PageFrozenParameter",
    "PageLockedParameter", "StartDateParameter", "EndDateParameter",
    "MilestoneParameter", "ReportTypeParameter", "VersionNumber", "TimeZone",
    "RunUser", "ErrorString",
    # Sortable dates: redundant, the main date columns are parsed instead.
    "OpenedDateSrtble", "AnsweredDateSrtble", "ClosedDateSrtble",
    # Aggregate counts: go to meta.
    "VisitSiteLevel", "VisitCountryLevel", "VisitStudyLevel",
    "PageSubjectLevel", "PageSiteLevel", "PageCountryLevel", "PageStudyLevel",
    "Queries (Op/Ans/SDV)",
}
def is_query_details(fieldnames: list[str]) -> bool:
    """Tell whether a CSV header contains the QueryDetails marker column."""
    return any(column == QUERY_DETAIL_MARKER for column in fieldnames)
def map_query_row(row: dict, source_file: str) -> dict:
    """Map one row of a QueryDetails report onto a MongoDB document.

    Args:
        row: CSV row keyed by column name; missing or None cells are
            treated as empty strings.
        source_file: Value stored under "sourceFile".

    Returns:
        The document, with top-level keys whose value is None removed
        (nested dicts are left untouched).
    """

    def val(col: str) -> str:
        # Cell text with surrounding whitespace removed; "" when absent or None.
        return (row.get(col) or "").strip()

    def int_or_none(col: str):
        # None for an empty cell, int when parseable, otherwise the raw text.
        v = val(col)
        if v == "":
            return None
        try:
            return int(v)
        except ValueError:
            return v

    def date_or_str(col: str):
        # None for an empty cell, the parsed date when recognised, otherwise
        # the raw text.
        v = val(col)
        if not v:
            return None
        parsed = parse_date(v)
        return parsed if parsed else v

    # Go through val() so a None cell (e.g. a short CSV row) cannot raise
    # AttributeError; sorted() keeps the key order stable between runs.
    meta = {k: val(k) for k in sorted(QUERY_META_FIELDS) if val(k)}
    doc = {
        "study": val("StudyParameter"),
        "site": {
            "group": val("Country/Region"),
            "number": val("Site Number"),
            "name": val("Sites"),
        },
        "subject": {
            "label": val("Subjects"),
            "status": val("Subject Status"),
        },
        "visit": val("Visits"),
        "page": val("Pages"),
        "recordPosition": int_or_none("RecordPosition"),
        "field": val("Field"),
        "queryGroup": val("Query Group"),
        "queryId": val(QUERY_DETAIL_MARKER),
        "queryStatus": val("QueryStatus"),
        "openedBy": val("Opened By"),
        "openedDate": date_or_str("Opened Date"),
        "answeredBy": val("Answered By") or None,
        "answeredDate": date_or_str("Answered Date"),
        "closedBy": val("Closed By") or None,
        "closedDate": date_or_str("Closed Date"),
        "daysNotYetClosed": int_or_none("DaysNotYetClosed"),
        "daysToAnswer": int_or_none("Days to Answer"),
        "daysToClose": int_or_none("Days to Close"),
        "queryText": val("QueryText"),
        "answerText": val("Answer Text (if any)") or None,
        "importedAt": date_or_str("PrintDateTime"),
        "sourceFile": source_file,
        "_meta": meta,
    }
    # Drop None values at the top level only; "queryId" is always kept.
    return {k: v for k, v in doc.items() if v is not None or k == "queryId"}
def ensure_query_indexes(collection) -> None:
    """Create the indexes for query documents.

    "queryId" gets a unique, sparse index; the remaining fields get plain
    ascending indexes.
    """
    collection.create_index([("queryId", ASCENDING)], unique=True, sparse=True)
    for field in ("subject.label", "site.number", "queryStatus", "openedDate"):
        collection.create_index([(field, ASCENDING)])
def ensure_snapshot_indexes(collection) -> None:
    """Create the indexes of the query snapshot collection.

    The (queryId, snapshotDate) pair is indexed as unique; the remaining
    fields get plain single-field ascending indexes.
    """
    unique_key = [("queryId", ASCENDING), ("snapshotDate", ASCENDING)]
    collection.create_index(unique_key, unique=True)
    for field in ("snapshotDate", "queryStatus", "site.number", "subject.label"):
        collection.create_index([(field, ASCENDING)])
def extract_snapshot_date(filename: str) -> str:
    """Return the ``YYYY-MM-DD`` prefix of a file name.

    Example: ``'2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv'`` gives
    ``'2026-05-20'``. Only the final path component is inspected, and the
    date must be at its very start. When no such prefix exists, today's
    date (UTC) is returned instead.
    """
    base_name = Path(filename).name
    found = re.match(r"(\d{4}-\d{2}-\d{2})", base_name)
    if found is None:
        return datetime.now(timezone.utc).strftime("%Y-%m-%d")
    return found.group(1)
def parse_date(value: str) -> str | None:
    """Convert a date string to ISO 8601, or return None if it is not a date.

    The stripped text is tried against each pattern in ``DATE_FORMATS`` in
    order; the first one that parses wins. The result is tagged as UTC
    before being serialised.
    """
    text = value.strip()
    for fmt in DATE_FORMATS:
        try:
            moment = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return moment.replace(tzinfo=timezone.utc).isoformat()
    return None
def set_nested(doc: dict, path: str, value: str) -> None:
    """Store ``value`` in a nested dict under a dotted path such as 'site.id'.

    Intermediate dicts are created on demand; ``doc`` is modified in place.
    """
    *parents, leaf = path.split(".")
    node = doc
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = value
def collection_name_from_filename(filename: str) -> str:
    """Derive the collection name from a report file name.

    Examples:
        '2026-05-20_15-09_EDC_MDD3003_InterimInvestigatorSignature_DataListing.csv'
            -> 'MDD3003_InterimInvestigatorSignature'
        '2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv'
            -> 'MDD3003_QueryDetails'

    If the name contains no ``EDC_`` part, the file stem is returned.
    """
    stem = Path(filename).stem
    patterns = (
        r"EDC_(.+?)_DataListing",  # name carrying the _DataListing suffix
        r"EDC_(.+)$",  # name without it (e.g. QueryDetails)
    )
    for pattern in patterns:
        found = re.search(pattern, stem, re.IGNORECASE)
        if found:
            return found.group(1)
    return stem
def map_row(row: dict, source_file: str) -> dict:
    """Map one Data Listing CSV row onto a MongoDB document.

    Columns are routed as follows:

    * columns listed in ``FIXED_FIELDS`` are written to the dotted path that
      mapping gives them (folder sequence and record position as ints when
      numeric, ``lastModified`` / ``importedAt`` as ISO dates when parseable);
    * columns listed in ``META_FIELDS``, and any other column that does not
      start with ``Field<N>``, go to ``_meta`` when non-empty;
    * ``Field<N>Label`` / ``Field<N>Value`` pairs become entries of
      ``fields``, keyed by the label text.

    Pairs are discovered from the columns actually present, so a gap in the
    numbering does not cut off the fields that follow it.

    Args:
        row: One record as produced by ``csv.DictReader``.
        source_file: Name of the CSV file, stored as ``sourceFile``.

    Returns:
        The document; ``_meta`` is included only when it is non-empty.
    """
    doc: dict = {}
    meta: dict = {}
    # Pair index N -> {"Label": column name, "Value": column name}
    pairs: dict[int, dict[str, str]] = {}

    for col, raw in row.items():
        value = raw.strip() if raw else ""

        # Fixed columns
        if col in FIXED_FIELDS:
            path = FIXED_FIELDS[col]
            if path in ("form.folderSeq", "form.recordPosition"):
                try:
                    value = int(value)
                except (ValueError, TypeError):
                    pass  # keep the raw text when it is not a number
            elif path in ("lastModified", "importedAt"):
                value = parse_date(value) or value
            set_nested(doc, path, value)
            continue

        # Meta columns
        if col in META_FIELDS:
            if value:
                meta[col] = value
            continue

        # FieldNLabel / FieldNValue are only registered here and resolved
        # below, once both halves of every pair are known.
        pair = re.match(r"^Field(\d+)(Value|Label)$", col)
        if pair:
            pairs.setdefault(int(pair.group(1)), {})[pair.group(2)] = col
            continue

        # Remaining unknown columns also go to meta; other Field<N>* columns
        # are dropped.
        if not re.match(r"^Field\d+", col) and value:
            meta[col] = value

    fields: dict = {}
    for index in sorted(pairs):
        label_col = pairs[index].get("Label")
        value_col = pairs[index].get("Value")
        if label_col is None or value_col is None:
            continue  # half a pair cannot yield a labelled value
        label = (row[label_col] or "").strip()
        value = (row[value_col] or "").strip()
        if not (label and value):
            continue
        if label in INT_FIELDS:
            try:
                fields[label] = int(value)
            except ValueError:
                fields[label] = value
        else:
            # Store dates in ISO form, everything else verbatim.
            fields[label] = parse_date(value) or value

    doc["fields"] = fields
    doc["sourceFile"] = source_file
    if meta:
        doc["_meta"] = meta
    return doc
def ensure_indexes(collection) -> None:
    """Create the indexes of a Data Listing collection.

    ``form.recordId`` gets a unique, sparse index; the remaining fields get
    plain single-field ascending indexes.
    """
    collection.create_index([("form.recordId", ASCENDING)], unique=True, sparse=True)
    for field in ("subject.id", "site.id", "study", "lastModified"):
        collection.create_index([(field, ASCENDING)])
def import_file(
    csv_path: str,
    collection,
    snapshot_col=None,
    snapshot_date: str | None = None,
) -> tuple[int, int, int]:
    """Import one CSV file into ``collection``.

    QueryDetails files (detected from the header) are upserted by
    ``queryId``. Other files are upserted by ``form.recordId``, or inserted
    as new documents when the row has no record id.

    Args:
        csv_path: Path of the CSV file to read.
        collection: Target collection.
        snapshot_col: Optional snapshot collection. When given together with
            ``snapshot_date``, each QueryDetails row is also upserted there
            under (queryId, snapshotDate).
        snapshot_date: Snapshot date stored with each snapshot document.

    Returns:
        ``(inserted, updated, errors)``. A row that fails is logged and
        counted in ``errors``; the import then continues with the next row.
    """
    inserted = updated = errors = 0
    source_file = Path(csv_path).name

    with open(csv_path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f, delimiter=",", quotechar='"')
        query_mode = is_query_details(reader.fieldnames or [])

        # start=2: line 1 of the file is the header.
        for line_no, row in enumerate(reader, start=2):
            try:
                if query_mode:
                    doc = map_query_row(row, source_file)
                    query_id = doc["queryId"]
                    if not query_id:
                        # An empty id would make every such row overwrite
                        # the same {"queryId": ""} document.
                        raise ValueError("chybí queryId")
                    upsert_key = {"queryId": query_id}

                    # Snapshot: upsert on (queryId, snapshotDate)
                    if snapshot_col is not None and snapshot_date:
                        snapshot_col.update_one(
                            {"queryId": query_id, "snapshotDate": snapshot_date},
                            {"$set": {**doc, "snapshotDate": snapshot_date}},
                            upsert=True,
                        )
                else:
                    doc = map_row(row, source_file)
                    record_id = doc.get("form", {}).get("recordId")
                    upsert_key = {"form.recordId": record_id} if record_id else None

                if upsert_key:
                    result = collection.update_one(
                        upsert_key,
                        {"$set": doc},
                        upsert=True,
                    )
                    if result.upserted_id:
                        inserted += 1
                    else:
                        updated += 1
                else:
                    # No key to match on: always stored as a new document.
                    collection.insert_one(doc)
                    inserted += 1

            except PyMongoError as e:
                errors += 1
                log.error("Řádek %d v %s: MongoDB chyba: %s", line_no, csv_path, e)
            except Exception as e:
                # Per-row boundary: one bad row must not abort the file.
                errors += 1
                log.error("Řádek %d v %s: %s", line_no, csv_path, e)

    return inserted, updated, errors
def main() -> None:
    """Command-line entry point: import EDC CSV reports into MongoDB.

    Expands the file arguments (glob patterns included), connects to the
    MongoDB given by ``--host`` and imports every existing file into the
    database given by ``--db``. QueryDetails files go to ``queries`` plus a
    dated copy in ``queries_snapshots``; other files go to a collection
    named after the file. Exits with status 1 when no file is given or the
    server cannot be reached. The client is closed on every exit path.
    """
    parser = argparse.ArgumentParser(description="Import EDC CSV reportů do MongoDB")
    parser.add_argument("files", nargs="+", help="CSV soubory nebo glob vzor")
    parser.add_argument("--host", default="mongodb://192.168.1.76:27017", help="MongoDB URI")
    parser.add_argument("--db", default="edc", help="Název databáze")
    args = parser.parse_args()

    # Expand glob patterns ourselves (needed on Windows, where the shell
    # does not). A pattern that matches nothing is kept as a literal path.
    paths: list[str] = []
    for pattern in args.files:
        expanded = glob.glob(pattern)
        paths.extend(expanded if expanded else [pattern])

    if not paths:
        log.error("Žádné soubory nenalezeny.")
        sys.exit(1)

    client = MongoClient(args.host, serverSelectionTimeoutMS=5000)
    try:
        try:
            client.admin.command("ping")
        except Exception as e:
            log.error("Nelze se připojit k MongoDB (%s): %s", args.host, e)
            sys.exit(1)

        db = client[args.db]
        total_inserted = total_updated = total_errors = 0

        for csv_path in paths:
            if not os.path.isfile(csv_path):
                log.warning("Soubor neexistuje, přeskakuji: %s", csv_path)
                continue

            # Detect the file type from its header, then pick the
            # collection and indexes to use.
            with open(csv_path, encoding="utf-8", newline="") as f:
                fieldnames = csv.DictReader(f).fieldnames or []

            if is_query_details(fieldnames):
                col_name = "queries"
                collection = db[col_name]
                ensure_query_indexes(collection)
                snapshot_col = db["queries_snapshots"]
                ensure_snapshot_indexes(snapshot_col)
                snapshot_date = extract_snapshot_date(csv_path)
                log.info("Importuji: %s → %s.%s + queries_snapshots [%s]",
                         csv_path, args.db, col_name, snapshot_date)
            else:
                col_name = collection_name_from_filename(csv_path)
                collection = db[col_name]
                ensure_indexes(collection)
                snapshot_col = None
                snapshot_date = None
                log.info("Importuji: %s → %s.%s", csv_path, args.db, col_name)

            inserted, updated, errors = import_file(
                csv_path, collection, snapshot_col, snapshot_date
            )
            total_inserted += inserted
            total_updated += updated
            total_errors += errors
            log.info(" nové: %d aktualizované: %d chyby: %d", inserted, updated, errors)

        log.info("=" * 60)
        log.info("Celkem — nové: %d aktualizované: %d chyby: %d",
                 total_inserted, total_updated, total_errors)
    finally:
        # Runs on normal completion, on sys.exit and on unexpected errors.
        client.close()


if __name__ == "__main__":
    main()