278 lines
11 KiB
Python
278 lines
11 KiB
Python
from playwright.sync_api import sync_playwright
|
|
import os
|
|
import glob
|
|
import datetime
|
|
import re
|
|
import json
|
|
import sys
|
|
import pandas as pd
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from common.mongo_writer import get_db
|
|
|
|
# ── CONFIG ──────────────────────────────────────────────────────────────────
# Base URL of the IWRS portal; every page and API URL is derived from it.
BASE_URL = "https://janssen.4gclinical.com"

# SECURITY NOTE(review): these credentials are committed in source control.
# They can be overridden through the IWRS_EMAIL / IWRS_PASSWORD environment
# variables; the literals are kept only as a backward-compatible fallback and
# should be rotated and removed from the repository.
EMAIL = os.environ.get("IWRS_EMAIL", "vbuzalka@its.jnj.com")
PASSWORD = os.environ.get("IWRS_PASSWORD", "Vlado123++-+")

# Studies processed by main(), in this order.
STUDIES = ["77242113UCO3001", "42847922MDD3003"]

# Directory layout, relative to the location of this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Where the Subject Summary Reports are read from by default.
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")
# Legacy output root: one sub-directory per study is created below it.
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")
# ────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def get_subjects(study, source_dir=None):
    """Return the subject IDs listed in today's Subject Summary Report.

    The newest (by modification time) file matching
    ``* {study} Subject Summary Report*.xlsx`` in ``source_dir`` (default:
    ``INCOMING_DIR``) is used; Excel lock files (``~$`` prefix) are ignored.
    The header row is found by scanning for a cell whose stripped text is
    exactly ``"Subject"``.

    Args:
        study: study identifier that must appear in the report file name.
        source_dir: directory to search; falls back to ``INCOMING_DIR``.

    Returns:
        List of non-empty subject values as stripped strings.

    Raises:
        FileNotFoundError: no matching report exists, or the newest one's
            file name does not start with today's date (``YYYY-MM-DD``).
        ValueError: no header row/column named "Subject" was found.
    """
    src = source_dir or INCOMING_DIR
    # Escape the directory so glob metacharacters in the path ("[", "]", "*",
    # "?") are matched literally instead of silently matching nothing.
    pattern = os.path.join(
        glob.escape(src), f"* {study} Subject Summary Report*.xlsx"
    )
    candidates = [
        f for f in glob.glob(pattern)
        if not os.path.basename(f).startswith("~$")
    ]
    if not candidates:
        raise FileNotFoundError(f"Nenalezen Subject Summary Report pro {study}")

    path = max(candidates, key=os.path.getmtime)
    today = datetime.date.today().strftime("%Y-%m-%d")
    if not os.path.basename(path).startswith(today):
        raise FileNotFoundError(f"Dnešní Subject Summary Report pro {study} neexistuje — spusť nejdříve download_subject_summary.py")
    print(f" Čtu subjekty z: {os.path.basename(path)}")

    # First pass without a header: the report may carry preamble rows, so the
    # real header row has to be located by content.
    raw = pd.read_excel(path, header=None)
    header_row = None
    for i, row in raw.iterrows():
        if "Subject" in [str(v).strip() for v in row]:
            header_row = i
            break
    if header_row is None:
        raise ValueError("Hlavičkový řádek nenalezen")

    df = pd.read_excel(path, header=header_row)
    # The detection above ignores surrounding whitespace, so resolve the
    # column the same way; a header such as "Subject " would otherwise be
    # found and then fail with KeyError.
    subject_col = next(
        (c for c in df.columns if str(c).strip() == "Subject"), None
    )
    if subject_col is None:
        raise ValueError("Hlavičkový řádek nenalezen")
    return df[subject_col].dropna().astype(str).str.strip().tolist()
|
|
|
|
|
|
def strip_html(html):
    """Convert an HTML fragment to plain text.

    ``<br>`` tags (any case, optional ``/``) become newlines, every other tag
    is removed, runs of three or more newlines collapse to one blank line and
    surrounding whitespace is trimmed.

    Args:
        html: HTML string; ``None`` or an empty string is accepted.

    Returns:
        The plain-text content, or ``""`` when there is no input.
    """
    # API payloads may carry an explicit null body; re.sub would raise
    # TypeError on None, so treat "no content" as an empty string.
    if not html:
        return ""
    text = re.sub(r"<br\s*/?>", "\n", html, flags=re.IGNORECASE)
    text = re.sub(r"<[^>]+>", "", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
|
|
|
|
|
|
def get_existing_pks(study):
    """Load the pks of notifications already imported into Mongo for a study.

    Queries the ``iwrs_notifications`` collection for documents of *study*
    and collects their ``_id`` values.  Any failure is reported on stdout and
    answered with an empty set, which makes the caller download everything.
    """
    try:
        cursor = get_db().iwrs_notifications.find({"study": study}, {"_id": 1})
        known = set()
        for doc in cursor:
            known.add(doc["_id"])
        return known
    except Exception as e:
        # Best effort by design: an unreachable DB must not stop the download.
        print(f" UPOZORNĚNÍ: nelze načíst pk z Mongo ({e}), stahuji vše")
        return set()
|
|
|
|
|
|
def download_notifications_for_subject(page, study, subject, api_base, existing_pks, out_dir, table1_data, flat=False):
    """Download a subject's new notifications from a captured table_1 response.

    For every notification whose ``pk`` is not yet in ``existing_pks`` the PDF
    is requested from the ``pdfnotification`` endpoint and written to
    ``out_dir`` together with a JSON side-car holding the metadata and the
    plain-text body.

    Args:
        page: Playwright page of the logged-in session; supplies the JWT
            (read from localStorage) and the request context.
        study: study identifier, sent as the ``prancer_study`` header.
        subject: subject identifier (used in file names, JSON and log lines).
        api_base: API path prefix of the study instance, appended to BASE_URL.
        existing_pks: set of already imported pks; successfully downloaded
            pks are added to it in place.
        out_dir: directory the files are written to.
        table1_data: parsed JSON of the table_1 report response.
        flat: when True the file name also contains study and subject (for a
            shared directory where everything lies together).

    Returns:
        Number of notifications downloaded by this call.
    """
    # Local import keeps this function self-contained.
    from urllib.parse import quote, urlencode

    new_count = 0
    # "or []" / "or {}" guard against explicit nulls in the API payload.
    for row in (table1_data.get("data") or []):
        for notif in (row.get("notification") or []):
            item = notif.get("item") or {}
            pk = item.get("pk")
            if not pk or pk in existing_pks:
                continue
            title = item.get("et_title")

            label = (notif.get("label") or title or "").strip()
            # Drop characters that are invalid in Windows file names.
            safe_label = re.sub(r'[\\/*?:"<>|]', "", label).replace(" ", "_")
            text = strip_html(item.get("body") or "")
            actual_date = row.get("actual_date_raw", "0000-00-00")

            if flat:
                stem = f"{actual_date}_{study}_{subject}_{safe_label}"
            else:
                stem = f"{actual_date}_{safe_label}"
            pdf_filename = os.path.join(out_dir, f"{stem}.pdf")
            if os.path.exists(pdf_filename):
                # Same date and label as an earlier file: disambiguate by pk.
                pdf_filename = os.path.join(out_dir, f"{stem}_pk{pk}.pdf")

            # Read the JWT fresh before every request.
            jwt = page.evaluate("localStorage.getItem('JWT.access')")
            # Percent-encode the query so titles containing spaces, "&" or
            # "#" cannot truncate or corrupt the request.
            query = urlencode(
                {"pk": pk, "title": title, "html": "true"}, quote_via=quote
            )
            pdf_url = f"{BASE_URL}{api_base}/api/v1/meta_api/pdfnotification?{query}"
            pdf_resp = page.request.get(pdf_url, headers={
                "Authorization": f"Bearer {jwt}",
                "lang": "en",
                "prancer_study": study,
                "Accept": "application/json, text/plain, */*",
            })
            if pdf_resp.ok:
                with open(pdf_filename, "wb") as f:
                    f.write(pdf_resp.body())
                # Swap only the extension; str.replace would also rewrite a
                # ".pdf" occurring in the directory or label part of the path.
                json_filename = os.path.splitext(pdf_filename)[0] + ".json"
                with open(json_filename, "w", encoding="utf-8") as f:
                    json.dump({
                        "pk": pk, "title": title, "label": label,
                        "event": row.get("event_event_id"),
                        "actual_date": actual_date,
                        "subject": subject, "study": study, "text": text,
                    }, f, ensure_ascii=False, indent=2)
                existing_pks.add(pk)
                new_count += 1
                print(f" [{subject}] notifikace pk={pk} OK")
            else:
                print(f" [{subject}] notifikace pk={pk} CHYBA: {pdf_resp.status}")
            # Throttle between requests.
            page.wait_for_timeout(1000)

    return new_count
|
|
|
|
|
|
def _open_detail_report(page):
    """Load the patient detail report page and wait for the network to go idle."""
    page.goto(f"{BASE_URL}/report/patient_detail_report")
    page.wait_for_load_state("networkidle", timeout=120000)


def _find_api_base(page, study):
    """Return api_base_url of the first app instance whose label contains *study*, else None."""
    jwt_init = page.evaluate("localStorage.getItem('JWT.access')")
    instances = page.evaluate("""async (jwt) => {
        const res = await fetch('/_/api/dispatch/app_instances/', {
            headers: { 'Authorization': `Bearer ${jwt}` }
        });
        return res.json();
    }""", jwt_init)
    # An error payload (e.g. a dict describing an auth failure) must not be
    # iterated as if it were the instance list.
    if not isinstance(instances, list):
        return None
    for inst in instances:
        if isinstance(inst, dict) and study in (inst.get("label") or ""):
            return inst.get("api_base_url") or None
    return None


def _select_subject_and_download_xls(page, subject, api_base, filename):
    """Select *subject* in the report UI, save its XLS export; return table_1 JSON or None."""
    input_field = page.locator('input[placeholder="search"], input[type="text"]').first
    input_field.click()
    input_field.fill(subject)
    page.wait_for_timeout(500)

    table1_data = None
    if api_base:
        # Capture the table_1 response triggered by selecting the subject.
        try:
            with page.expect_response(
                lambda r: "report_data" in r.url and "table_1" in r.url,
                timeout=60000,
            ) as resp_info:
                page.locator("mat-option").first.dispatch_event("click")
            table1_data = resp_info.value.json()
        except Exception as e:
            print(f" [{subject}] CHYBA zachycení table_1: {e}")
            # Fall back to a plain selection so the XLS can still be saved.
            page.locator("mat-option").first.dispatch_event("click")
            table1_data = None
    else:
        page.locator("mat-option").first.dispatch_event("click")

    page.wait_for_load_state("networkidle", timeout=120000)
    page.wait_for_timeout(2000)

    with page.expect_download(timeout=60000) as dl:
        page.get_by_role("button", name="Download XLS").click()
    dl.value.save_as(filename)
    return table1_data


def run(page, study, out_dir=None, subjects_source_dir=None):
    """Download the Subject Detail XLS and new notifications for every subject.

    Args:
        page: Playwright page already logged in with *study* selected.
        study: study identifier.
        out_dir: ``None`` (legacy) stores files in
            ``IncomingSourceReportsDetails/{study}/``; a path stores
            everything flat in that directory, with study and subject
            included in the notification file names.
        subjects_source_dir: directory holding the Subject Summary Report;
            ``None`` (legacy) reads it from ``IncomingSourceReports/``.

    Each subject gets up to three attempts; after the third failure it is
    skipped.  Errors from reading the subject list or from the initial page
    load propagate to the caller.
    """
    flat = out_dir is not None
    if out_dir is None:
        out_dir = os.path.join(DETAILS_DIR, study)
    os.makedirs(out_dir, exist_ok=True)

    subjects = get_subjects(study, subjects_source_dir)
    print(f" Nalezeno {len(subjects)} subjektů")
    today = datetime.date.today().strftime("%Y-%m-%d")

    # api_base is needed only for notifications; XLS export works without it.
    api_base = _find_api_base(page, study)
    if not api_base:
        print(" UPOZORNĚNÍ: api_base nenalezena, notifikace se nebudou stahovat")

    existing_pks = get_existing_pks(study)
    print(f" V DB již {len(existing_pks)} notifikací pro {study}")

    _open_detail_report(page)

    total_notif = 0
    for subject in subjects:
        filename = os.path.join(out_dir, f"{today} {study} {subject} Subject Detail.xlsx")

        success = False
        table1_data = None
        for attempt in range(1, 4):
            try:
                print(f" [{subject}] Stahuji... (pokus {attempt}/3)")
                table1_data = _select_subject_and_download_xls(
                    page, subject, api_base, filename
                )
                print(f" [{subject}] XLS OK")
                success = True
                break
            except Exception as e:
                print(f" [{subject}] pokus {attempt} selhal: {e}")
                if attempt < 3:
                    # Reset the page to a known state before retrying.
                    try:
                        _open_detail_report(page)
                    except Exception as ge:
                        print(f" [{subject}] refresh selhal: {ge}")

        if not success:
            print(f" [{subject}] PŘESKAKUJI po 3 neúspěšných pokusech")
            try:
                _open_detail_report(page)
            except Exception as ge:
                print(f" [{subject}] refresh selhal: {ge}")
            continue

        # Download the notifications of this subject.
        if api_base and table1_data:
            total_notif += download_notifications_for_subject(
                page, study, subject, api_base, existing_pks, out_dir, table1_data, flat=flat
            )

        try:
            page.get_by_role("button", name="Clear").click()
            page.wait_for_load_state("networkidle", timeout=120000)
        except Exception as e:
            print(f" [{subject}] Clear selhal: {e} — refresh")
            # Guarded like the other refreshes: one failed reload must not
            # abort all remaining subjects; the next attempt retries anyway.
            try:
                _open_detail_report(page)
            except Exception as ge:
                print(f" [{subject}] refresh selhal: {ge}")

    print(f" [{study}] Subject details hotovo. Nových notifikací: {total_notif}")
|
|
|
|
|
|
def main():
    """Log in once per study in a fresh browser and run the download for it.

    For every entry of ``STUDIES`` a visible Chromium window is launched, the
    user is logged in with ``EMAIL``/``PASSWORD``, the study is selected and
    ``run`` is executed.  Errors raised by ``run`` are printed and the next
    study is processed; errors during login or study selection propagate.
    """
    os.makedirs(DETAILS_DIR, exist_ok=True)

    with sync_playwright() as p:
        for study in STUDIES:
            print(f"\n[{study}] Přihlášení...")
            browser = p.chromium.launch(headless=False)
            # try/finally: the browser must be closed even when login or
            # study selection raises before run() is reached.
            try:
                context = browser.new_context(accept_downloads=True)
                page = context.new_page()

                page.goto(BASE_URL)
                page.wait_for_load_state("networkidle")
                page.get_by_label("Email *").fill(EMAIL)
                page.get_by_label("Password *").fill(PASSWORD)
                page.locator("#login__submit").click()
                page.wait_for_load_state("networkidle")

                page.get_by_label("Study *").click()
                page.get_by_role("option", name=study).click()
                page.get_by_role("button", name="SELECT").click()
                page.wait_for_load_state("networkidle")

                try:
                    run(page, study)
                except Exception as e:
                    # One failing study must not prevent the others.
                    print(f" [{study}] CHYBA: {e}")
            finally:
                browser.close()

    print("\nVše hotovo.")
|
|
|
|
|
|
# Entry point: run the download only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|