This commit is contained in:
2026-05-05 10:40:13 +02:00
parent eaea634b2b
commit afd9b3ef17
4 changed files with 895 additions and 0 deletions
@@ -0,0 +1,358 @@
"""
Importuje data z IWRS Excel reportů do MySQL (databáze studie).
Pořadí spuštění:
1. download_subject_summary.py
2. download_subject_details.py
3. tento skript
Každé spuštění vytvoří nový import_id v iwrs_import.
Reportovací skripty pracují vždy s MAX(import_id) pro danou studii.
"""
import os
import glob
import datetime
import re
import pandas as pd
import mysql.connector
import db_config
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
# ── helpers ──────────────────────────────────────────────────────────────────
def get_conn():
return mysql.connector.connect(
host=db_config.DB_HOST,
port=db_config.DB_PORT,
user=db_config.DB_USER,
password=db_config.DB_PASSWORD,
database=db_config.DB_NAME,
)
def to_date(val):
"""Převede pandas Timestamp / string / NaT / NaN na date nebo None."""
if val is None or (isinstance(val, float) and pd.isna(val)):
return None
if isinstance(val, pd.Timestamp):
return None if pd.isna(val) else val.date()
if isinstance(val, datetime.datetime):
return val.date()
if isinstance(val, datetime.date):
return val
s = str(val).strip()
if not s or s.lower() in ("nat", "nan", "none", ""):
return None
for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
try:
return datetime.datetime.strptime(s, fmt).date()
except ValueError:
pass
return None
def to_int(val):
try:
v = float(val)
return None if pd.isna(v) else int(v)
except (TypeError, ValueError):
return None
def to_float(val):
try:
v = float(val)
return None if pd.isna(v) else v
except (TypeError, ValueError):
return None
def to_str(val):
if val is None or (isinstance(val, float) and pd.isna(val)):
return None
s = str(val).strip()
return None if s.lower() in ("nan", "nat", "none", "") else s
def find_summary_file(study):
today = datetime.date.today().strftime("%Y-%m-%d")
pattern = os.path.join(INCOMING_DIR, f"* {study} Subject Summary Report.xlsx")
files = sorted(
[f for f in glob.glob(pattern) if not os.path.basename(f).startswith("~$")],
key=os.path.getmtime,
reverse=True,
)
if not files:
raise FileNotFoundError(f"Nenalezen Subject Summary Report pro {study}")
if not os.path.basename(files[0]).startswith(today):
print(f" UPOZORNĚNÍ: nejnovější Summary Report pro {study} není z dnešního dne ({os.path.basename(files[0])[:10]})")
return files[0]
def read_summary_df(path):
"""Přečte Summary xlsx, vrátí DataFrame od řádku s hlavičkou."""
raw = pd.read_excel(path, header=None)
header_row = None
for i, row in raw.iterrows():
if "Subject" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
raise ValueError(f"Hlavičkový řádek nenalezen v {path}")
return pd.read_excel(path, header=header_row)
def find_detail_files(study):
out_dir = os.path.join(DETAILS_DIR, study)
# Vezme soubory ze stejného dne jako nejnovější Summary Report
summary_path = find_summary_file(study)
file_date = os.path.basename(summary_path)[:10] # "YYYY-MM-DD"
pattern = os.path.join(out_dir, f"{file_date} {study} * Subject Detail.xlsx")
files = [f for f in glob.glob(pattern) if not os.path.basename(f).startswith("~$")]
return sorted(files)
def parse_detail_visits(path):
"""
Vrátí list slovníků s daty visitů z Detail xlsx.
Každý řádek tabulky (od řádku s hlavičkou Visit Type) je jedna transakce.
"""
df = pd.read_excel(path, sheet_name="patient_detail_report", header=None)
header_row = None
for i, row in df.iterrows():
if "Visit Type" in [str(v).strip() for v in row]:
header_row = i
break
if header_row is None:
return []
visits_df = df.iloc[header_row + 1:].copy()
visits_df.columns = range(visits_df.shape[1])
rows = []
for _, r in visits_df.iterrows():
visit_type = to_str(r.get(0))
if visit_type not in ("Past", "Upcoming"):
continue
rows.append({
"visit_type": visit_type,
"scheduled_date": to_date(r.get(1)),
"window_days": to_str(r.get(2)),
"actual_date": to_date(r.get(3)),
"irt_transaction_no": to_int(r.get(4)),
"irt_transaction_description": to_str(r.get(5)),
"medication_assignment": to_str(r.get(6)),
"quantity_assigned": to_int(r.get(7)),
"medication_id": to_str(r.get(8)),
})
return rows
# ── insert helpers ────────────────────────────────────────────────────────────
def insert_import(cursor, study, source_file):
cursor.execute(
"INSERT INTO iwrs_import (study, imported_at, source_file) VALUES (%s, %s, %s)",
(study, datetime.datetime.now(), os.path.basename(source_file)),
)
return cursor.lastrowid
def insert_uco3001_summary(cursor, import_id, df):
sql = """
INSERT INTO iwrs_uco3001_subject_summary (
import_id, subject, prior_subject_identifier, site, investigator, location,
cohort_per_irt, informed_consent_date, adolescent_assent_date, age, weight,
rescreened_subject, adt_ir, three_or_more_advanced_therapies,
only_oral_5asa_compounds, ustekinumab, isolated_proctitis,
clinical_responder_status_i12_m0, irt_subject_status,
i0_rand_date_local, last_irt_transaction,
last_irt_transaction_date_local, last_irt_transaction_date_utc,
next_irt_transaction, next_irt_transaction_date_local,
most_recent_med_assignment_date, days_since_last_med_assignment,
patient_forecast_status, patient_forecast_status_changed_date
) VALUES (
%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
)
"""
col = df.columns.tolist()
def c(name):
return col.index(name) if name in col else None
for _, r in df.iterrows():
cursor.execute(sql, (
import_id,
to_str(r["Subject"]),
to_str(r["Prior Subject Identifier"]) if "Prior Subject Identifier" in col else None,
to_str(r["Site"]),
to_str(r["Investigator"]),
to_str(r["Location"]),
to_str(r["Cohort per IRT"]),
to_date(r["Informed Consent Date"]),
to_date(r["Adolescent Assent Date"]) if "Adolescent Assent Date" in col else None,
to_int(r["Subject's age collection"]),
to_float(r["Subject's weight collection"]) if "Subject's weight collection" in col else None,
to_str(r["Rescreened Subject"]) if "Rescreened Subject" in col else None,
to_str(r["ADT-IR"]) if "ADT-IR" in col else None,
to_str(r["3 or More Advanced Therapies"]) if "3 or More Advanced Therapies" in col else None,
to_str(r["Only Oral 5-ASA Compounds"]) if "Only Oral 5-ASA Compounds" in col else None,
to_str(r["Ustekinumab"]) if "Ustekinumab" in col else None,
to_str(r["Isolated Proctitis"]) if "Isolated Proctitis" in col else None,
to_str(r["Clinical Responder Status at I-12 / M-0"]) if "Clinical Responder Status at I-12 / M-0" in col else None,
to_str(r["IRT Subject Status"]),
to_date(r["I0_RAND_TIMESTAMP_LOCAL [Local]"]) if "I0_RAND_TIMESTAMP_LOCAL [Local]" in col else None,
to_str(r["Last Recorded IRT Transaction"]),
to_date(r["Last Recorded IRT Transaction Date [Local]"]),
to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
to_str(r["Next Expected IRT Transaction"]),
to_date(r["Next Expected IRT Transaction Date [Local]"]),
to_date(r["Most Recent Medication Assignment Transaction [Local]"]) if "Most Recent Medication Assignment Transaction [Local]" in col else None,
to_int(r["Days Since Last Medication Assignment Transaction"]) if "Days Since Last Medication Assignment Transaction" in col else None,
to_str(r["Patient Forecast Status"]) if "Patient Forecast Status" in col else None,
to_date(r["Patient Forecast Status Changed Date (UTC)"]) if "Patient Forecast Status Changed Date (UTC)" in col else None,
))
def insert_mdd3003_summary(cursor, import_id, df):
sql = """
INSERT INTO iwrs_mdd3003_subject_summary (
import_id, subject, prior_subject_identifier, site, investigator, location,
cohort_per_irt, madrs_criteria_integrated, informed_consent_date, age,
madrs_criteria_v15, madrs_criteria_v16, madrs_criteria_v17,
stratification_country, age_group, stable_remitters, irt_subject_status,
last_irt_transaction, last_irt_transaction_date_local,
last_irt_transaction_date_utc, next_irt_transaction,
next_irt_transaction_date_local, date_screened, date_screen_failed,
date_randomized_part1, date_early_withdraw_randomized_part1,
date_open_label_induction, date_early_withdraw_open_label_induction,
date_randomized_part2, date_early_withdraw_randomized_part2,
date_completed, date_unblinded
) VALUES (
%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
)
"""
col = df.columns.tolist()
for _, r in df.iterrows():
cursor.execute(sql, (
import_id,
to_str(r["Subject"]),
to_str(r["Prior Subject Identifier"]) if "Prior Subject Identifier" in col else None,
to_str(r["Site"]),
to_str(r["Investigator"]),
to_str(r["Location"]),
to_str(r["Cohort per IRT"]),
to_str(r["MADRS response criteria integrated or manually entered"]) if "MADRS response criteria integrated or manually entered" in col else None,
to_date(r["Informed Consent Date"]),
to_int(r["Subject's age collection"]),
to_str(r["MADRS response criteria v1.5 from RAVE"]) if "MADRS response criteria v1.5 from RAVE" in col else None,
to_str(r["MADRS response criteria v1.6 from RAVE"]) if "MADRS response criteria v1.6 from RAVE" in col else None,
to_str(r["MADRS response criteria v1.7 from RAVE"]) if "MADRS response criteria v1.7 from RAVE" in col else None,
to_str(r["Stratification Country"]) if "Stratification Country" in col else None,
to_str(r["Age Group"]) if "Age Group" in col else None,
to_str(r["Stable Remitters vs. Non Stable Remitters"]) if "Stable Remitters vs. Non Stable Remitters" in col else None,
to_str(r["IRT Subject Status"]),
to_str(r["Last Recorded IRT Transaction"]),
to_date(r["Last Recorded IRT Transaction Date [Local]"]),
to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
to_str(r["Next Expected IRT Transaction"]),
to_date(r["Next Expected IRT Transaction Date [Local]"]),
to_date(r["Date Screened [Local]"]) if "Date Screened [Local]" in col else None,
to_date(r["Date Screen Failed [Local]"]) if "Date Screen Failed [Local]" in col else None,
to_date(r["Date Randomized Part 1 [Local]"]) if "Date Randomized Part 1 [Local]" in col else None,
to_date(r["Date Early Withdraw Randomized Part 1 [Local]"]) if "Date Early Withdraw Randomized Part 1 [Local]" in col else None,
to_date(r["Date Open Label Induction [Local]"]) if "Date Open Label Induction [Local]" in col else None,
to_date(r["Date Early Withdraw Open Label Induction [Local]"]) if "Date Early Withdraw Open Label Induction [Local]" in col else None,
to_date(r["Date Randomized Part 2 [Local]"]) if "Date Randomized Part 2 [Local]" in col else None,
to_date(r["Date Early Withdraw Randomized Part 2 [Local]"]) if "Date Early Withdraw Randomized Part 2 [Local]" in col else None,
to_date(r["Date Completed [Local]"]) if "Date Completed [Local]" in col else None,
to_date(r["Date Unblinded [Local]"]) if "Date Unblinded [Local]" in col else None,
))
def insert_visits(cursor, import_id, study, subject, visits):
if not visits:
return
sql = """
INSERT INTO iwrs_subject_visits (
import_id, study, subject, visit_type, scheduled_date, window_days,
actual_date, irt_transaction_no, irt_transaction_description,
medication_assignment, quantity_assigned, medication_id
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
for v in visits:
cursor.execute(sql, (
import_id, study, subject,
v["visit_type"], v["scheduled_date"], v["window_days"],
v["actual_date"], v["irt_transaction_no"],
v["irt_transaction_description"], v["medication_assignment"],
v["quantity_assigned"], v["medication_id"],
))
# ── main ──────────────────────────────────────────────────────────────────────
def import_study(conn, study):
summary_path = find_summary_file(study)
print(f" Summary: {os.path.basename(summary_path)}")
df_summary = read_summary_df(summary_path)
df_summary = df_summary.dropna(how="all")
detail_files = find_detail_files(study)
print(f" Detail souborů: {len(detail_files)}")
cursor = conn.cursor()
import_id = insert_import(cursor, study, summary_path)
print(f" import_id = {import_id}")
if study == "77242113UCO3001":
insert_uco3001_summary(cursor, import_id, df_summary)
else:
insert_mdd3003_summary(cursor, import_id, df_summary)
print(f" Summary řádků: {len(df_summary)}")
visited = 0
for path in detail_files:
fname = os.path.basename(path)
# název: "2026-05-04 77242113UCO3001 CZ100012001 Subject Detail.xlsx"
m = re.search(r"\d{4}-\d{2}-\d{2} \S+ (\S+) Subject Detail\.xlsx", fname)
subject = m.group(1) if m else "UNKNOWN"
visits = parse_detail_visits(path)
insert_visits(cursor, import_id, study, subject, visits)
visited += len(visits)
conn.commit()
cursor.close()
print(f" Transakce uloženo: {visited}")
return import_id
def main():
conn = get_conn()
print("Připojeno k MySQL.\n")
for study in STUDIES:
print(f"[{study}]")
try:
import_id = import_study(conn, study)
print(f" OK — import_id {import_id}\n")
except Exception as e:
print(f" CHYBA: {e}\n")
conn.close()
print("Hotovo.")
main()