This commit is contained in:
2026-06-03 07:10:15 +02:00
parent 9ed9f97140
commit 681095d557
60 changed files with 215 additions and 2161 deletions
+24 -292
View File
@@ -2,23 +2,21 @@
Kompletní pipeline:
1. Stažení Subject Summary Reportů (obě studie)
2. Stažení Subject Detail Reportů + notifikací (obě studie)
3. Import do MySQL (summary, visits, notifikace)
3. Import do MongoDB (subject_summary + visits + notifications)
Spusť tento skript místo samostatných skriptů.
"""
import os
import sys
import datetime
import glob
import re
from playwright.sync_api import sync_playwright
import numpy as np
import pandas as pd
import db_config
import mysql.connector
import download_subject_details as dsd
import import_to_mongo
import import_notifications_to_mongo
# ── CONFIG ───────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
@@ -72,6 +70,7 @@ def download_summary(page, study, today):
# ── KROK 2: Subject Details ───────────────────────────────────────────────────
def get_subjects_from_summary(summary_path):
import pandas as pd
raw = pd.read_excel(summary_path, header=None)
header_row = None
for i, row in raw.iterrows():
@@ -112,277 +111,7 @@ def download_details(page, study, summary_path, today):
page.wait_for_load_state("networkidle", timeout=120000)
# ── KROK 3: Import do MySQL ───────────────────────────────────────────────────
def get_conn():
return mysql.connector.connect(
host=db_config.DB_HOST,
port=db_config.DB_PORT,
user=db_config.DB_USER,
password=db_config.DB_PASSWORD,
database=db_config.DB_NAME,
)
def _py(val):
"""Převede numpy skalár na Python nativní typ."""
if isinstance(val, np.generic):
return val.item()
return val
def to_date(val):
val = _py(val)
if val is None or (isinstance(val, float) and (val != val)):
return None
try:
if pd.isna(val):
return None
except (TypeError, ValueError):
pass
if isinstance(val, pd.Timestamp):
return None if pd.isna(val) else val.date()
if isinstance(val, datetime.datetime):
return val.date()
if isinstance(val, datetime.date):
return val
s = str(val).strip()
if not s or s.lower() in ("nat", "nan", "none", ""):
return None
for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
try:
return datetime.datetime.strptime(s, fmt).date()
except ValueError:
pass
return None
def to_int(val):
val = _py(val)
try:
v = float(val)
return None if (v != v) else int(v)
except (TypeError, ValueError):
return None
def to_float(val):
val = _py(val)
try:
v = float(val)
return None if (v != v) else float(v)
except (TypeError, ValueError):
return None
def to_str(val):
val = _py(val)
if val is None:
return None
if isinstance(val, float) and (val != val):
return None
s = str(val).strip()
return None if s.lower() in ("nan", "nat", "none", "") else s
def read_summary_df(path):
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Subject" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
raise ValueError(f"Hlavičkový řádek nenalezen v {path}")
return pd.read_excel(path, header=header_row).dropna(how="all")
def parse_detail_visits(path):
df = pd.read_excel(path, sheet_name="patient_detail_report", header=None)
header_row = None
for i, row in df.iterrows():
if "Visit Type" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
return []
visits_df = df.iloc[header_row + 1:].copy()
visits_df.columns = range(visits_df.shape[1])
rows = []
for _, r in visits_df.iterrows():
visit_type = to_str(r.get(0))
if visit_type not in ("Past", "Upcoming"):
continue
rows.append({
"visit_type": visit_type,
"scheduled_date": to_date(r.get(1)),
"window_days": to_str(r.get(2)),
"actual_date": to_date(r.get(3)),
"irt_transaction_no": to_int(r.get(4)),
"irt_transaction_description": to_str(r.get(5)),
"medication_assignment": to_str(r.get(6)),
"quantity_assigned": to_int(r.get(7)),
"medication_id": to_str(r.get(8)),
})
return rows
def insert_import(cursor, study, source_file):
cursor.execute(
"INSERT INTO iwrs_import (study, imported_at, source_file) VALUES (%s, %s, %s)",
(study, datetime.datetime.now(), os.path.basename(source_file)),
)
return cursor.lastrowid
def insert_uco3001_summary(cursor, import_id, df):
sql = """INSERT INTO iwrs_uco3001_subject_summary (
import_id, subject, prior_subject_identifier, site, investigator, location,
cohort_per_irt, informed_consent_date, adolescent_assent_date, age, weight,
rescreened_subject, adt_ir, three_or_more_advanced_therapies,
only_oral_5asa_compounds, ustekinumab, isolated_proctitis,
clinical_responder_status_i12_m0, irt_subject_status,
i0_rand_date_local, last_irt_transaction,
last_irt_transaction_date_local, last_irt_transaction_date_utc,
next_irt_transaction, next_irt_transaction_date_local,
most_recent_med_assignment_date, days_since_last_med_assignment,
patient_forecast_status, patient_forecast_status_changed_date
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
col = df.columns.tolist()
for _, r in df.iterrows():
cursor.execute(sql, (
import_id,
to_str(r["Subject"]),
to_str(r["Prior Subject Identifier"]) if "Prior Subject Identifier" in col else None,
to_str(r["Site"]),
to_str(r["Investigator"]),
to_str(r["Location"]),
to_str(r["Cohort per IRT"]),
to_date(r["Informed Consent Date"]),
to_date(r["Adolescent Assent Date"]) if "Adolescent Assent Date" in col else None,
to_int(r["Subject's age collection"]),
to_float(r["Subject's weight collection"]) if "Subject's weight collection" in col else None,
to_str(r["Rescreened Subject"]) if "Rescreened Subject" in col else None,
to_str(r["ADT-IR"]) if "ADT-IR" in col else None,
to_str(r["3 or More Advanced Therapies"]) if "3 or More Advanced Therapies" in col else None,
to_str(r["Only Oral 5-ASA Compounds"]) if "Only Oral 5-ASA Compounds" in col else None,
to_str(r["Ustekinumab"]) if "Ustekinumab" in col else None,
to_str(r["Isolated Proctitis"]) if "Isolated Proctitis" in col else None,
to_str(r["Clinical Responder Status at I-12 / M-0"]) if "Clinical Responder Status at I-12 / M-0" in col else None,
to_str(r["IRT Subject Status"]),
to_date(r["I0_RAND_TIMESTAMP_LOCAL [Local]"]) if "I0_RAND_TIMESTAMP_LOCAL [Local]" in col else None,
to_str(r["Last Recorded IRT Transaction"]),
to_date(r["Last Recorded IRT Transaction Date [Local]"]),
to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
to_str(r["Next Expected IRT Transaction"]),
to_date(r["Next Expected IRT Transaction Date [Local]"]),
to_date(r["Most Recent Medication Assignment Transaction [Local]"]) if "Most Recent Medication Assignment Transaction [Local]" in col else None,
to_int(r["Days Since Last Medication Assignment Transaction"]) if "Days Since Last Medication Assignment Transaction" in col else None,
to_str(r["Patient Forecast Status"]) if "Patient Forecast Status" in col else None,
to_date(r["Patient Forecast Status Changed Date (UTC)"]) if "Patient Forecast Status Changed Date (UTC)" in col else None,
))
def insert_mdd3003_summary(cursor, import_id, df):
sql = """INSERT INTO iwrs_mdd3003_subject_summary (
import_id, subject, prior_subject_identifier, site, investigator, location,
cohort_per_irt, madrs_criteria_integrated, informed_consent_date, age,
madrs_criteria_v15, madrs_criteria_v16, madrs_criteria_v17,
stratification_country, age_group, stable_remitters, irt_subject_status,
last_irt_transaction, last_irt_transaction_date_local,
last_irt_transaction_date_utc, next_irt_transaction,
next_irt_transaction_date_local, date_screened, date_screen_failed,
date_randomized_part1, date_early_withdraw_randomized_part1,
date_open_label_induction, date_early_withdraw_open_label_induction,
date_randomized_part2, date_early_withdraw_randomized_part2,
date_completed, date_unblinded
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
col = df.columns.tolist()
for _, r in df.iterrows():
cursor.execute(sql, (
import_id,
to_str(r["Subject"]),
to_str(r["Prior Subject Identifier"]) if "Prior Subject Identifier" in col else None,
to_str(r["Site"]),
to_str(r["Investigator"]),
to_str(r["Location"]),
to_str(r["Cohort per IRT"]),
to_str(r["MADRS response criteria integrated or manually entered"]) if "MADRS response criteria integrated or manually entered" in col else None,
to_date(r["Informed Consent Date"]),
to_int(r["Subject's age collection"]),
to_str(r["MADRS response criteria v1.5 from RAVE"]) if "MADRS response criteria v1.5 from RAVE" in col else None,
to_str(r["MADRS response criteria v1.6 from RAVE"]) if "MADRS response criteria v1.6 from RAVE" in col else None,
to_str(r["MADRS response criteria v1.7 from RAVE"]) if "MADRS response criteria v1.7 from RAVE" in col else None,
to_str(r["Stratification Country"]) if "Stratification Country" in col else None,
to_str(r["Age Group"]) if "Age Group" in col else None,
to_str(r["Stable Remitters vs. Non Stable Remitters"]) if "Stable Remitters vs. Non Stable Remitters" in col else None,
to_str(r["IRT Subject Status"]),
to_str(r["Last Recorded IRT Transaction"]),
to_date(r["Last Recorded IRT Transaction Date [Local]"]),
to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
to_str(r["Next Expected IRT Transaction"]),
to_date(r["Next Expected IRT Transaction Date [Local]"]),
to_date(r["Date Screened [Local]"]) if "Date Screened [Local]" in col else None,
to_date(r["Date Screen Failed [Local]"]) if "Date Screen Failed [Local]" in col else None,
to_date(r["Date Randomized Part 1 [Local]"]) if "Date Randomized Part 1 [Local]" in col else None,
to_date(r["Date Early Withdraw Randomized Part 1 [Local]"]) if "Date Early Withdraw Randomized Part 1 [Local]" in col else None,
to_date(r["Date Open Label Induction [Local]"]) if "Date Open Label Induction [Local]" in col else None,
to_date(r["Date Early Withdraw Open Label Induction [Local]"]) if "Date Early Withdraw Open Label Induction [Local]" in col else None,
to_date(r["Date Randomized Part 2 [Local]"]) if "Date Randomized Part 2 [Local]" in col else None,
to_date(r["Date Early Withdraw Randomized Part 2 [Local]"]) if "Date Early Withdraw Randomized Part 2 [Local]" in col else None,
to_date(r["Date Completed [Local]"]) if "Date Completed [Local]" in col else None,
to_date(r["Date Unblinded [Local]"]) if "Date Unblinded [Local]" in col else None,
))
def insert_visits(cursor, import_id, study, subject, visits):
if not visits:
return
sql = """INSERT INTO iwrs_subject_visits (
import_id, study, subject, visit_type, scheduled_date, window_days,
actual_date, irt_transaction_no, irt_transaction_description,
medication_assignment, quantity_assigned, medication_id
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
for v in visits:
cursor.execute(sql, (
import_id, study, subject,
v["visit_type"], v["scheduled_date"], v["window_days"],
v["actual_date"], v["irt_transaction_no"],
v["irt_transaction_description"], v["medication_assignment"],
v["quantity_assigned"], v["medication_id"],
))
def import_to_mysql(summary_path, detail_files, study):
print(f"\n [MySQL] Importuji {study}...")
df_summary = read_summary_df(summary_path)
conn = get_conn()
cursor = conn.cursor()
import_id = insert_import(cursor, study, summary_path)
if study == "77242113UCO3001":
insert_uco3001_summary(cursor, import_id, df_summary)
else:
insert_mdd3003_summary(cursor, import_id, df_summary)
total_visits = 0
for path in detail_files:
fname = os.path.basename(path)
m = re.search(r"\d{4}-\d{2}-\d{2} \S+ (\S+) Subject Detail\.xlsx", fname)
subject = m.group(1) if m else "UNKNOWN"
visits = parse_detail_visits(path)
insert_visits(cursor, import_id, study, subject, visits)
total_visits += len(visits)
conn.commit()
cursor.close()
conn.close()
print(f" [MySQL] import_id={import_id} | pacientů={len(df_summary)} | transakcí={total_visits}")
return import_id
# ── MAIN ─────────────────────────────────────────────────────────────────────
# ── KROK 3: Import do MongoDB ────────────────────────────────────────────────
def main():
today = datetime.date.today().strftime("%Y-%m-%d")
@@ -391,12 +120,12 @@ def main():
summary_paths = {}
# ── Krok 1 + 2: stahování (Playwright, každá studie zvlášť kvůli session) ──
# Krok 1 + 2: stahování (Playwright, každá studie zvlášť kvůli session)
with sync_playwright() as p:
for study in STUDIES:
print(f"\n{'='*60}")
print("\n" + "=" * 60)
print(f"[{study}] KROK 1: Subject Summary Report")
print(f"{'='*60}")
print("=" * 60)
browser = p.chromium.launch(headless=False)
context = browser.new_context(accept_downloads=True)
page = context.new_page()
@@ -415,10 +144,10 @@ def main():
finally:
browser.close()
# ── Krok 3: import do MySQL ──────────────────────────────────────────────
print(f"\n{'='*60}")
print("KROK 3: Import do MySQL")
print(f"{'='*60}")
# Krok 3: import do MongoDB
print("\n" + "=" * 60)
print("KROK 3: Import do MongoDB")
print("=" * 60)
for study in STUDIES:
summary_path = summary_paths.get(study)
@@ -426,18 +155,21 @@ def main():
print(f" [{study}] PŘESKOČENO — stahování selhalo")
continue
detail_files = sorted(glob.glob(
os.path.join(DETAILS_DIR, study, f"{today} {study} * Subject Detail.xlsx")
))
try:
import_to_mysql(summary_path, detail_files, study)
import_to_mongo.run(study, summary_path, DETAILS_DIR, today)
except Exception as e:
print(f" [{study}] CHYBA při importu: {e}")
print(f" [{study}] CHYBA při importu summary/visits: {e}")
print(f"\n{'='*60}")
# Notifikace: PDF/JSON z disku rovnou do Mongo iwrs_notifications
print("\n [notifikace] import PDF/JSON do Mongo...")
try:
import_notifications_to_mongo.main(STUDIES)
except Exception as e:
print(f" CHYBA při importu notifikací: {e}")
print("\n" + "=" * 60)
print("Vše hotovo.")
print(f"{'='*60}")
print("=" * 60)
main()