Files
janssen/IWRS/Patients/download_subject_details.py
T
2026-06-01 16:49:38 +02:00

239 lines
9.3 KiB
Python

from playwright.sync_api import sync_playwright
import os
import glob
import datetime
import re
import json
import mysql.connector
import pandas as pd
import db_config
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")
# ────────────────────────────────────────────────────────────────────────────
def get_subjects(study):
pattern = os.path.join(INCOMING_DIR, f"* {study} Subject Summary Report.xlsx")
files = sorted(
[f for f in glob.glob(pattern) if not os.path.basename(f).startswith("~$")],
key=os.path.getmtime,
reverse=True,
)
if not files:
raise FileNotFoundError(f"Nenalezen Subject Summary Report pro {study}")
today = datetime.date.today().strftime("%Y-%m-%d")
if not os.path.basename(files[0]).startswith(today):
raise FileNotFoundError(f"Dnešní Subject Summary Report pro {study} neexistuje — spusť nejdříve download_subject_summary.py")
path = files[0]
print(f" Čtu subjekty z: {os.path.basename(path)}")
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Subject" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
raise ValueError("Hlavičkový řádek nenalezen")
df = pd.read_excel(path, header=header_row)
return df["Subject"].dropna().astype(str).str.strip().tolist()
def strip_html(html):
text = re.sub(r"<br\s*/?>", "\n", html, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def get_existing_pks(study):
try:
conn = mysql.connector.connect(
host=db_config.DB_HOST, port=db_config.DB_PORT,
user=db_config.DB_USER, password=db_config.DB_PASSWORD,
database=db_config.DB_NAME,
)
cursor = conn.cursor()
cursor.execute("SELECT pk FROM iwrs_notifications WHERE study = %s", (study,))
pks = {row[0] for row in cursor.fetchall()}
cursor.close()
conn.close()
return pks
except Exception as e:
print(f" UPOZORNĚNÍ: nelze načíst pk z DB ({e}), stahuji vše")
return set()
def download_notifications_for_subject(page, study, subject, api_base, existing_pks, out_dir, table1_data):
"""Stáhne notifikace pro subjekta z již zachycené table_1 response."""
new_count = 0
for row in table1_data.get("data", []):
for notif in (row.get("notification") or []):
item = notif.get("item", {})
pk = item.get("pk")
title = item.get("et_title")
if not pk or pk in existing_pks:
continue
label = (notif.get("label") or title or "").strip()
safe_label = re.sub(r'[\\/*?:"<>|]', "", label).replace(" ", "_")
body = item.get("body", "")
text = strip_html(body)
actual_date = row.get("actual_date_raw", "0000-00-00")
pdf_filename = os.path.join(out_dir, f"{actual_date}_{safe_label}.pdf")
if os.path.exists(pdf_filename):
pdf_filename = os.path.join(out_dir, f"{actual_date}_{safe_label}_pk{pk}.pdf")
# Načti JWT čerstvě před každým requestem
jwt = page.evaluate("localStorage.getItem('JWT.access')")
pdf_url = f"{BASE_URL}{api_base}/api/v1/meta_api/pdfnotification?pk={pk}&title={title}&html=true"
pdf_resp = page.request.get(pdf_url, headers={
"Authorization": f"Bearer {jwt}",
"lang": "en",
"prancer_study": study,
"Accept": "application/json, text/plain, */*",
})
if pdf_resp.ok:
with open(pdf_filename, "wb") as f:
f.write(pdf_resp.body())
json_filename = pdf_filename.replace(".pdf", ".json")
with open(json_filename, "w", encoding="utf-8") as f:
json.dump({
"pk": pk, "title": title, "label": label,
"event": row.get("event_event_id"),
"actual_date": actual_date,
"subject": subject, "study": study, "text": text,
}, f, ensure_ascii=False, indent=2)
existing_pks.add(pk)
new_count += 1
print(f" [{subject}] notifikace pk={pk} OK")
else:
print(f" [{subject}] notifikace pk={pk} CHYBA: {pdf_resp.status}")
page.wait_for_timeout(1000)
return new_count
def run(page, study):
out_dir = os.path.join(DETAILS_DIR, study)
os.makedirs(out_dir, exist_ok=True)
subjects = get_subjects(study)
print(f" Nalezeno {len(subjects)} subjektů")
today = datetime.date.today().strftime("%Y-%m-%d")
# api_base pro notifikace
jwt_init = page.evaluate("localStorage.getItem('JWT.access')")
instances = page.evaluate("""async (jwt) => {
const res = await fetch('/_/api/dispatch/app_instances/', {
headers: { 'Authorization': `Bearer ${jwt}` }
});
return res.json();
}""", jwt_init)
instance = next((i for i in instances if study in i.get("label", "")), None)
api_base = instance["api_base_url"] if instance else None
if not api_base:
print(f" UPOZORNĚNÍ: api_base nenalezena, notifikace se nebudou stahovat")
existing_pks = get_existing_pks(study)
print(f" V DB již {len(existing_pks)} notifikací pro {study}")
page.goto(f"{BASE_URL}/report/patient_detail_report")
page.wait_for_load_state("networkidle", timeout=120000)
total_notif = 0
for subject in subjects:
filename = os.path.join(out_dir, f"{today} {study} {subject} Subject Detail.xlsx")
print(f" [{subject}] Stahuji...")
input_field = page.locator('input[placeholder="search"], input[type="text"]').first
input_field.click()
input_field.fill(subject)
page.wait_for_timeout(500)
# Zachytíme table_1 response při výběru subjektu
if api_base:
try:
with page.expect_response(
lambda r: "report_data" in r.url and "table_1" in r.url,
timeout=60000
) as resp_info:
page.locator("mat-option").first.dispatch_event("click")
table1_data = resp_info.value.json()
except Exception as e:
print(f" [{subject}] CHYBA zachycení table_1: {e}")
page.locator("mat-option").first.dispatch_event("click")
page.wait_for_load_state("networkidle", timeout=120000)
table1_data = None
else:
page.locator("mat-option").first.dispatch_event("click")
page.wait_for_load_state("networkidle", timeout=120000)
table1_data = None
page.wait_for_load_state("networkidle", timeout=120000)
page.wait_for_timeout(1000)
with page.expect_download(timeout=120000) as dl:
page.get_by_role("button", name="Download XLS").click()
dl.value.save_as(filename)
print(f" [{subject}] XLS OK")
# Stáhnout notifikace pro tohoto subjekta
if api_base and table1_data:
n = download_notifications_for_subject(
page, study, subject, api_base, existing_pks, out_dir, table1_data
)
total_notif += n
page.get_by_role("button", name="Clear").click()
page.wait_for_load_state("networkidle", timeout=120000)
print(f" [{study}] Subject details hotovo. Nových notifikací: {total_notif}")
def main():
os.makedirs(DETAILS_DIR, exist_ok=True)
with sync_playwright() as p:
for study in STUDIES:
print(f"\n[{study}] Přihlášení...")
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
page.goto(BASE_URL)
page.wait_for_load_state("networkidle")
page.get_by_label("Email *").fill(EMAIL)
page.get_by_label("Password *").fill(PASSWORD)
page.locator("#login__submit").click()
page.wait_for_load_state("networkidle")
page.get_by_label("Study *").click()
page.get_by_role("option", name=study).click()
page.get_by_role("button", name="SELECT").click()
page.wait_for_load_state("networkidle")
try:
run(page, study)
except Exception as e:
print(f" [{study}] CHYBA: {e}")
browser.close()
print("\nVše hotovo.")
if __name__ == "__main__":
main()