Files
janssen/IWRS/download_subject_details.py
2026-06-10 11:59:03 +02:00

278 lines
11 KiB
Python

from playwright.sync_api import sync_playwright
import os
import glob
import datetime
import re
import json
import sys
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common.mongo_writer import get_db
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")
# ────────────────────────────────────────────────────────────────────────────
def get_subjects(study, source_dir=None):
src = source_dir or INCOMING_DIR
pattern = os.path.join(src, f"* {study} Subject Summary Report*.xlsx")
files = sorted(
[f for f in glob.glob(pattern) if not os.path.basename(f).startswith("~$")],
key=os.path.getmtime,
reverse=True,
)
if not files:
raise FileNotFoundError(f"Nenalezen Subject Summary Report pro {study}")
today = datetime.date.today().strftime("%Y-%m-%d")
if not os.path.basename(files[0]).startswith(today):
raise FileNotFoundError(f"Dnešní Subject Summary Report pro {study} neexistuje — spusť nejdříve download_subject_summary.py")
path = files[0]
print(f" Čtu subjekty z: {os.path.basename(path)}")
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Subject" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
raise ValueError("Hlavičkový řádek nenalezen")
df = pd.read_excel(path, header=header_row)
return df["Subject"].dropna().astype(str).str.strip().tolist()
def strip_html(html):
text = re.sub(r"<br\s*/?>", "\n", html, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def get_existing_pks(study):
"""Načte už importované pk notifikací pro studii z Mongo."""
try:
db = get_db()
return {d["_id"] for d in db.iwrs_notifications.find(
{"study": study}, {"_id": 1}
)}
except Exception as e:
print(f" UPOZORNĚNÍ: nelze načíst pk z Mongo ({e}), stahuji vše")
return set()
def download_notifications_for_subject(page, study, subject, api_base, existing_pks, out_dir, table1_data, flat=False):
"""Stáhne notifikace pro subjekta z již zachycené table_1 response.
flat=True → název obsahuje study+subject (pro Incoming/ kde leží všechno pohromadě).
"""
new_count = 0
for row in table1_data.get("data", []):
for notif in (row.get("notification") or []):
item = notif.get("item", {})
pk = item.get("pk")
title = item.get("et_title")
if not pk or pk in existing_pks:
continue
label = (notif.get("label") or title or "").strip()
safe_label = re.sub(r'[\\/*?:"<>|]', "", label).replace(" ", "_")
body = item.get("body", "")
text = strip_html(body)
actual_date = row.get("actual_date_raw", "0000-00-00")
if flat:
stem = f"{actual_date}_{study}_{subject}_{safe_label}"
else:
stem = f"{actual_date}_{safe_label}"
pdf_filename = os.path.join(out_dir, f"{stem}.pdf")
if os.path.exists(pdf_filename):
pdf_filename = os.path.join(out_dir, f"{stem}_pk{pk}.pdf")
# Načti JWT čerstvě před každým requestem
jwt = page.evaluate("localStorage.getItem('JWT.access')")
pdf_url = f"{BASE_URL}{api_base}/api/v1/meta_api/pdfnotification?pk={pk}&title={title}&html=true"
pdf_resp = page.request.get(pdf_url, headers={
"Authorization": f"Bearer {jwt}",
"lang": "en",
"prancer_study": study,
"Accept": "application/json, text/plain, */*",
})
if pdf_resp.ok:
with open(pdf_filename, "wb") as f:
f.write(pdf_resp.body())
json_filename = pdf_filename.replace(".pdf", ".json")
with open(json_filename, "w", encoding="utf-8") as f:
json.dump({
"pk": pk, "title": title, "label": label,
"event": row.get("event_event_id"),
"actual_date": actual_date,
"subject": subject, "study": study, "text": text,
}, f, ensure_ascii=False, indent=2)
existing_pks.add(pk)
new_count += 1
print(f" [{subject}] notifikace pk={pk} OK")
else:
print(f" [{subject}] notifikace pk={pk} CHYBA: {pdf_resp.status}")
page.wait_for_timeout(1000)
return new_count
def run(page, study, out_dir=None, subjects_source_dir=None):
"""
out_dir=None → legacy: ukládá do IncomingSourceReportsDetails/{study}/
out_dir=cesta → vše ukládá ploše do té cesty (Incoming/).
subjects_source_dir=None → čte summary z IncomingSourceReports/ (legacy).
"""
flat = out_dir is not None
if out_dir is None:
out_dir = os.path.join(DETAILS_DIR, study)
os.makedirs(out_dir, exist_ok=True)
subjects = get_subjects(study, subjects_source_dir)
print(f" Nalezeno {len(subjects)} subjektů")
today = datetime.date.today().strftime("%Y-%m-%d")
# api_base pro notifikace
jwt_init = page.evaluate("localStorage.getItem('JWT.access')")
instances = page.evaluate("""async (jwt) => {
const res = await fetch('/_/api/dispatch/app_instances/', {
headers: { 'Authorization': `Bearer ${jwt}` }
});
return res.json();
}""", jwt_init)
instance = next((i for i in instances if study in i.get("label", "")), None)
api_base = instance["api_base_url"] if instance else None
if not api_base:
print(f" UPOZORNĚNÍ: api_base nenalezena, notifikace se nebudou stahovat")
existing_pks = get_existing_pks(study)
print(f" V DB již {len(existing_pks)} notifikací pro {study}")
page.goto(f"{BASE_URL}/report/patient_detail_report")
page.wait_for_load_state("networkidle", timeout=120000)
total_notif = 0
for subject in subjects:
filename = os.path.join(out_dir, f"{today} {study} {subject} Subject Detail.xlsx")
success = False
table1_data = None
for attempt in range(1, 4):
try:
print(f" [{subject}] Stahuji... (pokus {attempt}/3)")
input_field = page.locator('input[placeholder="search"], input[type="text"]').first
input_field.click()
input_field.fill(subject)
page.wait_for_timeout(500)
# Zachytíme table_1 response při výběru subjektu
if api_base:
try:
with page.expect_response(
lambda r: "report_data" in r.url and "table_1" in r.url,
timeout=60000
) as resp_info:
page.locator("mat-option").first.dispatch_event("click")
table1_data = resp_info.value.json()
except Exception as e:
print(f" [{subject}] CHYBA zachycení table_1: {e}")
page.locator("mat-option").first.dispatch_event("click")
page.wait_for_load_state("networkidle", timeout=120000)
table1_data = None
else:
page.locator("mat-option").first.dispatch_event("click")
page.wait_for_load_state("networkidle", timeout=120000)
table1_data = None
page.wait_for_load_state("networkidle", timeout=120000)
page.wait_for_timeout(2000)
with page.expect_download(timeout=60000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
print(f" [{subject}] XLS OK")
success = True
break
except Exception as e:
print(f" [{subject}] pokus {attempt} selhal: {e}")
if attempt < 3:
try:
page.goto(f"{BASE_URL}/report/patient_detail_report")
page.wait_for_load_state("networkidle", timeout=120000)
except Exception as ge:
print(f" [{subject}] refresh selhal: {ge}")
if not success:
print(f" [{subject}] PŘESKAKUJI po 3 neúspěšných pokusech")
try:
page.goto(f"{BASE_URL}/report/patient_detail_report")
page.wait_for_load_state("networkidle", timeout=120000)
except Exception:
pass
continue
# Stáhnout notifikace pro tohoto subjekta
if api_base and table1_data:
n = download_notifications_for_subject(
page, study, subject, api_base, existing_pks, out_dir, table1_data, flat=flat
)
total_notif += n
try:
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
except Exception as e:
print(f" [{subject}] Clear selhal: {e} — refresh")
page.goto(f"{BASE_URL}/report/patient_detail_report")
page.wait_for_load_state("networkidle", timeout=120000)
print(f" [{study}] Subject details hotovo. Nových notifikací: {total_notif}")
def main():
os.makedirs(DETAILS_DIR, exist_ok=True)
with sync_playwright() as p:
for study in STUDIES:
print(f"\n[{study}] Přihlášení...")
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator("#login__submit").click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=study).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
try:
run(page, study)
except Exception as e:
print(f" [{study}] CHYBA: {e}")
browser.close()
print("\nVše hotovo.")
if __name__ == "__main__":
main()