Merge remote-tracking branch 'origin/master'

# Conflicts:
#	Medidata/auth.json
This commit is contained in:
2026-05-25 12:42:44 +02:00
76 changed files with 6930 additions and 0 deletions
+93
View File
@@ -0,0 +1,93 @@
import sqlite3
import os
from pathlib import Path
# UNC path of the network folder that main() searches for SQLite ``*.db`` files.
DB_DIR = r"\\tower\JNJEMAILS\db"
# UNC path of the network folder whose ``.msg`` files main() counts.
MSG_DIR = r"\\tower\JNJEMAILS"
def find_latest_db(db_dir):
    """Return the most recently modified ``*.db`` entry directly inside *db_dir*.

    Args:
        db_dir: Directory to search (string or path-like).

    Returns:
        The ``Path`` of the entry with the greatest modification time.

    Raises:
        FileNotFoundError: if no ``*.db`` entry is found.
    """
    newest = None
    newest_mtime = None
    for candidate in Path(db_dir).glob("*.db"):
        mtime = candidate.stat().st_mtime
        # ``>=`` lets the later-listed entry win on equal timestamps, which is
        # what a stable ascending sort followed by ``[-1]`` selects.
        if newest is None or mtime >= newest_mtime:
            newest, newest_mtime = candidate, mtime
    if newest is None:
        raise FileNotFoundError(f"Žádná .db databáze v {db_dir}")
    return newest
def count_msg_files(msg_dir):
    """Count the direct children of *msg_dir* whose suffix is ``.msg``.

    The suffix comparison is case-insensitive; sub-directories are not
    descended into.
    """
    matches = 0
    for entry in Path(msg_dir).iterdir():
        if entry.suffix.lower() == ".msg":
            matches += 1
    return matches
def stats(db_path):
    """Collect summary statistics from the ``messages`` table of a SQLite file.

    Args:
        db_path: Path of the SQLite database to open.

    Returns:
        A 6-tuple ``(total, date_range, top_senders, by_folder, by_source,
        by_month)``:

        * ``total`` - number of rows in ``messages``.
        * ``date_range`` - ``(MIN(received_at), MAX(received_at))`` over rows
          with a non-NULL ``received_at``.
        * ``top_senders`` - up to 10 ``(sender, count)`` rows, largest first.
        * ``by_folder`` / ``by_source`` - ``(value, count)`` rows, largest first.
        * ``by_month`` - ``(first 7 characters of received_at, count)`` rows in
          ascending order of that prefix.

    Raises:
        sqlite3.Error: if the database cannot be queried (for example the
            ``messages`` table is missing).
    """
    con = sqlite3.connect(db_path)
    # try/finally guarantees the connection is released even when one of the
    # queries fails; previously an exception left it open.
    try:
        cur = con.cursor()
        total = cur.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
        date_range = cur.execute(
            "SELECT MIN(received_at), MAX(received_at) FROM messages WHERE received_at IS NOT NULL"
        ).fetchone()
        top_senders = cur.execute(
            "SELECT sender, COUNT(*) AS n FROM messages GROUP BY sender ORDER BY n DESC LIMIT 10"
        ).fetchall()
        by_folder = cur.execute(
            "SELECT folder, COUNT(*) AS n FROM messages GROUP BY folder ORDER BY n DESC"
        ).fetchall()
        by_source = cur.execute(
            "SELECT source, COUNT(*) AS n FROM messages GROUP BY source ORDER BY n DESC"
        ).fetchall()
        by_month = cur.execute(
            """SELECT SUBSTR(received_at, 1, 7) AS month, COUNT(*) AS n
FROM messages WHERE received_at IS NOT NULL
GROUP BY month ORDER BY month"""
        ).fetchall()
    finally:
        con.close()
    return total, date_range, top_senders, by_folder, by_source, by_month
def main():
    """Print a text report about the newest database found in ``DB_DIR``.

    The report lists the database name and size, the number of ``.msg`` files
    in ``MSG_DIR`` and the figures returned by ``stats()``.
    """
    db_path = find_latest_db(DB_DIR)
    print(f"Databáze: {db_path.name}")
    size_mb = db_path.stat().st_size / 1024 / 1024
    print(f"Velikost: {size_mb:.1f} MB")
    msg_count = count_msg_files(MSG_DIR)
    print(f".msg souborů ve složce: {msg_count:,}")

    total, date_range, top_senders, by_folder, by_source, by_month = stats(db_path)
    rule = "-" * 50
    print(f"\n{rule}")
    print(f" Emailu v databazi: {total:,}")
    oldest, newest = date_range
    if oldest:
        print(f" Nejstarsi: {oldest}")
        print(f" Nejnovejsi: {newest}")

    # Folder and source breakdowns share one layout; only the heading and the
    # placeholder shown for empty values differ.
    for heading, rows, placeholder in (
        ("\n Slozky:", by_folder, "(bez slozky)"),
        ("\n Zdroje (source):", by_source, "(prazdny)"),
    ):
        if not rows:
            continue
        print(heading)
        for key, n in rows:
            print(f" {key or placeholder:<35} {n:>6,}")

    if by_month:
        print("\n Emaily po mesicich:")
        for month, n in by_month:
            # One '#' per 20 messages, capped at 40 characters.
            bar = "#" * min(n // 20, 40)
            print(f" {month} {bar:<40} {n:>5,}")

    if top_senders:
        print("\n Top 10 odesilatelU:")
        for sender, n in top_senders:
            shown = (sender or '(neznamy)')[:50]
            print(f" {shown:<52} {n:>5,}")
    print(rule)


if __name__ == "__main__":
    main()
+440
View File
@@ -0,0 +1,440 @@
"""
Download Data Listing reports (ReportID=92) for the UCO3001 study.

Usage:
    download_datalisting_reports_3001("Trial Disposition (Completion / Discontinuation)")
    download_datalisting_reports_3001("Trial Disposition (Completion / Discontinuation)", country="CZE")
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
import tkinter as tk
from tkinter import simpledialog
# Load environment variables from a ``.env`` file located next to this script.
load_dotenv(Path(__file__).parent / ".env")
# NOTE(review): the fallback user name is a personal account — consider
# requiring IMEDIDATA_USERNAME to be set instead.
USERNAME = os.getenv("IMEDIDATA_USERNAME", "vladimir.buzalka")
PASSWORD = os.getenv("IMEDIDATA_PASSWORD", "")
# Folder the downloaded CSV files are written to.
DOWNLOAD_DIR = Path(__file__).parent / "downloads"
# File holding the saved browser storage state that is reused between runs.
AUTH_FILE = Path(__file__).parent / "auth.json"
# A saved session file older than this many days is not reused.
AUTH_MAX_AGE_DAYS = 7
LOGIN_URL = "https://login.imedidata.com/login"
SELECT_ROLE_URL = (
"https://jnjja.mdsol.com/MedidataRave/SelectRole.aspx"
"?client_division_uuid=e5de55d5-a414-4bd1-9abe-18e96fd5475d"
"&study_group_uuid=b0793ca6-33ec-44e8-883b-6fc1a4b671c4"
"&studygroup_id=107981"
)
STUDY_SEARCH = "77242113UCO3001" # matched as a substring of the study name
REPORT_ID = 92 # _EDC Std Rpt - Data Listing (Data Stream)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def auth_valid():
    """Return True when ``AUTH_FILE`` exists and is younger than ``AUTH_MAX_AGE_DAYS``.

    Age is measured from the file's modification time.
    """
    if AUTH_FILE.exists():
        now = datetime.now()
        saved_at = datetime.fromtimestamp(AUTH_FILE.stat().st_mtime)
        return now - saved_at < timedelta(days=AUTH_MAX_AGE_DAYS)
    return False
def wait_load(page, extra_ms=1000):
    """Wait for the page's ``load`` state, then pause for *extra_ms* milliseconds.

    Args:
        page: Playwright page to wait on.
        extra_ms: Additional fixed pause applied after the load wait.

    A timeout of the 20-second load wait is ignored; the extra pause is
    applied either way.
    """
    load_timeout_ms = 20_000
    try:
        page.wait_for_load_state("load", timeout=load_timeout_ms)
    except PWTimeout:
        # Best effort: carry on even if "load" was never reported in time.
        pass
    page.wait_for_timeout(extra_ms)
def dbg(page, label):
    """Print the page's current URL, prefixed with *label* in square brackets."""
    current_url = page.url
    print(f"[{label}] URL: {current_url}")
# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------
def _ask_otp_popup():
    """Ask for the OKTA one-time code in a topmost dialog and return it stripped.

    Returns:
        The entered text without surrounding whitespace, or an empty string
        when the dialog returned nothing.
    """
    window = tk.Tk()
    window.withdraw()  # hide the bare root window; only the dialog is shown
    window.lift()
    window.attributes("-topmost", True)
    answer = simpledialog.askstring(
        "OKTA MFA", "Zadej OTP kód z OKTA (6 číslic):", parent=window
    )
    window.destroy()
    if not answer:
        return ""
    return answer.strip()
def do_login(page, context):
    """Sign in to iMedidata, pass the OKTA MFA step if shown, and save the session.

    Args:
        page: Playwright page used for the login flow.
        context: Browser context whose storage state is written to ``AUTH_FILE``.

    Exits the process with status 1 when no OTP is supplied or when the
    browser does not end up on ``home.imedidata.com``.
    """
    user_field = 'input[name="session[username]"]'
    pass_field = 'input[name="session[password]"]'

    print("Přihlašuji se do iMedidata...")
    page.goto(LOGIN_URL)
    wait_load(page, 500)
    page.wait_for_selector(user_field, timeout=10_000)
    page.fill(user_field, USERNAME)
    page.fill(pass_field, PASSWORD)
    page.click('button[type="submit"]')
    wait_load(page, 2000)
    dbg(page, "after-signin")

    if _okta_mfa_present(page):
        print("\n*** OKTA MFA vyžadována! ***")
        code = _ask_otp_popup()
        if not code:
            print("CHYBA: OTP nebylo zadáno.")
            sys.exit(1)
        _fill_otp(page, code)
        wait_load(page, 3000)

    try:
        page.wait_for_url("**/home.imedidata.com**", timeout=30_000)
    except PWTimeout:
        # Not fatal by itself; the URL check below decides success.
        dbg(page, "wait-home-timeout")

    reached_home = "home.imedidata.com" in page.url
    if not reached_home:
        print("CHYBA: Přihlášení se nezdařilo!")
        sys.exit(1)

    context.storage_state(path=str(AUTH_FILE))
    print("Session uložena do auth.json")
def _okta_mfa_present(page):
if "okta" in page.url.lower():
return True
for sel in ['input[name="answer"]', 'input[name*="otp"]',
'input[name*="code"]', 'input[placeholder*="code" i]']:
if page.query_selector(sel):
return True
return False
def _fill_otp(page, otp):
for sel in ['input[name="answer"]', 'input[name*="otp"]',
'input[name*="code"]', 'input[type="tel"]', 'input[placeholder*="code" i]']:
el = page.query_selector(sel)
if el:
el.fill(otp)
page.keyboard.press("Enter")
return
page.keyboard.type(otp)
page.keyboard.press("Enter")
# ---------------------------------------------------------------------------
# Navigace
# ---------------------------------------------------------------------------
def go_to_select_role(page):
    """Open ``SELECT_ROLE_URL`` and report whether the session appears logged in.

    Returns:
        True when the resulting URL contains neither "login" nor "okta"
        (case-insensitive), False otherwise.
    """
    print("Navigace na SelectRole...")
    try:
        page.goto(SELECT_ROLE_URL)
    except Exception:
        # Navigation errors are ignored here; the URL inspected below decides.
        pass
    wait_load(page, 1500)
    dbg(page, "select-role")
    landed_on = page.url.lower()
    return not ("login" in landed_on or "okta" in landed_on)
def select_role(page):
    """Choose the "Site Manager" role in the role drop-down(s) and continue.

    Returns early without doing anything when no ``<select>`` element shows up
    within 10 seconds. In every ``<select>`` the first option whose text
    contains "site manager" (case-insensitive) is selected; afterwards the
    first continue/submit button found is clicked.
    """
    print("Vybírám roli Site Manager...")
    try:
        page.wait_for_selector("select", timeout=10_000)
    except PWTimeout:
        return

    for dropdown in page.query_selector_all("select"):
        for option in dropdown.query_selector_all("option"):
            label = (option.inner_text() or "").strip()
            if "site manager" not in label.lower():
                continue
            dropdown.select_option(label=label)
            print(f" Vybráno: '{label}'")
            break

    continue_buttons = (
        'input[value="Continue"]',
        'input[type="submit"]',
        'button:has-text("Continue")',
        'button[type="submit"]',
    )
    for css in continue_buttons:
        button = page.query_selector(css)
        if button:
            button.click()
            break

    wait_load(page, 2000)
    dbg(page, "after-role")
def navigate_to_reporter(page):
    """Click the "Reporter" link once it appears (15 s limit) and wait for the page."""
    reporter_link = 'a:has-text("Reporter")'
    print("Klikám na Reporter...")
    page.wait_for_selector(reporter_link, timeout=15_000)
    page.click(reporter_link)
    wait_load(page, 1500)
    dbg(page, "reporter")
def open_report(page):
    """Open the prompts page of report ``REPORT_ID`` through its link."""
    print(f"Otevírám report ID={REPORT_ID} (Data Listing - Data Stream)...")
    report_link = f'a[href="PromptsPage.aspx?ReportID={REPORT_ID}"]'
    page.wait_for_selector(report_link, timeout=15_000)
    page.click(report_link)
    wait_load(page, 2000)
    dbg(page, "report-opened")
# ---------------------------------------------------------------------------
# Parametry reportu
# ---------------------------------------------------------------------------
def set_study_param(page):
    """Expand the Study panel and tick the study whose row text contains ``STUDY_SEARCH``.

    The comparison is case-insensitive. When no row matches, a warning is
    printed and the checkbox with index 0 is ticked as a fallback.
    """
    checkbox_css = 'input[id^="PromptsBox_st_FrontEndCBList_"]'
    # Reads the text of the table row (or cell / parent) that holds the checkbox.
    read_row_text = """id => {
const el = document.getElementById(id);
if (!el) return '';
const row = el.closest('tr') || el.closest('td') || el.parentElement;
return row ? row.innerText : '';
}"""

    print(f" Parametr Study: hledám '{STUDY_SEARCH}'...")
    page.click('#PromptsBox_st_ShowHideBtn')
    page.wait_for_timeout(1500)
    page.wait_for_selector(checkbox_css, timeout=10_000)

    wanted = STUDY_SEARCH.upper()
    for box in page.query_selector_all(checkbox_css):
        box_id = box.get_attribute("id")
        row_text = page.evaluate(read_row_text, box_id)
        print(f" [{box_id}] label: {row_text.strip()[:80]}")
        if wanted not in row_text.upper():
            continue
        target = page.locator(f"#{box_id}")
        if not target.is_checked():
            target.check()
        print(f" Nalezeno a zaškrtnuto: '{row_text.strip()}'")
        break
    else:
        # Fallback: no row matched, so try the first checkbox and warn.
        print(f" VAROVÁNÍ: Studie '{STUDY_SEARCH}' nenalezena! Zkouším index 0...")
        first_box = page.locator('#PromptsBox_st_FrontEndCBList_0')
        if not first_box.is_checked():
            first_box.check()

    wait_load(page, 3000)
    dbg(page, "after-study")
def set_site_group_param(page, country: str):
    """Expand the Site Group panel, pick *country* and tick the sub-site-groups box.

    Args:
        page: Playwright page showing the report prompts.
        country: Label of the site group option to select, e.g. "CZE".
    """
    toggle_css = '#PromptsBox_sg_ShowHideBtn'
    list_css = '#PromptsBox_sg_List'

    print(f" Parametr Site Group: {country}")
    page.click(toggle_css)
    page.wait_for_timeout(1500)
    page.wait_for_selector(list_css, timeout=10_000)
    page.select_option(list_css, label=country)
    # Fire a bubbling "change" event on the list explicitly.
    page.evaluate(
        "document.querySelector('#PromptsBox_sg_List')"
        ".dispatchEvent(new Event('change', {bubbles:true}))"
    )
    wait_load(page, 2000)

    include_sub = page.locator('#PromptsBox_sg_CheckBox')
    if not include_sub.is_checked():
        include_sub.check()
    page.evaluate(
        "document.querySelector('#PromptsBox_sg_CheckBox')"
        ".dispatchEvent(new Event('change', {bubbles:true}))"
    )
    wait_load(page, 2000)

    # Closing the panel confirms the choice and triggers the postback for Form.
    page.click(toggle_css)
    wait_load(page, 3000)
    dbg(page, "after-site-group")
def set_form_param(page, form_name: str):
    """Search the Form panel for *form_name* and tick the first checkbox shown.

    Args:
        page: Playwright page showing the report prompts.
        form_name: Text typed into the panel's search box.

    Prints a warning and returns without ticking anything when no checkbox
    becomes visible within 8 seconds after the search.
    """
    mode_btn = '#PromptsBox_fm2_PageModeBtn'

    print(f" Parametr Form: {form_name}")
    panel_display = page.locator('#PromptsBox_fm2_div').evaluate('el => el.style.display')
    if panel_display == 'none':
        # Panel is collapsed: expand it first.
        page.click('#PromptsBox_fm2_ShowHideBtn')
        page.wait_for_timeout(2000)

    if page.locator(mode_btn).is_visible():
        page.click(mode_btn)
        page.wait_for_timeout(1000)
        page.click(mode_btn)
        page.wait_for_timeout(2000)

    search_box = page.locator('#PromptsBox_fm2_SearchTxt')
    search_box.wait_for(state='visible', timeout=10_000)
    search_box.click()
    search_box.fill(form_name)
    page.wait_for_timeout(2000)
    search_box.press('Enter')
    page.wait_for_timeout(2000)

    first_hit = page.locator('input[id^="PromptsBox_fm2_FrontEndCBList_"]').first
    try:
        first_hit.wait_for(state='visible', timeout=8_000)
    except PWTimeout:
        print(f" VAROVÁNÍ: '{form_name}' nenalezen!")
        return

    if not first_hit.is_checked():
        first_hit.click()
    print(f" '{form_name}' zaškrtnuto")
    page.wait_for_timeout(2000)
# ---------------------------------------------------------------------------
# Submit a download
# ---------------------------------------------------------------------------
def submit_and_download(page, context, form_name: str, country: str | None):
    """Submit the report, wait for it to be generated and save it as a CSV file.

    Args:
        page: Playwright page showing the filled-in report prompts.
        context: Browser context; the report opens in a new page of it.
        form_name: Form name, used to build the output file name.
        country: Site group code for the file name; "ALL" is used when falsy.

    Returns:
        Path of the saved file inside ``DOWNLOAD_DIR``.
    """
    download_btn = 'input[value="Download File"], button:has-text("Download File")'

    print("Odesílám report...")
    with context.expect_page() as new_page_info:
        page.locator('input[value="Submit Report"], button:has-text("Submit Report")').first.click()
    new_page = new_page_info.value
    new_page.wait_for_url(lambda url: url != 'about:blank', timeout=30_000)

    print(" Čekám na vygenerování reportu (max 5 min)...")
    new_page.wait_for_selector(download_btn, timeout=300_000)
    new_page.wait_for_timeout(500)
    dbg(new_page, "download-window")

    # Use the first frame that holds the export controls; default to the main frame.
    target_frame = new_page.main_frame
    for frame in new_page.frames:
        if frame.query_selector('select') or frame.query_selector('input[value="Download File"]'):
            target_frame = frame
            break

    # File type: .csv (application/vnd.ms-excel)
    for sel in target_frame.query_selector_all('select'):
        for opt in sel.query_selector_all('option'):
            val = opt.get_attribute('value') or ''
            if 'vnd.ms-excel' in val:
                sel.select_option(value=val)
                print(" File type: .csv (application/vnd.ms-excel)")
                break

    # Export type: attachment
    for sel in target_frame.query_selector_all('select'):
        for opt in sel.query_selector_all('option'):
            val = opt.get_attribute('value') or ''
            if 'attachment' in val.lower():
                # Select the value that actually matched. The match above is
                # case-insensitive, so a hard-coded 'attachment' could name an
                # option value that does not exist (e.g. "Attachment").
                sel.select_option(value=val)
                break

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    country_slug = country if country else "ALL"
    # Strip spaces and parentheses and replace "/" so the name is file-system friendly.
    form_slug = form_name.replace(" ", "").replace("/", "-").replace("(", "").replace(")", "")
    filename = f"{timestamp}_EDC_UCO3001_{country_slug}_{form_slug}_DataListing.csv"
    output_path = DOWNLOAD_DIR / filename

    print("Stahuji CSV...")
    with new_page.expect_download(timeout=60_000) as dl_info:
        btn = target_frame.query_selector(download_btn)
        if btn:
            btn.click()
        else:
            new_page.locator(download_btn).first.click()
    dl_info.value.save_as(str(output_path))
    print(f"\nHotovo! Soubor uložen: {output_path}")

    try:
        new_page.close()
    except Exception:
        # Closing the report window is best effort; the file is already saved.
        pass
    return output_path
# ---------------------------------------------------------------------------
# Hlavní funkce
# ---------------------------------------------------------------------------
def download_datalisting_reports_3001(form_name: str, country: str | None = None):
    """
    Download one Data Listing report for the UCO3001 study.

    Args:
        form_name: Name of the form, e.g. "Trial Disposition (Completion / Discontinuation)".
        country: Site group code, e.g. "CZE". When None, no country filter is
            set (all countries).

    Returns:
        Path of the downloaded CSV file.

    Exits the process with status 1 when ``PASSWORD`` is empty.
    """
    if not PASSWORD:
        print("Chyba: nastav IMEDIDATA_PASSWORD v souboru .env")
        sys.exit(1)

    DOWNLOAD_DIR.mkdir(exist_ok=True)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)

        context_options = {"accept_downloads": True}
        has_saved_session = auth_valid()
        if has_saved_session:
            print("Načítám uloženou session (auth.json)...")
            context_options["storage_state"] = str(AUTH_FILE)
        context = browser.new_context(**context_options)
        page = context.new_page()

        if not go_to_select_role(page):
            # Redirected to a login page: drop the stored session and sign in again.
            if has_saved_session:
                print("Session expirovala, přihlašuji znovu...")
            AUTH_FILE.unlink(missing_ok=True)
            do_login(page, context)
            go_to_select_role(page)

        select_role(page)
        navigate_to_reporter(page)
        open_report(page)

        print("\nNastavuji parametry reportu...")
        set_study_param(page)
        if not country:
            print(" Parametr Site Group: přeskočen (všechny země)")
        else:
            set_site_group_param(page, country)

        print(f"\n=== Stahuji formulář: {form_name} ===")
        set_form_param(page, form_name)
        output = submit_and_download(page, context, form_name, country)

        browser.close()
        print("Prohlížeč zavřen.")
    return output
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Example invocations:
    #   python download_uco3001.py
    #   python download_uco3001.py CZE
    cli_args = sys.argv[1:]
    # The optional first argument is the site group code.
    country_arg = cli_args[0] if cli_args else None
    download_datalisting_reports_3001(
        form_name="Trial Disposition (Completion / Discontinuation)",
        country=country_arg,
    )
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+12
View File
@@ -0,0 +1,12 @@
from download_uco3001 import download_datalisting_reports_3001
# === Pick one line, uncomment it and run ===
# NOTE(review): two calls below are currently active, so both reports are
# downloaded one after the other — confirm this is intended.
# --- Trial Disposition ---
download_datalisting_reports_3001("Trial Disposition (Completion / Discontinuation)")
download_datalisting_reports_3001("Date of Visit")
# download_datalisting_reports_3001("Trial Disposition (Completion / Discontinuation)", country="CZE")
# --- Date of Visit ---
# download_datalisting_reports_3001("Date of Visit")
# download_datalisting_reports_3001("Date of Visit", country="CZE")