"""Download IWRS notification PDFs and JSON summaries for a single subject.

The script logs into the 4G Clinical portal with Playwright, opens the
patient detail report, selects one subject and walks every notification in
the captured ``table_1`` report response. Notifications whose ``pk`` is not
yet present in the ``iwrs_notifications`` table are saved under
``IncomingSourceReportsDetails/<study>/`` as a PDF plus a JSON side-car.
"""

import datetime
import json
import os
import re
from contextlib import closing
from urllib.parse import quote, urlencode

import mysql.connector
from playwright.sync_api import sync_playwright

import db_config


def get_existing_pks(study: str) -> set:
    """Return the set of notification pks already stored in the DB for *study*.

    This is a deliberate best-effort lookup: on any failure (connection,
    query, or closing the handles) a warning is printed and an empty set is
    returned, so the caller falls back to downloading everything.
    """
    try:
        # ``closing`` guarantees cursor and connection are released even when
        # the query itself raises.
        with closing(
            mysql.connector.connect(
                host=db_config.DB_HOST,
                port=db_config.DB_PORT,
                user=db_config.DB_USER,
                password=db_config.DB_PASSWORD,
                database=db_config.DB_NAME,
            )
        ) as conn, closing(conn.cursor()) as cursor:
            cursor.execute(
                "SELECT pk FROM iwrs_notifications WHERE study = %s", (study,)
            )
            return {row[0] for row in cursor.fetchall()}
    except Exception as e:
        print(f" UPOZORNĚNÍ: nelze načíst existující pk z DB ({e}), stahuji vše")
        return set()


BASE_URL = "https://janssen.4gclinical.com"
# SECURITY(review): credentials are committed in source. They can now be
# overridden through the IWRS_EMAIL / IWRS_PASSWORD environment variables;
# the literals remain only as a backward-compatible fallback and should be
# rotated and removed from the repository.
EMAIL = os.environ.get("IWRS_EMAIL", "vbuzalka@its.jnj.com")
PASSWORD = os.environ.get("IWRS_PASSWORD", "Vlado123++-+")
STUDY = "77242113UCO3001"
SUBJECT = "CZ100222003"

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")

_BR_TAG_RE = re.compile(r"<br\s*/?>", re.IGNORECASE)
_ANY_TAG_RE = re.compile(r"<[^>]+>")
_BLANK_RUN_RE = re.compile(r"\n{3,}")
_UNSAFE_FILENAME_CHARS_RE = re.compile(r'[\\/*?:"<>|]')

# Fetches the list of app instances from the dispatch API using the JWT.
_APP_INSTANCES_JS = """async (jwt) => {
    const res = await fetch('/_/api/dispatch/app_instances/', {
        headers: { 'Authorization': `Bearer ${jwt}` }
    });
    return res.json();
}"""


def strip_html(html: str) -> str:
    """Convert an HTML fragment to plain text.

    ``<br>`` tags become newlines, all other tags are dropped, runs of three
    or more newlines are collapsed to two, and the result is stripped.
    """
    # The pattern here was previously empty, which put a newline between
    # every character of the input.
    text = _BR_TAG_RE.sub("\n", html)
    text = _ANY_TAG_RE.sub("", text)
    text = _BLANK_RUN_RE.sub("\n\n", text)
    return text.strip()


def _safe_label(label: str) -> str:
    """Drop characters in the unsafe-filename set and turn spaces into underscores."""
    return _UNSAFE_FILENAME_CHARS_RE.sub("", label).replace(" ", "_")


def _login_and_open_report(page, study: str) -> None:
    """Log in, select *study* and navigate to the patient detail report."""
    print("Přihlašuji se...")
    page.goto(BASE_URL)
    page.wait_for_load_state("networkidle")
    page.get_by_label("Email *").fill(EMAIL)
    page.get_by_label("Password *").fill(PASSWORD)
    page.locator("#login__submit").click()
    page.wait_for_load_state("networkidle")

    page.get_by_label("Study *").click()
    page.get_by_role("option", name=study).click()
    page.get_by_role("button", name="SELECT").click()
    page.wait_for_load_state("networkidle")

    page.goto(f"{BASE_URL}/report/patient_detail_report")
    page.wait_for_load_state("networkidle", timeout=60000)


def _read_jwt_and_api_base(page, study: str):
    """Return ``(jwt, api_base)`` for *study* from the logged-in page.

    Raises:
        ValueError: if no JWT is stored in localStorage, or no app instance
            has a label containing *study*.
    """
    jwt = page.evaluate("localStorage.getItem('JWT.access')")
    if not jwt:
        # Without this guard the slice below fails with an opaque TypeError.
        raise ValueError("JWT nenalezen v localStorage (přihlášení selhalo?)")
    print(f"JWT: {jwt[:30]}...")

    instances = page.evaluate(_APP_INSTANCES_JS, jwt)
    instance = next(
        (i for i in instances if study in (i.get("label") or "")), None
    )
    if not instance:
        raise ValueError(f"Instance pro {study} nenalezena")
    api_base = instance["api_base_url"]
    print(f"API base: {api_base}")
    return jwt, api_base


def _load_subject_report(page, subject: str) -> dict:
    """Select *subject* in the report UI and return the parsed table_1 JSON."""
    print(f"Vybírám subjekt {subject}...")
    input_field = page.locator(
        'input[placeholder="search"], input[type="text"]'
    ).first
    input_field.click()
    input_field.fill(subject)
    # Pause before clicking the first suggestion.
    page.wait_for_timeout(1000)

    with page.expect_response(
        lambda r: "report_data" in r.url and "table_1" in r.url,
        timeout=60000,
    ) as resp_info:
        page.locator("mat-option").first.dispatch_event("click")
    return resp_info.value.json()


def _save_notification(page, jwt, api_base, out_dir, safe_label, record) -> None:
    """Download one notification PDF and write it with a JSON side-car.

    *record* is the dict that is dumped to the JSON file; its ``pk``,
    ``title``, ``actual_date`` and ``study`` entries also drive the request
    and the file name. On a non-OK HTTP response only an error line is
    printed and nothing is written.
    """
    pk = record["pk"]
    stem = os.path.join(out_dir, f"{record['actual_date']}_{safe_label}")
    pdf_filename = f"{stem}.pdf"
    if os.path.exists(pdf_filename):
        # Same date + label already on disk: disambiguate with the pk.
        pdf_filename = f"{stem}_pk{pk}.pdf"

    # Percent-encode the query so titles containing spaces, '&' or '#'
    # cannot corrupt the URL.
    query = urlencode(
        {"pk": pk, "title": record["title"], "html": "true"}, quote_via=quote
    )
    pdf_url = f"{BASE_URL}{api_base}/api/v1/meta_api/pdfnotification?{query}"
    pdf_resp = page.request.get(
        pdf_url,
        headers={
            "Authorization": f"Bearer {jwt}",
            "lang": "en",
            "prancer_study": record["study"],
            "Accept": "application/json, text/plain, */*",
        },
    )
    if not pdf_resp.ok:
        print(f" → PDF chyba: {pdf_resp.status}")
        return

    with open(pdf_filename, "wb") as f:
        f.write(pdf_resp.body())
    print(f" → PDF uloženo: {os.path.basename(pdf_filename)}")

    # splitext swaps only the extension; str.replace would also rewrite any
    # ".pdf" occurring earlier in the path or label.
    json_filename = os.path.splitext(pdf_filename)[0] + ".json"
    with open(json_filename, "w", encoding="utf-8") as f:
        json.dump(record, f, ensure_ascii=False, indent=2)
    print(f" → JSON uloženo: {os.path.basename(json_filename)}")


def main(study: str = STUDY, subject: str = SUBJECT) -> None:
    """Print every notification of *subject* and download the new ones.

    Notifications whose pk is returned by ``get_existing_pks`` are printed
    but not downloaded. ``study`` and ``subject`` default to the module
    constants, so a bare ``main()`` behaves as before.
    """
    existing_pks = get_existing_pks(study)
    print(f"V DB již existuje {len(existing_pks)} notifikací pro {study}")

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, args=["--start-maximized"])
        try:
            context = browser.new_context(no_viewport=True)
            page = context.new_page()

            _login_and_open_report(page, study)
            jwt, api_base = _read_jwt_and_api_base(page, study)
            data = _load_subject_report(page, subject)

            out_dir = os.path.join(DETAILS_DIR, study)
            os.makedirs(out_dir, exist_ok=True)

            print(f"\n{'='*60}")
            print(f"Subjekt: {subject} | Studie: {study}")
            print(f"{'='*60}")

            count = 0
            for row in data.get("data", []):
                event = row.get("event_event_id")
                # ``or`` also covers an explicit null in the payload.
                actual_date = row.get("actual_date_raw") or "0000-00-00"
                for notif in row.get("notification") or []:
                    item = notif.get("item") or {}
                    pk = item.get("pk")
                    title = item.get("et_title")
                    label = (notif.get("label") or title or "").strip()
                    safe_label = _safe_label(label)
                    text = strip_html(item.get("body") or "")

                    count += 1
                    print(
                        f"\n--- Notifikace #{count}: {safe_label} (pk={pk}) | event: {event} ---"
                    )
                    print(text)

                    # NOTE(review): assumes DB pks and API pks share a type
                    # (e.g. both int) - confirm, otherwise nothing is skipped.
                    if pk in existing_pks:
                        print(f" → pk={pk} již v DB, přeskakuji")
                        continue

                    record = {
                        "pk": pk,
                        "title": title,
                        "label": label,
                        "event": event,
                        "actual_date": actual_date,
                        "subject": subject,
                        "study": study,
                        "text": text,
                    }
                    _save_notification(
                        page, jwt, api_base, out_dir, safe_label, record
                    )
                    # Short pause between consecutive PDF requests.
                    page.wait_for_timeout(300)

            if count == 0:
                print("Žádné notifikace nalezeny.")
            else:
                print(f"\n{'='*60}")
                print(f"Celkem notifikací: {count}")
        finally:
            # Close the browser even when login or scraping fails midway.
            browser.close()


if __name__ == "__main__":
    main()