This commit is contained in:
2026-05-05 11:41:05 +02:00
parent afd9b3ef17
commit 1f52ce4045
204 changed files with 0 additions and 2312 deletions
@@ -1,110 +0,0 @@
-- IWRS tables for the "studie" database
-- Run once: mysql -h 192.168.1.76 -u root -p studie < create_iwrs_tables.sql
USE studie;
-- ── Import log ───────────────────────────────────────────────────────────────
-- One row per import run; every data table below references it via import_id,
-- so the latest snapshot for a study is the one with MAX(import_id).
CREATE TABLE IF NOT EXISTS iwrs_import (
import_id INT AUTO_INCREMENT PRIMARY KEY,
study VARCHAR(20) NOT NULL,
imported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
source_file VARCHAR(500) NOT NULL,
INDEX idx_study (study)
);
-- ── UCO3001 subject summary ───────────────────────────────────────────────────
-- Snapshot of the Subject Summary Report for study 77242113UCO3001.
-- Column names mirror the report headers; optional report columns are NULLable.
CREATE TABLE IF NOT EXISTS iwrs_uco3001_subject_summary (
id INT AUTO_INCREMENT PRIMARY KEY,
import_id INT NOT NULL,
subject VARCHAR(20) NOT NULL,
prior_subject_identifier VARCHAR(20),
site VARCHAR(50),
investigator VARCHAR(100),
location VARCHAR(50),
cohort_per_irt VARCHAR(100),
informed_consent_date DATE,
adolescent_assent_date DATE,
age SMALLINT,
weight DECIMAL(5,1),
rescreened_subject VARCHAR(10),
adt_ir VARCHAR(10),
three_or_more_advanced_therapies VARCHAR(10),
only_oral_5asa_compounds VARCHAR(10),
ustekinumab VARCHAR(10),
isolated_proctitis VARCHAR(10),
clinical_responder_status_i12_m0 VARCHAR(100),
irt_subject_status VARCHAR(50),
i0_rand_date_local DATE,
last_irt_transaction VARCHAR(100),
last_irt_transaction_date_local DATE,
last_irt_transaction_date_utc DATE,
next_irt_transaction VARCHAR(100),
next_irt_transaction_date_local DATE,
most_recent_med_assignment_date DATE,
days_since_last_med_assignment SMALLINT,
patient_forecast_status VARCHAR(50),
patient_forecast_status_changed_date DATE,
FOREIGN KEY (import_id) REFERENCES iwrs_import(import_id),
INDEX idx_import (import_id),
INDEX idx_subject (subject)
);
-- ── MDD3003 subject summary ───────────────────────────────────────────────────
-- Snapshot of the Subject Summary Report for study 42847922MDD3003.
CREATE TABLE IF NOT EXISTS iwrs_mdd3003_subject_summary (
id INT AUTO_INCREMENT PRIMARY KEY,
import_id INT NOT NULL,
subject VARCHAR(20) NOT NULL,
prior_subject_identifier VARCHAR(20),
site VARCHAR(50),
investigator VARCHAR(100),
location VARCHAR(50),
cohort_per_irt VARCHAR(50),
madrs_criteria_integrated VARCHAR(50),
informed_consent_date DATE,
age SMALLINT,
madrs_criteria_v15 VARCHAR(10),
madrs_criteria_v16 VARCHAR(10),
madrs_criteria_v17 VARCHAR(10),
stratification_country VARCHAR(10),
age_group VARCHAR(20),
stable_remitters VARCHAR(50),
irt_subject_status VARCHAR(100),
last_irt_transaction VARCHAR(100),
last_irt_transaction_date_local DATE,
last_irt_transaction_date_utc DATE,
next_irt_transaction VARCHAR(100),
next_irt_transaction_date_local DATE,
date_screened DATE,
date_screen_failed DATE,
date_randomized_part1 DATE,
date_early_withdraw_randomized_part1 DATE,
date_open_label_induction DATE,
date_early_withdraw_open_label_induction DATE,
date_randomized_part2 DATE,
date_early_withdraw_randomized_part2 DATE,
date_completed DATE,
date_unblinded DATE,
FOREIGN KEY (import_id) REFERENCES iwrs_import(import_id),
INDEX idx_import (import_id),
INDEX idx_subject (subject)
);
-- ── Subject visits / transactions (both studies) ─────────────────────────────
-- Rows parsed from the per-subject Detail reports; one row per IRT transaction.
CREATE TABLE IF NOT EXISTS iwrs_subject_visits (
id INT AUTO_INCREMENT PRIMARY KEY,
import_id INT NOT NULL,
study VARCHAR(20) NOT NULL,
subject VARCHAR(20) NOT NULL,
visit_type ENUM('Past','Upcoming') NOT NULL,
scheduled_date DATE,
window_days VARCHAR(20),
actual_date DATE,
irt_transaction_no SMALLINT,
irt_transaction_description VARCHAR(200),
medication_assignment VARCHAR(200),
quantity_assigned SMALLINT,
medication_id VARCHAR(20),
FOREIGN KEY (import_id) REFERENCES iwrs_import(import_id),
INDEX idx_import (import_id),
INDEX idx_study_subject (study, subject)
);
@@ -1,310 +0,0 @@
import os
import glob
import datetime
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import (
Font, PatternFill, Alignment, Border, Side, GradientFill
)
from openpyxl.utils import get_column_letter
# ── Paths & study configuration ──────────────────────────────────────────────
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")  # downloaded source xlsx reports
CREATED_DIR = os.path.join(BASE_DIR, "CreatedReports")          # generated formatted workbooks
STUDIES = ["77242113UCO3001", "42847922MDD3003"]  # protocol numbers of the two studies
# Columns read from the source report (must match the report's header text exactly).
SOURCE_COLS = [
    "Subject",
    "Investigator",
    "Subject's age collection",
    "Cohort per IRT",
    "IRT Subject Status",
    "Last Recorded IRT Transaction",
    "Next Expected IRT Transaction",
    "Next Expected IRT Transaction Date [Local]",
]
# Captions shown in the generated "Přehled" sheet; parallel to SOURCE_COLS.
DISPLAY_HEADERS = [
    "Subject",
    "Investigator",
    "Věk",
    "Cohort",
    "Status",
    "Last IRT",
    "Next Visit",
    "Next Date",
]
# Excel column widths for the "Přehled" sheet; parallel to DISPLAY_HEADERS.
COL_WIDTHS = [14, 22, 6, 12, 14, 12, 12, 13]
# ── Styles ───────────────────────────────────────────────────────────────────
HEADER_FILL = PatternFill("solid", fgColor="1F4E79")  # dark blue header background
HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
NORMAL_FONT = Font(name="Arial", size=10)
BOLD_FONT = Font(name="Arial", bold=True, size=10)
# struck-through grey font for Screen Failed / Discontinued subjects
STRIKE_FONT = Font(name="Arial", size=10, strike=True, color="999999")
ADOLESC_FONT = Font(name="Arial", bold=True, size=10)  # highlights adolescent cohort cells
THIN = Side(style="thin", color="CCCCCC")
BORDER = Border(left=THIN, right=THIN, top=THIN, bottom=THIN)
EVEN_FILL = PatternFill("solid", fgColor="EBF3FB")  # zebra striping (even rows)
ODD_FILL = PatternFill("solid", fgColor="FFFFFF")
CENTER = Alignment(horizontal="center", vertical="center", wrap_text=False)
LEFT = Alignment(horizontal="left", vertical="center", wrap_text=False)
def unique_path(directory, stem):
    """Return a free .xlsx path for *stem* inside *directory*.

    If "<stem>.xlsx" already exists, an "HHMM" time tag is appended so a
    repeated run on the same day does not overwrite the earlier file.
    """
    candidate = os.path.join(directory, f"{stem}.xlsx")
    if os.path.exists(candidate):
        stamp = datetime.datetime.now().strftime("%H%M")
        candidate = os.path.join(directory, f"{stem} {stamp}.xlsx")
    return candidate
def find_latest_source(study):
    """Locate the newest Subject Summary xlsx for *study* in INCOMING_DIR.

    Excel lock files ("~$...") are ignored.  Raises FileNotFoundError when
    no matching report exists.
    """
    pattern = os.path.join(INCOMING_DIR, f"* {study} Subject Summary Report.xlsx")
    candidates = [
        p for p in glob.glob(pattern)
        if not os.path.basename(p).startswith("~$")
    ]
    if not candidates:
        raise FileNotFoundError(f"Nenalezen zdrojový soubor pro {study} v {INCOMING_DIR}")
    # newest by modification time (same pick as sorting descending and taking [0])
    return max(candidates, key=os.path.getmtime)
def load_source(path):
    """Read the source xlsx, auto-detecting the header row.

    The report carries preamble rows above the real table; the header row
    is the first row containing the literal cell value "Subject".
    """
    probe = pd.read_excel(path, header=None)
    header_row = next(
        (idx for idx, row in probe.iterrows()
         if "Subject" in [str(cell).strip() for cell in row]),
        None,
    )
    if header_row is None:
        raise ValueError("Hlavičkový řádek nenalezen")
    # re-read with the detected header so pandas assigns proper column names
    return pd.read_excel(path, header=header_row)
def simplify_cohort(val):
    """Collapse verbose IRT cohort labels to a short display form."""
    if pd.isna(val):
        return ""
    text = str(val)
    # substring "dolescent" also matches a lowercase leading 'a'
    if "dolescent" in text:
        return "Adolescent"
    if text.startswith("Adult"):
        return "Adult"
    # MDD3003 cohorts ("Part 1", "Part 2") pass through unchanged
    return text
def format_date(val):
    """Render a date-like value as 'YYYY-MM-DD'; empty string for missing."""
    if pd.isna(val):
        return ""
    try:
        return val.strftime("%Y-%m-%d")
    except AttributeError:
        # plain strings: keep only the leading date portion
        return str(val)[:10]
def write_zdroj(wb, df_raw, source_path):
    """Append a 'ZDROJ' (source) sheet holding the raw report table.

    The sheet name carries the source file's modification date so the
    data's provenance is visible inside the workbook itself.
    """
    mtime = datetime.datetime.fromtimestamp(os.path.getmtime(source_path))
    ws = wb.create_sheet(f"ZDROJ ({mtime.strftime('%d%b%Y').upper()})")
    ws.sheet_view.showGridLines = True
    columns = list(df_raw.columns)
    # header row: dark background, small bold white font, fixed width
    for col_no, header in enumerate(columns, 1):
        cell = ws.cell(row=1, column=col_no, value=header)
        cell.font = Font(name="Arial", bold=True, size=9, color="FFFFFF")
        cell.fill = PatternFill("solid", fgColor="404040")
        cell.alignment = LEFT
        cell.border = BORDER
        ws.column_dimensions[get_column_letter(col_no)].width = 20
    # data rows: zebra striping, dates rendered as ISO strings, NaN as blank
    sheet_row = 2
    for _, record in df_raw.iterrows():
        stripe = EVEN_FILL if sheet_row % 2 == 0 else ODD_FILL
        for col_no, column in enumerate(columns, 1):
            value = record[column]
            if pd.isna(value):
                value = ""
            elif hasattr(value, "strftime"):
                value = value.strftime("%Y-%m-%d")
            cell = ws.cell(row=sheet_row, column=col_no, value=value)
            cell.font = Font(name="Arial", size=9)
            cell.fill = stripe
            cell.border = BORDER
            cell.alignment = LEFT
        sheet_row += 1
    ws.freeze_panes = "A2"
    ws.auto_filter.ref = f"A1:{get_column_letter(len(columns))}1"
def write_prehled(wb, df_raw, study):
    """Build the 'Přehled' (overview) sheet: one formatted row per subject.

    Formatting rules applied per data row:
      * Screen Failed / Discontinued subjects: struck-through grey font.
      * Randomized subjects: bold Status cell (column 5).
      * Adolescent cohort: bold Cohort cell (column 4).
    """
    ws = wb.create_sheet("Přehled")
    ws.sheet_view.showGridLines = False
    ws.sheet_view.showRowColHeaders = True
    # ── title row ────────────────────────────────────────────────────────────
    ws.merge_cells("A1:H1")
    title = ws["A1"]
    title.value = f"Subject Summary — {study} ({datetime.date.today().strftime('%d-%b-%Y')})"
    title.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
    title.alignment = Alignment(horizontal="left", vertical="center")
    ws.row_dimensions[1].height = 22
    # ── header row ───────────────────────────────────────────────────────────
    for c, (h, w) in enumerate(zip(DISPLAY_HEADERS, COL_WIDTHS), 1):
        cell = ws.cell(row=2, column=c, value=h)
        cell.font = HEADER_FONT
        cell.fill = HEADER_FILL
        cell.alignment = CENTER
        cell.border = BORDER
        ws.column_dimensions[get_column_letter(c)].width = w
    ws.row_dimensions[2].height = 18
    # ── build display dataframe ───────────────────────────────────────────────
    display = pd.DataFrame()
    display["Subject"] = df_raw["Subject"].fillna("")
    display["Investigator"] = df_raw["Investigator"].fillna("")
    # age is cast to int for clean display; missing values stay empty strings
    display["Věk"] = df_raw["Subject's age collection"].apply(
        lambda v: "" if pd.isna(v) else int(v))
    display["Cohort"] = df_raw["Cohort per IRT"].apply(simplify_cohort)
    display["Status"] = df_raw["IRT Subject Status"].fillna("")
    display["Last IRT"] = df_raw["Last Recorded IRT Transaction"].fillna("")
    display["Next Visit"] = df_raw["Next Expected IRT Transaction"].fillna("")
    display["Next Date"] = df_raw["Next Expected IRT Transaction Date [Local]"].apply(format_date)
    display = display.sort_values("Subject").reset_index(drop=True)
    # ── data rows ────────────────────────────────────────────────────────────
    for r_idx, row in display.iterrows():
        excel_row = r_idx + 3  # row 1=title, row 2=header
        status = str(row["Status"])
        is_failed = "Screen Failed" in status or "Discontinued" in status
        is_randomized = "Randomized" in status
        is_adolescent = row["Cohort"] == "Adolescent"
        fill = EVEN_FILL if r_idx % 2 == 0 else ODD_FILL
        values = [
            row["Subject"], row["Investigator"], row["Věk"],
            row["Cohort"], row["Status"], row["Last IRT"],
            row["Next Visit"], row["Next Date"],
        ]
        for c_idx, val in enumerate(values, 1):
            # empty strings are written as None so the cells stay truly blank
            cell = ws.cell(row=excel_row, column=c_idx, value=val if val != "" else None)
            cell.fill = fill
            cell.border = BORDER
            # alignment: only the age column (3) is centered
            cell.alignment = CENTER if c_idx in (3,) else LEFT
            # font logic: failed state overrides the other highlights
            if is_failed:
                cell.font = STRIKE_FONT
            elif c_idx == 5 and is_randomized:
                cell.font = BOLD_FONT
            elif c_idx == 4 and is_adolescent:
                cell.font = ADOLESC_FONT
            else:
                cell.font = NORMAL_FONT
        ws.row_dimensions[excel_row].height = 16
    ws.freeze_panes = "A3"
    last_data_row = len(display) + 2
    ws.auto_filter.ref = f"A2:H{last_data_row}"
def write_next_visits(wb, df_raw, study):
    """Append a 'Next Visits' sheet listing upcoming visits sorted by date.

    Subjects without a next expected date, and subjects whose status is
    Screen Failed or Discontinued, are excluded.
    """
    ws = wb.create_sheet("Next Visits")
    ws.sheet_view.showGridLines = False
    # ── title ────────────────────────────────────────────────────────────────
    ws.merge_cells("A1:D1")
    title_cell = ws["A1"]
    title_cell.value = f"Next Expected Visits — {study} ({datetime.date.today().strftime('%d-%b-%Y')})"
    title_cell.font = Font(name="Arial", bold=True, size=12, color="1F4E79")
    title_cell.alignment = Alignment(horizontal="left", vertical="center")
    ws.row_dimensions[1].height = 22
    # ── header row ───────────────────────────────────────────────────────────
    headers = ["Subject", "Investigator", "Next Visit", "Datum"]
    widths = [14, 22, 26, 13]
    for col_no, (header, width) in enumerate(zip(headers, widths), 1):
        cell = ws.cell(row=2, column=col_no, value=header)
        cell.font = HEADER_FONT
        cell.fill = HEADER_FILL
        cell.alignment = CENTER
        cell.border = BORDER
        ws.column_dimensions[get_column_letter(col_no)].width = width
    ws.row_dimensions[2].height = 18
    # ── select rows: need a next date, drop failed/discontinued subjects ─────
    table = pd.DataFrame()
    table["Subject"] = df_raw["Subject"].fillna("")
    table["Investigator"] = df_raw["Investigator"].fillna("")
    table["Next Visit"] = df_raw["Next Expected IRT Transaction"].fillna("")
    table["Datum"] = df_raw["Next Expected IRT Transaction Date [Local]"]
    table["Status"] = df_raw["IRT Subject Status"].fillna("")
    table = table[table["Datum"].notna()]
    table = table[~table["Status"].str.contains("Screen Failed|Discontinued", na=False)]
    table = table.sort_values("Datum").reset_index(drop=True)
    # ── data rows ────────────────────────────────────────────────────────────
    for row_no, record in table.iterrows():
        sheet_row = row_no + 3
        stripe = EVEN_FILL if row_no % 2 == 0 else ODD_FILL
        raw_date = record["Datum"]
        if hasattr(raw_date, "strftime"):
            date_text = raw_date.strftime("%Y-%m-%d")
        else:
            date_text = str(raw_date)[:10]
        row_values = [record["Subject"], record["Investigator"], record["Next Visit"], date_text]
        for col_no, value in enumerate(row_values, 1):
            cell = ws.cell(row=sheet_row, column=col_no, value=value if value != "" else None)
            cell.fill = stripe
            cell.border = BORDER
            cell.font = NORMAL_FONT
            cell.alignment = LEFT
        ws.row_dimensions[sheet_row].height = 16
    ws.freeze_panes = "A3"
    ws.auto_filter.ref = f"A2:D{len(table) + 2}"
def create_report(study):
    """Build the formatted workbook for one study and save it.

    Returns the path of the written xlsx file.
    """
    source_path = find_latest_source(study)
    print(f"[{study}] Čtu: {os.path.basename(source_path)}")
    df_raw = load_source(source_path)
    wb = Workbook()
    wb.remove(wb.active)  # drop the implicit empty default sheet
    write_prehled(wb, df_raw, study)
    write_next_visits(wb, df_raw, study)
    write_zdroj(wb, df_raw, source_path)
    out_stem = f"{datetime.date.today().strftime('%Y-%m-%d')} {study} Subject Summary"
    out_path = unique_path(CREATED_DIR, out_stem)
    wb.save(out_path)
    print(f"[{study}] Uloženo: {out_path}")
    return out_path
def main():
    """Generate reports for all configured studies.

    A missing source file skips that study instead of aborting the run.
    """
    os.makedirs(CREATED_DIR, exist_ok=True)
    for study in STUDIES:
        try:
            create_report(study)
        except FileNotFoundError as e:
            print(f"[{study}] PŘESKOČENO: {e}")
    print("\nHotovo.")
if __name__ == "__main__":
    # guard so importing this module (e.g. for reuse of its helpers)
    # does not trigger report generation
    main()
@@ -1,5 +0,0 @@
# MySQL connection settings shared by the IWRS import/reporting scripts.
# NOTE(review): credentials are stored here in plaintext — consider moving
# them to environment variables or a secrets store, and keep this file out
# of version control.
DB_HOST = "192.168.1.76"
DB_PORT = 3306
DB_USER = "root"
DB_PASSWORD = "Vlado9674+"
DB_NAME = "studie"
@@ -1,114 +0,0 @@
from playwright.sync_api import sync_playwright
import os
import glob
import datetime
import pandas as pd
# ── CONFIG ──────────────────────────────────────────────────────────────────
# NOTE(review): portal credentials are hardcoded in plaintext — move them to
# environment variables or a secrets store before sharing this script.
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
STUDIES = ["77242113UCO3001", "42847922MDD3003"]  # protocol numbers of both studies
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")        # summary reports (input)
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")  # per-subject detail reports (output)
# ────────────────────────────────────────────────────────────────────────────
def get_subjects(study):
    """Return the subject IDs listed in today's Subject Summary Report.

    Raises FileNotFoundError when no report exists, or when the newest one
    is not from today (the summary download must run first).
    """
    pattern = os.path.join(INCOMING_DIR, f"* {study} Subject Summary Report.xlsx")
    candidates = [
        p for p in glob.glob(pattern)
        if not os.path.basename(p).startswith("~$")  # skip Excel lock files
    ]
    if not candidates:
        raise FileNotFoundError(f"Nenalezen Subject Summary Report pro {study}")
    newest = max(candidates, key=os.path.getmtime)
    today = datetime.date.today().strftime("%Y-%m-%d")
    if not os.path.basename(newest).startswith(today):
        raise FileNotFoundError(f"Dnešní Subject Summary Report pro {study} neexistuje — spusť nejdříve download_subject_summary.py")
    print(f" Čtu subjekty z: {os.path.basename(newest)}")
    # locate the header row (first row containing a "Subject" cell)
    probe = pd.read_excel(newest, header=None)
    header_row = next(
        (idx for idx, row in probe.iterrows()
         if "Subject" in [str(cell).strip() for cell in row]),
        None,
    )
    if header_row is None:
        raise ValueError("Hlavičkový řádek nenalezen")
    table = pd.read_excel(newest, header=header_row)
    return table["Subject"].dropna().astype(str).str.strip().tolist()
def run(page, study):
    """Download a Subject Detail report for every subject of *study*.

    Expects *page* to already be logged in with the study selected.
    Files are written to DETAILS_DIR/<study>/.
    """
    out_dir = os.path.join(DETAILS_DIR, study)
    os.makedirs(out_dir, exist_ok=True)
    subjects = get_subjects(study)
    print(f" Nalezeno {len(subjects)} subjektů")
    today = datetime.date.today().strftime("%Y-%m-%d")
    page.goto(f"{BASE_URL}/report/patient_detail_report")
    page.wait_for_load_state("networkidle", timeout=120000)
    for subject in subjects:
        filename = os.path.join(out_dir, f"{today} {study} {subject} Subject Detail.xlsx")
        print(f" [{subject}] Stahuji...")
        # type the subject id into the report's search box and pick the
        # first autocomplete option
        input_field = page.locator('input[placeholder="search"], input[type="text"]').first
        input_field.click()
        input_field.fill(subject)
        page.wait_for_timeout(500)  # give the autocomplete dropdown time to populate
        page.locator("mat-option").first.dispatch_event("click")
        page.wait_for_load_state("networkidle", timeout=120000)
        with page.expect_download(timeout=120000) as dl:
            page.get_by_role("button", name="Download XLS").click()
        dl.value.save_as(filename)
        print(f" [{subject}] OK")
        # reset the filter so the next subject starts from a clean form
        page.get_by_role("button", name="Clear").click()
        page.wait_for_load_state("networkidle", timeout=120000)
    print(f" [{study}] Subject details hotovo.")
def main():
    """Log in and download subject detail reports for every study.

    Each study gets a fresh browser session.  A failure anywhere in the
    session (login, study selection, or the download loop) is reported and
    the remaining studies still run; the browser is always closed — the
    original leaked it when the login/select steps raised.
    """
    os.makedirs(DETAILS_DIR, exist_ok=True)
    with sync_playwright() as p:
        for study in STUDIES:
            print(f"\n[{study}] Přihlášení...")
            browser = p.chromium.launch(headless=False)
            try:
                context = browser.new_context(accept_downloads=True)
                page = context.new_page()
                page.goto(BASE_URL)
                page.wait_for_load_state("networkidle")
                page.get_by_label("Email *").fill(EMAIL)
                page.get_by_label("Password *").fill(PASSWORD)
                page.locator("#login__submit").click()
                page.wait_for_load_state("networkidle")
                page.get_by_label("Study *").click()
                page.get_by_role("option", name=study).click()
                page.get_by_role("button", name="SELECT").click()
                page.wait_for_load_state("networkidle")
                run(page, study)
            except Exception as e:
                print(f" [{study}] CHYBA: {e}")
            finally:
                browser.close()  # release the browser even on login failure
    print("\nVše hotovo.")
if __name__ == "__main__":
    main()
@@ -1,76 +0,0 @@
from playwright.sync_api import sync_playwright
import os
import datetime
# ── CONFIG ──────────────────────────────────────────────────────────────────
# NOTE(review): portal credentials are hardcoded in plaintext — move them to
# environment variables or a secrets store before sharing this script.
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
STUDIES = ["77242113UCO3001", "42847922MDD3003"]  # protocol numbers of both studies
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")  # downloaded summary reports
CREATED_DIR = os.path.join(BASE_DIR, "CreatedReports")          # created here for the report script
# ────────────────────────────────────────────────────────────────────────────
def unique_path(directory, stem):
    """Return a non-clashing .xlsx path for *stem* under *directory*.

    Appends an "HHMM" tag when "<stem>.xlsx" already exists, so repeated
    downloads on the same day never overwrite each other.
    """
    target = os.path.join(directory, f"{stem}.xlsx")
    if os.path.exists(target):
        tag = datetime.datetime.now().strftime("%H%M")
        target = os.path.join(directory, f"{stem} {tag}.xlsx")
    return target
def download_study(page, study, today):
    """Log in, select *study*, and download its Subject Summary Report.

    Returns the path of the saved xlsx file.
    """
    print(f"\n[{study}] Prihlaseni...")
    page.goto(BASE_URL)
    page.wait_for_load_state("networkidle")
    page.get_by_label("Email *").fill(EMAIL)
    page.get_by_label("Password *").fill(PASSWORD)
    page.locator("#login__submit").click()
    page.wait_for_load_state("networkidle")
    print(f"[{study}] Vyber studie...")
    page.get_by_label("Study *").click()
    page.get_by_role("option", name=study).click()
    page.get_by_role("button", name="SELECT").click()
    page.wait_for_load_state("networkidle")
    print(f"[{study}] Stahuji Subject Summary Report...")
    page.goto(f"{BASE_URL}/report/patient_summary_report")
    page.wait_for_load_state("networkidle", timeout=120000)
    filename = unique_path(INCOMING_DIR, f"{today} {study} Subject Summary Report")
    with page.expect_download(timeout=120000) as dl:
        page.get_by_role("button", name="Download XLS").click()
    dl.value.save_as(filename)
    # fixed: previously printed a literal "(unknown)" placeholder instead of
    # the saved file name (the sibling pipeline script prints the basename)
    print(f"[{study}] OK -> {os.path.basename(filename)}")
    return filename
def main():
    """Download Subject Summary Reports for all studies.

    A failure in one study is reported and the remaining studies still
    run; each browser instance is always closed (the original leaked the
    browser and aborted the whole run on any per-study error).
    """
    today = datetime.date.today().strftime("%Y-%m-%d")
    os.makedirs(INCOMING_DIR, exist_ok=True)
    os.makedirs(CREATED_DIR, exist_ok=True)
    downloaded = []
    with sync_playwright() as p:
        for study in STUDIES:
            browser = p.chromium.launch(headless=False)
            try:
                context = browser.new_context(accept_downloads=True)
                page = context.new_page()
                filename = download_study(page, study, today)
                downloaded.append((study, filename))
            except Exception as e:
                print(f"[{study}] CHYBA: {e}")
            finally:
                browser.close()
    print("\nVse stazeno:")
    for study, path in downloaded:
        print(f" {study}: {path}")
if __name__ == "__main__":
    main()
@@ -1,358 +0,0 @@
"""
Importuje data z IWRS Excel reportů do MySQL (databáze studie).
Pořadí spuštění:
1. download_subject_summary.py
2. download_subject_details.py
3. tento skript
Každé spuštění vytvoří nový import_id v iwrs_import.
Reportovací skripty pracují vždy s MAX(import_id) pro danou studii.
"""
import os
import glob
import datetime
import re
import pandas as pd
import mysql.connector
import db_config
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
# ── helpers ──────────────────────────────────────────────────────────────────
def get_conn():
    """Open a new MySQL connection using the shared db_config settings."""
    params = {
        "host": db_config.DB_HOST,
        "port": db_config.DB_PORT,
        "user": db_config.DB_USER,
        "password": db_config.DB_PASSWORD,
        "database": db_config.DB_NAME,
    }
    return mysql.connector.connect(**params)
def to_date(val):
    """Convert a pandas Timestamp / string / NaT / NaN to a date, else None."""
    if val is None or (isinstance(val, float) and pd.isna(val)):
        return None
    # Timestamp first: it subclasses datetime.datetime but may be NaT-like
    if isinstance(val, pd.Timestamp):
        return None if pd.isna(val) else val.date()
    if isinstance(val, datetime.datetime):
        return val.date()
    if isinstance(val, datetime.date):
        return val
    text = str(val).strip()
    if not text or text.lower() in ("nat", "nan", "none", ""):
        return None
    # formats the IWRS exports are known to use, most common first
    for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None
def to_int(val):
    """Best-effort int conversion; returns None for NaN / unparsable input."""
    try:
        number = float(val)
    except (TypeError, ValueError):
        return None
    return None if pd.isna(number) else int(number)
def to_float(val):
    """Best-effort float conversion; returns None for NaN / unparsable input."""
    try:
        number = float(val)
    except (TypeError, ValueError):
        return None
    return None if pd.isna(number) else number
def to_str(val):
    """Return a stripped string, or None for missing/placeholder values."""
    if val is None or (isinstance(val, float) and pd.isna(val)):
        return None
    text = str(val).strip()
    if text.lower() in ("nan", "nat", "none", ""):
        return None
    return text
def find_summary_file(study):
    """Return the newest Subject Summary Report path for *study*.

    Warns (but does not fail) when the newest report is not from today.
    """
    today = datetime.date.today().strftime("%Y-%m-%d")
    pattern = os.path.join(INCOMING_DIR, f"* {study} Subject Summary Report.xlsx")
    candidates = [
        p for p in glob.glob(pattern)
        if not os.path.basename(p).startswith("~$")  # skip Excel lock files
    ]
    if not candidates:
        raise FileNotFoundError(f"Nenalezen Subject Summary Report pro {study}")
    newest = max(candidates, key=os.path.getmtime)
    if not os.path.basename(newest).startswith(today):
        print(f" UPOZORNĚNÍ: nejnovější Summary Report pro {study} není z dnešního dne ({os.path.basename(newest)[:10]})")
    return newest
def read_summary_df(path):
    """Read a Summary xlsx; returns a DataFrame starting at the header row.

    The header row is detected as the first row containing a "Subject" cell.
    """
    probe = pd.read_excel(path, header=None)
    header_row = next(
        (idx for idx, row in probe.iterrows()
         if "Subject" in [str(cell).strip() for cell in row]),
        None,
    )
    if header_row is None:
        raise ValueError(f"Hlavičkový řádek nenalezen v {path}")
    return pd.read_excel(path, header=header_row)
def find_detail_files(study):
    """List detail xlsx files dated the same day as the newest Summary Report."""
    detail_dir = os.path.join(DETAILS_DIR, study)
    # use the summary file's date prefix so summary and details stay in sync
    file_date = os.path.basename(find_summary_file(study))[:10]  # "YYYY-MM-DD"
    pattern = os.path.join(detail_dir, f"{file_date} {study} * Subject Detail.xlsx")
    matches = (p for p in glob.glob(pattern) if not os.path.basename(p).startswith("~$"))
    return sorted(matches)
def parse_detail_visits(path):
    """Extract visit/transaction rows from a Subject Detail xlsx.

    Returns a list of dicts, one per table row below the "Visit Type"
    header; rows whose first cell is neither "Past" nor "Upcoming" are
    skipped.  Returns [] when no header row is found.
    """
    sheet = pd.read_excel(path, sheet_name="patient_detail_report", header=None)
    header_row = next(
        (idx for idx, row in sheet.iterrows()
         if "Visit Type" in [str(cell).strip() for cell in row]),
        None,
    )
    if header_row is None:
        return []
    table = sheet.iloc[header_row + 1:].copy()
    table.columns = range(table.shape[1])  # positional column access below
    visits = []
    for _, record in table.iterrows():
        visit_type = to_str(record.get(0))
        if visit_type not in ("Past", "Upcoming"):
            continue
        visits.append({
            "visit_type": visit_type,
            "scheduled_date": to_date(record.get(1)),
            "window_days": to_str(record.get(2)),
            "actual_date": to_date(record.get(3)),
            "irt_transaction_no": to_int(record.get(4)),
            "irt_transaction_description": to_str(record.get(5)),
            "medication_assignment": to_str(record.get(6)),
            "quantity_assigned": to_int(record.get(7)),
            "medication_id": to_str(record.get(8)),
        })
    return visits
# ── insert helpers ────────────────────────────────────────────────────────────
def insert_import(cursor, study, source_file):
    """Record one import run in iwrs_import and return its new import_id."""
    params = (study, datetime.datetime.now(), os.path.basename(source_file))
    cursor.execute(
        "INSERT INTO iwrs_import (study, imported_at, source_file) VALUES (%s, %s, %s)",
        params,
    )
    return cursor.lastrowid
def insert_uco3001_summary(cursor, import_id, df):
    """Insert every UCO3001 summary row into iwrs_uco3001_subject_summary.

    Report columns that are not present in a given export are inserted as
    NULL.  (The original defined an unused local helper ``c(name)``; it is
    replaced by ``opt`` which actually performs the optional-column access.)
    """
    sql = """
    INSERT INTO iwrs_uco3001_subject_summary (
        import_id, subject, prior_subject_identifier, site, investigator, location,
        cohort_per_irt, informed_consent_date, adolescent_assent_date, age, weight,
        rescreened_subject, adt_ir, three_or_more_advanced_therapies,
        only_oral_5asa_compounds, ustekinumab, isolated_proctitis,
        clinical_responder_status_i12_m0, irt_subject_status,
        i0_rand_date_local, last_irt_transaction,
        last_irt_transaction_date_local, last_irt_transaction_date_utc,
        next_irt_transaction, next_irt_transaction_date_local,
        most_recent_med_assignment_date, days_since_last_med_assignment,
        patient_forecast_status, patient_forecast_status_changed_date
    ) VALUES (
        %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
    )
    """
    col = df.columns.tolist()
    def opt(conv, row, name):
        # conv(row[name]) when the column exists in this export, else NULL
        return conv(row[name]) if name in col else None
    for _, r in df.iterrows():
        cursor.execute(sql, (
            import_id,
            to_str(r["Subject"]),
            opt(to_str, r, "Prior Subject Identifier"),
            to_str(r["Site"]),
            to_str(r["Investigator"]),
            to_str(r["Location"]),
            to_str(r["Cohort per IRT"]),
            to_date(r["Informed Consent Date"]),
            opt(to_date, r, "Adolescent Assent Date"),
            to_int(r["Subject's age collection"]),
            opt(to_float, r, "Subject's weight collection"),
            opt(to_str, r, "Rescreened Subject"),
            opt(to_str, r, "ADT-IR"),
            opt(to_str, r, "3 or More Advanced Therapies"),
            opt(to_str, r, "Only Oral 5-ASA Compounds"),
            opt(to_str, r, "Ustekinumab"),
            opt(to_str, r, "Isolated Proctitis"),
            opt(to_str, r, "Clinical Responder Status at I-12 / M-0"),
            to_str(r["IRT Subject Status"]),
            opt(to_date, r, "I0_RAND_TIMESTAMP_LOCAL [Local]"),
            to_str(r["Last Recorded IRT Transaction"]),
            to_date(r["Last Recorded IRT Transaction Date [Local]"]),
            to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
            to_str(r["Next Expected IRT Transaction"]),
            to_date(r["Next Expected IRT Transaction Date [Local]"]),
            opt(to_date, r, "Most Recent Medication Assignment Transaction [Local]"),
            opt(to_int, r, "Days Since Last Medication Assignment Transaction"),
            opt(to_str, r, "Patient Forecast Status"),
            opt(to_date, r, "Patient Forecast Status Changed Date (UTC)"),
        ))
def insert_mdd3003_summary(cursor, import_id, df):
    """Insert every MDD3003 summary row into iwrs_mdd3003_subject_summary.

    Report columns that are not present in a given export are inserted as
    NULL; the repetitive ``... if name in col else None`` pattern is
    factored into the ``opt`` helper.
    """
    sql = """
    INSERT INTO iwrs_mdd3003_subject_summary (
        import_id, subject, prior_subject_identifier, site, investigator, location,
        cohort_per_irt, madrs_criteria_integrated, informed_consent_date, age,
        madrs_criteria_v15, madrs_criteria_v16, madrs_criteria_v17,
        stratification_country, age_group, stable_remitters, irt_subject_status,
        last_irt_transaction, last_irt_transaction_date_local,
        last_irt_transaction_date_utc, next_irt_transaction,
        next_irt_transaction_date_local, date_screened, date_screen_failed,
        date_randomized_part1, date_early_withdraw_randomized_part1,
        date_open_label_induction, date_early_withdraw_open_label_induction,
        date_randomized_part2, date_early_withdraw_randomized_part2,
        date_completed, date_unblinded
    ) VALUES (
        %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
    )
    """
    col = df.columns.tolist()
    def opt(conv, row, name):
        # conv(row[name]) when the column exists in this export, else NULL
        return conv(row[name]) if name in col else None
    for _, r in df.iterrows():
        cursor.execute(sql, (
            import_id,
            to_str(r["Subject"]),
            opt(to_str, r, "Prior Subject Identifier"),
            to_str(r["Site"]),
            to_str(r["Investigator"]),
            to_str(r["Location"]),
            to_str(r["Cohort per IRT"]),
            opt(to_str, r, "MADRS response criteria integrated or manually entered"),
            to_date(r["Informed Consent Date"]),
            to_int(r["Subject's age collection"]),
            opt(to_str, r, "MADRS response criteria v1.5 from RAVE"),
            opt(to_str, r, "MADRS response criteria v1.6 from RAVE"),
            opt(to_str, r, "MADRS response criteria v1.7 from RAVE"),
            opt(to_str, r, "Stratification Country"),
            opt(to_str, r, "Age Group"),
            opt(to_str, r, "Stable Remitters vs. Non Stable Remitters"),
            to_str(r["IRT Subject Status"]),
            to_str(r["Last Recorded IRT Transaction"]),
            to_date(r["Last Recorded IRT Transaction Date [Local]"]),
            to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
            to_str(r["Next Expected IRT Transaction"]),
            to_date(r["Next Expected IRT Transaction Date [Local]"]),
            opt(to_date, r, "Date Screened [Local]"),
            opt(to_date, r, "Date Screen Failed [Local]"),
            opt(to_date, r, "Date Randomized Part 1 [Local]"),
            opt(to_date, r, "Date Early Withdraw Randomized Part 1 [Local]"),
            opt(to_date, r, "Date Open Label Induction [Local]"),
            opt(to_date, r, "Date Early Withdraw Open Label Induction [Local]"),
            opt(to_date, r, "Date Randomized Part 2 [Local]"),
            opt(to_date, r, "Date Early Withdraw Randomized Part 2 [Local]"),
            opt(to_date, r, "Date Completed [Local]"),
            opt(to_date, r, "Date Unblinded [Local]"),
        ))
def insert_visits(cursor, import_id, study, subject, visits):
    """Insert parsed visit rows for one subject; no-op for an empty list."""
    if not visits:
        return
    sql = """
    INSERT INTO iwrs_subject_visits (
        import_id, study, subject, visit_type, scheduled_date, window_days,
        actual_date, irt_transaction_no, irt_transaction_description,
        medication_assignment, quantity_assigned, medication_id
    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    """
    # dict keys in the exact order of the SQL placeholders
    field_order = (
        "visit_type", "scheduled_date", "window_days", "actual_date",
        "irt_transaction_no", "irt_transaction_description",
        "medication_assignment", "quantity_assigned", "medication_id",
    )
    for visit in visits:
        row = (import_id, study, subject) + tuple(visit[f] for f in field_order)
        cursor.execute(sql, row)
# ── main ──────────────────────────────────────────────────────────────────────
def import_study(conn, study):
    """Run one full import for *study*: summary rows plus per-subject visits.

    Commits once at the end.  NOTE(review): a failure mid-import leaves the
    transaction uncommitted — atomicity holds only for transactional table
    engines (confirm the tables are InnoDB).  Returns the new import_id.
    """
    summary_path = find_summary_file(study)
    print(f" Summary: {os.path.basename(summary_path)}")
    df_summary = read_summary_df(summary_path)
    df_summary = df_summary.dropna(how="all")  # drop fully empty spacer rows
    detail_files = find_detail_files(study)
    print(f" Detail souborů: {len(detail_files)}")
    cursor = conn.cursor()
    import_id = insert_import(cursor, study, summary_path)
    print(f" import_id = {import_id}")
    # each study has its own summary table / column mapping
    if study == "77242113UCO3001":
        insert_uco3001_summary(cursor, import_id, df_summary)
    else:
        insert_mdd3003_summary(cursor, import_id, df_summary)
    print(f" Summary řádků: {len(df_summary)}")
    visited = 0
    for path in detail_files:
        fname = os.path.basename(path)
        # file name pattern: "2026-05-04 77242113UCO3001 CZ100012001 Subject Detail.xlsx"
        m = re.search(r"\d{4}-\d{2}-\d{2} \S+ (\S+) Subject Detail\.xlsx", fname)
        subject = m.group(1) if m else "UNKNOWN"
        visits = parse_detail_visits(path)
        insert_visits(cursor, import_id, study, subject, visits)
        visited += len(visits)
    conn.commit()
    cursor.close()
    print(f" Transakce uloženo: {visited}")
    return import_id
def main():
    """Import the latest reports for all studies into MySQL.

    Per-study failures are reported and do not stop the other studies;
    the connection is always closed, even if an unexpected error escapes
    the loop (the original leaked it in that case).
    """
    conn = get_conn()
    print("Připojeno k MySQL.\n")
    try:
        for study in STUDIES:
            print(f"[{study}]")
            try:
                import_id = import_study(conn, study)
                print(f" OK — import_id {import_id}\n")
            except Exception as e:
                print(f" CHYBA: {e}\n")
    finally:
        conn.close()
    print("Hotovo.")
if __name__ == "__main__":
    main()
-422
View File
@@ -1,422 +0,0 @@
"""
Kompletní pipeline:
1. Stažení Subject Summary Reportů (obě studie)
2. Stažení Subject Detail Reportů (obě studie)
3. Import do MySQL
Spusť tento skript místo tří samostatných skriptů.
"""
import os
import datetime
import glob
import re
from playwright.sync_api import sync_playwright
import pandas as pd
import db_config
import mysql.connector
# ── CONFIG ───────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
# NOTE(review): hard-coded credentials committed to source — rotate these and
# load them from environment variables or a secret store instead.
EMAIL = "vbuzalka@its.jnj.com"
PASSWORD = "Vlado123++-+"
# Studies handled by this pipeline; list order is the processing order.
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Download targets: summary workbooks and per-subject detail workbooks.
INCOMING_DIR = os.path.join(BASE_DIR, "IncomingSourceReports")
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")
# ── helpers ───────────────────────────────────────────────────────────────────
def unique_path(directory, stem):
    """Return a path for ``<stem>.xlsx`` in *directory* that does not exist yet.

    If the plain name is taken, an HHMM time tag is appended (original
    behaviour); if that is *also* taken — e.g. two runs within the same
    minute — a numeric suffix is added so an existing download is never
    silently overwritten (the original returned the time-tagged path
    unconditionally).
    """
    path = os.path.join(directory, f"{stem}.xlsx")
    if not os.path.exists(path):
        return path
    time_tag = datetime.datetime.now().strftime("%H%M")
    candidate = os.path.join(directory, f"{stem} {time_tag}.xlsx")
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{stem} {time_tag} ({counter}).xlsx")
        counter += 1
    return candidate
def login(page, study):
    """Sign in to the IRT portal and select *study* from the study picker.

    The step order mirrors the web UI flow (credentials -> submit -> study
    dropdown -> SELECT) and must not be reordered; each wait_for_load_state
    lets the SPA finish navigating before the next selector is queried.
    """
    page.goto(BASE_URL)
    page.wait_for_load_state("networkidle")
    page.get_by_label("Email *").fill(EMAIL)
    page.get_by_label("Password *").fill(PASSWORD)
    page.locator("#login__submit").click()
    page.wait_for_load_state("networkidle")
    page.get_by_label("Study *").click()
    page.get_by_role("option", name=study).click()
    page.get_by_role("button", name="SELECT").click()
    page.wait_for_load_state("networkidle")
# ── KROK 1: Subject Summary ───────────────────────────────────────────────────
def download_summary(page, study, today):
    """Download the Subject Summary Report XLS for *study*; return the saved path.

    *today* is a pre-formatted YYYY-MM-DD string used in the file name.
    A generous 120 s timeout covers slow report generation on the server.
    """
    print(f"  [{study}] Stahuji Subject Summary Report...")
    page.goto(f"{BASE_URL}/report/patient_summary_report")
    page.wait_for_load_state("networkidle", timeout=120000)
    filename = unique_path(INCOMING_DIR, f"{today} {study} Subject Summary Report")
    # expect_download must wrap the click that triggers the browser download.
    with page.expect_download(timeout=120000) as dl:
        page.get_by_role("button", name="Download XLS").click()
    dl.value.save_as(filename)
    print(f"  [{study}] Summary OK -> {os.path.basename(filename)}")
    return filename
# ── KROK 2: Subject Details ───────────────────────────────────────────────────
def get_subjects_from_summary(summary_path):
    """Return the subject identifiers listed in a Subject Summary workbook.

    Delegates the "Subject" header-row detection to read_summary_df() so both
    code paths locate the header the same way — the previous copy duplicated
    the scan line-for-line.  Raises ValueError when no header row is found.
    """
    df = read_summary_df(summary_path)
    return df["Subject"].dropna().astype(str).str.strip().tolist()
def download_details(page, study, summary_path, today):
    """Download one Subject Detail Report XLS per subject found in the summary.

    Files are written to DETAILS_DIR/<study>/.  For each subject the report
    page's search box is filled, the first autocomplete option is activated
    (dispatch_event works around options that are not click-reachable), the
    XLS is downloaded, and the filter is cleared for the next subject.
    """
    out_dir = os.path.join(DETAILS_DIR, study)
    os.makedirs(out_dir, exist_ok=True)
    subjects = get_subjects_from_summary(summary_path)
    print(f"  [{study}] Subjektů k stažení: {len(subjects)}")
    page.goto(f"{BASE_URL}/report/patient_detail_report")
    page.wait_for_load_state("networkidle", timeout=120000)
    for subject in subjects:
        filename = os.path.join(out_dir, f"{today} {study} {subject} Subject Detail.xlsx")
        input_field = page.locator('input[placeholder="search"], input[type="text"]').first
        input_field.click()
        input_field.fill(subject)
        # Short pause lets the autocomplete dropdown populate.
        page.wait_for_timeout(500)
        page.locator("mat-option").first.dispatch_event("click")
        page.wait_for_load_state("networkidle", timeout=120000)
        with page.expect_download(timeout=120000) as dl:
            page.get_by_role("button", name="Download XLS").click()
        dl.value.save_as(filename)
        print(f"  [{study}] Detail {subject} OK")
        # Reset the report filter before searching the next subject.
        page.get_by_role("button", name="Clear").click()
        page.wait_for_load_state("networkidle", timeout=120000)
# ── KROK 3: Import do MySQL ───────────────────────────────────────────────────
def get_conn():
    """Open a new MySQL connection using credentials from the db_config module."""
    return mysql.connector.connect(
        host=db_config.DB_HOST,
        port=db_config.DB_PORT,
        user=db_config.DB_USER,
        password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
def to_date(val):
    """Coerce an Excel cell value to ``datetime.date``; return None for NA/garbage.

    Fix: NA detection now uses ``pd.isna`` on any scalar, so ``pd.NaT`` is
    rejected up front.  Previously only float NaN and None were screened, and
    NaT — whose type subclasses ``datetime`` — fell through the
    ``isinstance(val, datetime.datetime)`` branch.
    """
    if val is None:
        return None
    try:
        if pd.isna(val):  # float NaN, pd.NaT, numpy NaT/NaN
            return None
    except (TypeError, ValueError):
        pass  # non-scalar or unsupported type: fall through to str parsing
    if isinstance(val, pd.Timestamp):
        return val.date()
    if isinstance(val, datetime.datetime):
        return val.date()
    if isinstance(val, datetime.date):
        return val
    s = str(val).strip()
    if not s or s.lower() in ("nat", "nan", "none"):
        return None
    # Formats observed in the IRT exports, most common first.
    for fmt in ("%Y-%m-%d", "%d-%b-%Y", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.datetime.strptime(s, fmt).date()
        except ValueError:
            pass
    return None
def to_int(val):
    """Coerce a cell value to int via float; None for NA or unparsable input."""
    try:
        number = float(val)
    except (TypeError, ValueError):
        return None
    return None if pd.isna(number) else int(number)
def to_float(val):
    """Coerce a cell value to float; None for NA or unparsable input."""
    try:
        number = float(val)
    except (TypeError, ValueError):
        return None
    return None if pd.isna(number) else number
def to_str(val):
    """Coerce a cell value to a stripped string; None for NA-like values."""
    if val is None:
        return None
    if isinstance(val, float) and pd.isna(val):
        return None
    text = str(val).strip()
    if text.lower() in ("nan", "nat", "none", ""):
        return None
    return text
def read_summary_df(path):
    """Read a Subject Summary workbook, locating the header row dynamically.

    The header is the first row containing a cell equal to "Subject" (after
    stripping).  Raises ValueError when no such row exists.  Fully-empty rows
    are dropped from the result.
    """
    raw = pd.read_excel(path, header=None)
    header_row = next(
        (i for i, row in raw.iterrows()
         if "Subject" in [str(v).strip() for v in row]),
        None,
    )
    if header_row is None:
        raise ValueError(f"Hlavičkový řádek nenalezen v {path}")
    return pd.read_excel(path, header=header_row).dropna(how="all")
def parse_detail_visits(path):
    """Parse the visit table from a Subject Detail workbook into dicts.

    Returns a list of visit dicts (possibly empty).  The header row is the
    first row containing "Visit Type"; everything below it is read by
    *position* (columns 0-8), so this mapping depends on the report layout
    staying fixed — column 0 Visit Type, 1 Scheduled Date, 2 Window, 3 Actual
    Date, 4 IRT Tx No, 5 Description, 6 Medication Assignment, 7 Quantity,
    8 Medication ID.
    """
    df = pd.read_excel(path, sheet_name="patient_detail_report", header=None)
    header_row = None
    for i, row in df.iterrows():
        if "Visit Type" in [str(v).strip() for v in row]:
            header_row = i
            break
    if header_row is None:
        # No visit table in this workbook — nothing to import.
        return []
    visits_df = df.iloc[header_row + 1:].copy()
    # Re-key columns to positional integers so r.get(n) below is stable.
    visits_df.columns = range(visits_df.shape[1])
    rows = []
    for _, r in visits_df.iterrows():
        visit_type = to_str(r.get(0))
        # Only real visit rows carry Past/Upcoming; everything else is footer noise.
        if visit_type not in ("Past", "Upcoming"):
            continue
        rows.append({
            "visit_type": visit_type,
            "scheduled_date": to_date(r.get(1)),
            "window_days": to_str(r.get(2)),
            "actual_date": to_date(r.get(3)),
            "irt_transaction_no": to_int(r.get(4)),
            "irt_transaction_description": to_str(r.get(5)),
            "medication_assignment": to_str(r.get(6)),
            "quantity_assigned": to_int(r.get(7)),
            "medication_id": to_str(r.get(8)),
        })
    return rows
def insert_import(cursor, study, source_file):
    """Record one import run in iwrs_import and return its generated id."""
    params = (study, datetime.datetime.now(), os.path.basename(source_file))
    cursor.execute(
        "INSERT INTO iwrs_import (study, imported_at, source_file) VALUES (%s, %s, %s)",
        params,
    )
    return cursor.lastrowid
def insert_uco3001_summary(cursor, import_id, df):
    """Insert UCO3001 summary rows into iwrs_uco3001_subject_summary.

    Column list and %s placeholders must stay aligned 1:1 with the value
    tuple below (29 values).  Report columns guarded with ``in col`` are
    optional — they vary between report versions and map to NULL when absent.
    """
    sql = """INSERT INTO iwrs_uco3001_subject_summary (
        import_id, subject, prior_subject_identifier, site, investigator, location,
        cohort_per_irt, informed_consent_date, adolescent_assent_date, age, weight,
        rescreened_subject, adt_ir, three_or_more_advanced_therapies,
        only_oral_5asa_compounds, ustekinumab, isolated_proctitis,
        clinical_responder_status_i12_m0, irt_subject_status,
        i0_rand_date_local, last_irt_transaction,
        last_irt_transaction_date_local, last_irt_transaction_date_utc,
        next_irt_transaction, next_irt_transaction_date_local,
        most_recent_med_assignment_date, days_since_last_med_assignment,
        patient_forecast_status, patient_forecast_status_changed_date
    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    col = df.columns.tolist()
    for _, r in df.iterrows():
        cursor.execute(sql, (
            import_id,
            to_str(r["Subject"]),
            to_str(r["Prior Subject Identifier"]) if "Prior Subject Identifier" in col else None,
            to_str(r["Site"]),
            to_str(r["Investigator"]),
            to_str(r["Location"]),
            to_str(r["Cohort per IRT"]),
            to_date(r["Informed Consent Date"]),
            to_date(r["Adolescent Assent Date"]) if "Adolescent Assent Date" in col else None,
            to_int(r["Subject's age collection"]),
            to_float(r["Subject's weight collection"]) if "Subject's weight collection" in col else None,
            to_str(r["Rescreened Subject"]) if "Rescreened Subject" in col else None,
            to_str(r["ADT-IR"]) if "ADT-IR" in col else None,
            to_str(r["3 or More Advanced Therapies"]) if "3 or More Advanced Therapies" in col else None,
            to_str(r["Only Oral 5-ASA Compounds"]) if "Only Oral 5-ASA Compounds" in col else None,
            to_str(r["Ustekinumab"]) if "Ustekinumab" in col else None,
            to_str(r["Isolated Proctitis"]) if "Isolated Proctitis" in col else None,
            to_str(r["Clinical Responder Status at I-12 / M-0"]) if "Clinical Responder Status at I-12 / M-0" in col else None,
            to_str(r["IRT Subject Status"]),
            to_date(r["I0_RAND_TIMESTAMP_LOCAL [Local]"]) if "I0_RAND_TIMESTAMP_LOCAL [Local]" in col else None,
            to_str(r["Last Recorded IRT Transaction"]),
            to_date(r["Last Recorded IRT Transaction Date [Local]"]),
            to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
            to_str(r["Next Expected IRT Transaction"]),
            to_date(r["Next Expected IRT Transaction Date [Local]"]),
            to_date(r["Most Recent Medication Assignment Transaction [Local]"]) if "Most Recent Medication Assignment Transaction [Local]" in col else None,
            to_int(r["Days Since Last Medication Assignment Transaction"]) if "Days Since Last Medication Assignment Transaction" in col else None,
            to_str(r["Patient Forecast Status"]) if "Patient Forecast Status" in col else None,
            to_date(r["Patient Forecast Status Changed Date (UTC)"]) if "Patient Forecast Status Changed Date (UTC)" in col else None,
        ))
def insert_mdd3003_summary(cursor, import_id, df):
    """Insert MDD3003 summary rows into iwrs_mdd3003_subject_summary.

    Column list and %s placeholders must stay aligned 1:1 with the value
    tuple below (32 values).  Report columns guarded with ``in col`` are
    optional and map to NULL when the report version lacks them.
    """
    sql = """INSERT INTO iwrs_mdd3003_subject_summary (
        import_id, subject, prior_subject_identifier, site, investigator, location,
        cohort_per_irt, madrs_criteria_integrated, informed_consent_date, age,
        madrs_criteria_v15, madrs_criteria_v16, madrs_criteria_v17,
        stratification_country, age_group, stable_remitters, irt_subject_status,
        last_irt_transaction, last_irt_transaction_date_local,
        last_irt_transaction_date_utc, next_irt_transaction,
        next_irt_transaction_date_local, date_screened, date_screen_failed,
        date_randomized_part1, date_early_withdraw_randomized_part1,
        date_open_label_induction, date_early_withdraw_open_label_induction,
        date_randomized_part2, date_early_withdraw_randomized_part2,
        date_completed, date_unblinded
    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    col = df.columns.tolist()
    for _, r in df.iterrows():
        cursor.execute(sql, (
            import_id,
            to_str(r["Subject"]),
            to_str(r["Prior Subject Identifier"]) if "Prior Subject Identifier" in col else None,
            to_str(r["Site"]),
            to_str(r["Investigator"]),
            to_str(r["Location"]),
            to_str(r["Cohort per IRT"]),
            to_str(r["MADRS response criteria integrated or manually entered"]) if "MADRS response criteria integrated or manually entered" in col else None,
            to_date(r["Informed Consent Date"]),
            to_int(r["Subject's age collection"]),
            to_str(r["MADRS response criteria v1.5 from RAVE"]) if "MADRS response criteria v1.5 from RAVE" in col else None,
            to_str(r["MADRS response criteria v1.6 from RAVE"]) if "MADRS response criteria v1.6 from RAVE" in col else None,
            to_str(r["MADRS response criteria v1.7 from RAVE"]) if "MADRS response criteria v1.7 from RAVE" in col else None,
            to_str(r["Stratification Country"]) if "Stratification Country" in col else None,
            to_str(r["Age Group"]) if "Age Group" in col else None,
            to_str(r["Stable Remitters vs. Non Stable Remitters"]) if "Stable Remitters vs. Non Stable Remitters" in col else None,
            to_str(r["IRT Subject Status"]),
            to_str(r["Last Recorded IRT Transaction"]),
            to_date(r["Last Recorded IRT Transaction Date [Local]"]),
            to_date(r["Last Recorded IRT Transaction Date (UTC)"]),
            to_str(r["Next Expected IRT Transaction"]),
            to_date(r["Next Expected IRT Transaction Date [Local]"]),
            to_date(r["Date Screened [Local]"]) if "Date Screened [Local]" in col else None,
            to_date(r["Date Screen Failed [Local]"]) if "Date Screen Failed [Local]" in col else None,
            to_date(r["Date Randomized Part 1 [Local]"]) if "Date Randomized Part 1 [Local]" in col else None,
            to_date(r["Date Early Withdraw Randomized Part 1 [Local]"]) if "Date Early Withdraw Randomized Part 1 [Local]" in col else None,
            to_date(r["Date Open Label Induction [Local]"]) if "Date Open Label Induction [Local]" in col else None,
            to_date(r["Date Early Withdraw Open Label Induction [Local]"]) if "Date Early Withdraw Open Label Induction [Local]" in col else None,
            to_date(r["Date Randomized Part 2 [Local]"]) if "Date Randomized Part 2 [Local]" in col else None,
            to_date(r["Date Early Withdraw Randomized Part 2 [Local]"]) if "Date Early Withdraw Randomized Part 2 [Local]" in col else None,
            to_date(r["Date Completed [Local]"]) if "Date Completed [Local]" in col else None,
            to_date(r["Date Unblinded [Local]"]) if "Date Unblinded [Local]" in col else None,
        ))
def insert_visits(cursor, import_id, study, subject, visits):
    """Insert parsed visit rows for one subject; no-op when *visits* is empty."""
    if not visits:
        return
    sql = """INSERT INTO iwrs_subject_visits (
        import_id, study, subject, visit_type, scheduled_date, window_days,
        actual_date, irt_transaction_no, irt_transaction_description,
        medication_assignment, quantity_assigned, medication_id
    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    for visit in visits:
        params = (
            import_id, study, subject,
            visit["visit_type"], visit["scheduled_date"], visit["window_days"],
            visit["actual_date"], visit["irt_transaction_no"],
            visit["irt_transaction_description"], visit["medication_assignment"],
            visit["quantity_assigned"], visit["medication_id"],
        )
        cursor.execute(sql, params)
def import_to_mysql(summary_path, detail_files, study):
    """Load one study's summary + detail workbooks into MySQL; return import_id."""
    print(f"\n  [MySQL] Importuji {study}...")
    df_summary = read_summary_df(summary_path)
    conn = get_conn()
    cursor = conn.cursor()
    import_id = insert_import(cursor, study, summary_path)
    # Dispatch the summary rows to the study-specific table.
    inserter = insert_uco3001_summary if study == "77242113UCO3001" else insert_mdd3003_summary
    inserter(cursor, import_id, df_summary)
    total_visits = 0
    subject_pattern = re.compile(r"\d{4}-\d{2}-\d{2} \S+ (\S+) Subject Detail\.xlsx")
    for path in detail_files:
        match = subject_pattern.search(os.path.basename(path))
        subject = match.group(1) if match else "UNKNOWN"
        visits = parse_detail_visits(path)
        insert_visits(cursor, import_id, study, subject, visits)
        total_visits += len(visits)
    conn.commit()
    cursor.close()
    conn.close()
    print(f"  [MySQL] import_id={import_id} | pacientů={len(df_summary)} | transakcí={total_visits}")
    return import_id
# ── MAIN ─────────────────────────────────────────────────────────────────────
def main():
    """Full pipeline: download summary + detail reports per study, then import.

    A fresh browser session is created per study (the portal binds the chosen
    study to the session), and a download failure for one study skips only
    that study's MySQL import.
    """
    today = datetime.date.today().strftime("%Y-%m-%d")
    os.makedirs(INCOMING_DIR, exist_ok=True)
    os.makedirs(DETAILS_DIR, exist_ok=True)
    # study -> downloaded summary path, or None when downloading failed.
    summary_paths = {}
    # ── Steps 1 + 2: downloads (Playwright, one session per study) ──
    with sync_playwright() as p:
        for study in STUDIES:
            print(f"\n{'='*60}")
            print(f"[{study}] KROK 1: Subject Summary Report")
            print(f"{'='*60}")
            browser = p.chromium.launch(headless=False)
            context = browser.new_context(accept_downloads=True)
            page = context.new_page()
            try:
                login(page, study)
                summary_path = download_summary(page, study, today)
                summary_paths[study] = summary_path
                print(f"\n[{study}] KROK 2: Subject Detail Reports")
                download_details(page, study, summary_path, today)
            except Exception as e:
                print(f"  [{study}] CHYBA při stahování: {e}")
                summary_paths[study] = None
            finally:
                # Always release the browser, even on failure.
                browser.close()
    # ── Step 3: import into MySQL ──
    print(f"\n{'='*60}")
    print("KROK 3: Import do MySQL")
    print(f"{'='*60}")
    for study in STUDIES:
        summary_path = summary_paths.get(study)
        if not summary_path:
            print(f"  [{study}] PŘESKOČENO — stahování selhalo")
            continue
        detail_files = sorted(glob.glob(
            os.path.join(DETAILS_DIR, study, f"{today} {study} * Subject Detail.xlsx")
        ))
        try:
            import_to_mysql(summary_path, detail_files, study)
        except Exception as e:
            print(f"  [{study}] CHYBA při importu: {e}")
    print(f"\n{'='*60}")
    print("Vše hotovo.")
    print(f"{'='*60}")
# Entry-point guard: importing this module must not launch browsers or touch
# the database.  Direct script execution is unchanged.
if __name__ == "__main__":
    main()
-368
View File
@@ -1,368 +0,0 @@
import pandas as pd
from datetime import date
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
# Active study; switch by (un)commenting — consider a CLI argument instead.
STUDY = "42847922MDD3003"
#STUDY = "77242113UCO3001"
# Input directories/files produced by the download scripts, keyed by study.
INVENTORY_DIR = Path(f"xls_reports_{STUDY}")
DESTRUCTION_DIR = Path(f"xls_ip_destruction_{STUDY}")
SHIPMENTS_FILE = Path(f"xls_shipments_{STUDY}/shipments_report_{STUDY}.xlsx")
DETAILS_DIR = Path(f"xls_shipment_details_{STUDY}")
OUTPUT_DIR = Path("output")
OUTPUT_FILE = OUTPUT_DIR / f"{date.today().strftime('%Y-%m-%d')} {STUDY} CZ IWRS overview.xlsx"
# Shipment-report columns dropped from the overview (layout/noise columns).
SHIPMENT_DROP_COLS = {
    "Location", "Shipped Date", "Delivered Date [UTC]",
    "Delivery Recipient", "Delivery Details", "Cancelled Date",
    "Tracking #", "Total Medication IDs",
    "Shipping Category", "Study", "Destination Location", "Destination Site",
    "Medication type", "Container ID", "Quantity of Medication IDs",
    "Packaged Lot description",
}
# ── Shared constants ──────────────────────────────────────────────────────────
# Source-report column -> short overview column header.  Both "Dispensing Date"
# and "Dispensing date" appear in the wild; they collapse to one name.
COLUMN_RENAMES = {
    "Site": "Site",
    "Medication ID": "Med ID",
    "Packaged Lot number": "Lot No.",
    "Original Expiration Date when Packaged Lot was Added": "Orig Exp Date",
    "Expiration date": "Exp Date",
    "Received Date": "Rcv Date",
    "Shipment Receipt User": "Rcpt User",
    "Subject Identifier": "Subject ID",
    "Quantity Assigned": "Qty Asgn",
    "IRT Transaction": "IRT Tx",
    "Date Assigned": "Date Asgn",
    "Assignment User": "Asgn User",
    "Dispensation Status": "Disp Status",
    "Dispensing Date": "Disp Date",
    "Dispensing date": "Disp Date",
    "Quantity Dispensed": "Qty Disp",
    "Dispensing User": "Disp User",
    "Quantity Returned": "Qty Ret",
    "Date Returned": "Date Ret",
    "Return User": "Ret User",
    "DestroyedOn": "Destroyed",
    "Basket number": "Basket No.",
}
# Renamed columns parsed to datetime and rendered as DD-MMM-YYYY in Excel.
DATE_COLUMNS = {
    "Orig Exp Date", "Exp Date", "Rcv Date",
    "Date Asgn", "Disp Date", "Date Ret", "Destroyed", "Max Visit Date",
}
# Fixed Excel column widths per renamed header (fallback width is 14).
COLUMN_WIDTHS = {
    "Site": 14,
    "Med ID": 10,
    "Lot No.": 12,
    "Orig Exp Date": 16,
    "Exp Date": 14,
    "Rcv Date": 14,
    "Rcpt User": 22,
    "Subject ID": 14,
    "Qty Asgn": 9,
    "IRT Tx": 8,
    "Date Asgn": 14,
    "Asgn User": 20,
    "Disp Status": 16,
    "Disp Date": 14,
    "Qty Disp": 9,
    "Disp User": 20,
    "Qty Ret": 10,
    "Date Ret": 14,
    "Ret User": 18,
    "Destroyed": 14,
    "Basket No.": 12,
    "Max Visit Date": 16,
}
# ── Helpers ───────────────────────────────────────────────────────────────────
def read_inventory(path):
    """Read one on-site inventory workbook -> (DataFrame | None, meta dict).

    The header row is detected by a first-column value of "Medication ID"
    (MDD3003) or "Medication" (UCO3001); meta currently carries only the
    "Site:" label scraped from the preamble rows.  Returns (None, meta) when
    the workbook has no data table.
    """
    df = pd.read_excel(path, header=None)
    # Support both "Medication ID" (MDD3003) and "Medication" (UCO3001)
    mask = df[0].isin(["Medication ID", "Medication"])
    meta = {}
    for i in range(len(df)):
        val = str(df.iloc[i, 0]) if pd.notna(df.iloc[i, 0]) else ""
        if val.startswith("Site:"):
            meta["site"] = val.replace("Site:", "").strip()
    if not mask.any():
        print(f"  {path.name}: no data (skipping)")
        return None, meta
    header_row = df[mask].index[0]
    data = pd.read_excel(path, header=header_row)
    # Normalize the UCO3001 column name so downstream code sees one schema.
    data = data.rename(columns={"Medication": "Medication ID"})
    return data, meta
def read_destruction_lookup():
    """Build {medication_id: (basket_id, destroyed_on)} from destruction workbooks.

    Fixes: a workbook without a "Medication ID Description" header row used to
    raise IndexError and abort the whole run — such files are now skipped with
    a message; the metadata scan is also clamped to the sheet length so very
    short sheets no longer raise on ``iloc``.
    """
    lookup = {}
    for path in DESTRUCTION_DIR.glob("*.xlsx"):
        df = pd.read_excel(path, header=None)
        basket_id = None
        destroyed_on = None
        # Metadata labels live in the first ~15 preamble rows.
        for i in range(min(15, len(df))):
            val = str(df.iloc[i, 0]) if pd.notna(df.iloc[i, 0]) else ""
            if val.startswith("Basket ID:"):
                basket_id = val.replace("Basket ID:", "").strip()
            if val.startswith("Drug Destruction Created Date:"):
                destroyed_on = val.replace("Drug Destruction Created Date:", "").strip()
        header_rows = df[df[0] == "Medication ID Description"].index
        if len(header_rows) == 0:
            print(f"  {path.name}: header row not found (skipping)")
            continue
        data = pd.read_excel(path, header=header_rows[0])
        for med_id in data["Medication ID"].dropna():
            lookup[int(med_id)] = (basket_id, destroyed_on)
    return lookup
def format_sheet(ws, header_color, highlight_col=None, highlight_color=None):
    """Apply the standard overview styling to worksheet *ws*.

    White bold Arial header on *header_color*, thin borders, centered cells,
    DD-MMM-YYYY format for DATE_COLUMNS, optional fill of *highlight_col*
    cells with *highlight_color*, fixed COLUMN_WIDTHS, autofilter and a
    frozen header row.
    """
    thin = Side(style="thin", color="000000")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    header_fill = PatternFill("solid", start_color=header_color)
    header_font = Font(bold=True, color="FFFFFF", name="Arial", size=10)
    row_font = Font(name="Arial", size=10)
    hi_fill = PatternFill("solid", start_color=highlight_color) if highlight_color else None
    # Header values are captured once so data cells can be matched by column name.
    headers = [cell.value for cell in ws[1]]
    for cell in ws[1]:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=False)
        cell.border = border
    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        for cell in row:
            col_name = headers[cell.column - 1] if cell.column <= len(headers) else None
            cell.font = row_font
            cell.border = border
            cell.alignment = Alignment(horizontal="center")
            if col_name in DATE_COLUMNS:
                cell.number_format = "DD-MMM-YYYY"
            if hi_fill and col_name == highlight_col:
                cell.fill = hi_fill
    for cell in ws[1]:
        width = COLUMN_WIDTHS.get(cell.value, 14)
        ws.column_dimensions[get_column_letter(cell.column)].width = width
    ws.auto_filter.ref = ws.dimensions
    ws.freeze_panes = "A2"
# ── Shipment helpers ─────────────────────────────────────────────────────────
def build_shipments():
    """Join the shipments report with per-shipment detail workbooks.

    Returns one row per kit, carrying the shipment-level columns plus the
    detail-only columns.  Fixes: ``extra_cols`` was previously read after the
    loop, so it raised NameError when no detail file existed and otherwise
    reflected only the *last* detail file's columns; it is now accumulated as
    a first-seen-order union.  An empty result keeps the shipment columns
    instead of raising KeyError on the column selection.
    """
    sh = pd.read_excel(SHIPMENTS_FILE, sheet_name=0, header=5)
    sh.columns = sh.columns.str.strip()
    sh = sh.dropna(how="all")
    sh["Shipment ID"] = sh["Shipment ID"].astype(str).str.strip()
    sh = sh.drop(columns=[c for c in SHIPMENT_DROP_COLS if c in sh.columns])
    shipment_cols = list(sh.columns)
    all_rows = []
    extra_cols = []  # union of detail-only columns across all files, in order
    for _, s_row in sh.iterrows():
        sid = s_row["Shipment ID"]
        path = DETAILS_DIR / f"shipment_details_{sid}.xlsx"
        if not path.exists():
            continue
        det = pd.read_excel(path, sheet_name=0, header=5)
        det.columns = det.columns.str.strip()
        det = det.dropna(how="all")
        det["Shipment"] = det["Shipment"].astype(str).str.strip()
        file_extra = [c for c in det.columns
                      if c not in shipment_cols and c != "Shipment" and c not in SHIPMENT_DROP_COLS]
        for c in file_extra:
            if c not in extra_cols:
                extra_cols.append(c)
        for _, d_row in det.iterrows():
            all_rows.append({**s_row.to_dict(), **{c: d_row[c] for c in file_extra}})
    # Passing columns= fixes both ordering and the empty-result case.
    result = pd.DataFrame(all_rows, columns=shipment_cols + extra_cols)
    for col in ["Request Date", "Received Date", "Expiration Date"]:
        if col in result.columns:
            result[col] = pd.to_datetime(result[col], errors="coerce")
    print(f"  Shipments: {result['Shipment ID'].nunique()} shipments, {len(result)} kitu")
    return result
def build_site_summary(result):
    """Pivot kit rows into a per-site count of each kit status plus a total."""
    status_order = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
    counts = (result.groupby("Ship To:")["Status"]
                    .value_counts()
                    .unstack(fill_value=0))
    # Guarantee every status column exists even when no kit has that status.
    for status in status_order:
        if status not in counts.columns:
            counts[status] = 0
    summary = (counts[status_order]
               .reset_index()
               .rename(columns={"Ship To:": "Site", "Returned by Subject": "Returned"})
               .sort_values("Site")
               .reset_index(drop=True))
    summary["Total"] = summary[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
    print(f"  Site Summary: {len(summary)} center")
    return summary
def format_shipment_sheet(ws, header_color_ship, header_color_detail, n_ship_cols):
    """Style the combined Shipments sheet.

    The first *n_ship_cols* header cells get the shipment color, the rest the
    detail color; data cells are centered Arial with thin borders and date
    values render as DD-MMM-YYYY.  Fix: date detection now uses
    ``isinstance(value, date)`` — which covers datetime.date/datetime and
    pd.Timestamp (a datetime subclass) — instead of the fragile
    ``__class__.__name__`` string comparison.
    """
    thin = Side(style="thin", color="000000")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    hfont = Font(bold=True, color="FFFFFF", name="Arial", size=10)
    dfont = Font(name="Arial", size=10)
    fill_ship = PatternFill("solid", start_color=header_color_ship)
    fill_detail = PatternFill("solid", start_color=header_color_detail)
    for cell in ws[1]:
        cell.fill = fill_ship if cell.column <= n_ship_cols else fill_detail
        cell.font = hfont
        cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
        cell.border = border
        # Width tracks header length, capped at 35 characters.
        ws.column_dimensions[get_column_letter(cell.column)].width = min(len(str(cell.value or "")) + 4, 35)
    ws.row_dimensions[1].height = 30
    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        for cell in row:
            cell.font = dfont
            cell.border = border
            cell.alignment = Alignment(horizontal="center", vertical="center")
            if isinstance(cell.value, date):
                cell.number_format = "DD-MMM-YYYY"
    ws.auto_filter.ref = ws.dimensions
    ws.freeze_panes = "A2"
# ── Build DataFrames ──────────────────────────────────────────────────────────
def build_main(lookup):
    """Combine all site inventory workbooks into the main overview DataFrame.

    *lookup* maps int medication IDs to (basket_id, destroyed_on) from the
    destruction reports; both values are joined on as new columns.  Columns
    are renamed via COLUMN_RENAMES, DATE_COLUMNS are parsed (day-first), and
    rows are sorted by Site / Rcv Date / Med ID.  Raises if no workbook
    yields data (pd.concat on an empty list).
    """
    all_rows = []
    for path in sorted(INVENTORY_DIR.glob("onsite_inventory_detail_*.xlsx")):
        df, meta = read_inventory(path)
        if df is None:
            continue
        # int(x) must match the int keys produced by read_destruction_lookup.
        df["DestroyedOn"] = df["Medication ID"].apply(
            lambda x: lookup.get(int(x), (None, None))[1] if pd.notna(x) else None)
        df["Basket number"] = df["Medication ID"].apply(
            lambda x: lookup.get(int(x), (None, None))[0] if pd.notna(x) else None)
        # Falls back to the file stem when the sheet carries no "Site:" label.
        df.insert(0, "Site", meta.get("site", path.stem))
        all_rows.append(df)
        print(f"  {path.name}: {len(df)} kits")
    combined = pd.concat(all_rows, ignore_index=True)
    combined.rename(columns=COLUMN_RENAMES, inplace=True)
    for col in DATE_COLUMNS:
        if col in combined.columns:
            combined[col] = pd.to_datetime(combined[col], dayfirst=True, errors="coerce")
    combined.sort_values(["Site", "Rcv Date", "Med ID"], inplace=True, ignore_index=True)
    return combined
def build_expired(df):
    """Return (expired_kits_df, sheet_name) for kits past expiry.

    A kit counts as expired when it is not yet in a destruction basket, is
    unassigned, and its expiration date lies strictly before today.
    """
    today = date.today()
    not_in_basket = df["Basket No."].isna()
    unassigned = df["Subject ID"].isna()
    past_expiry = df["Exp Date"] < pd.Timestamp(today)
    filtered = df[not_in_basket & unassigned & past_expiry].copy().reset_index(drop=True)
    sheet_name = f"Expired as of {today.strftime('%d-%b-%Y')}"
    print(f"  Expired: {len(filtered)}")
    return filtered, sheet_name
def build_assigned_not_dispensed(df):
    """Return kits that are assigned to a subject but have no dispensing date."""
    assigned = df["Subject ID"].notna()
    undispensed = df["Disp Date"].isna()
    filtered = df[assigned & undispensed].copy().reset_index(drop=True)
    print(f"  Assigned not dispensed: {len(filtered)}")
    return filtered
def build_not_returned(df):
    """Return dispensed-but-unreturned kits from visits before the subject's last.

    Only kits assigned before the subject's most recent assignment date are
    flagged — the latest kit is presumed still in use.  Return/destruction
    columns are dropped since they are empty for these rows by construction.
    """
    candidates = df[
        df["Date Ret"].isna()
        & df["Subject ID"].notna()
        & (df["Disp Status"].str.upper() != "NOT DISPENSED")
    ].copy()
    # Per-subject latest assignment date, joined back on Subject ID.
    latest = df.groupby("Subject ID")["Date Asgn"].max().rename("Max Visit Date")
    candidates = candidates.join(latest, on="Subject ID")
    filtered = (candidates[candidates["Date Asgn"] < candidates["Max Visit Date"]]
                .copy()
                .drop(columns=["Qty Ret", "Date Ret", "Ret User", "Destroyed", "Basket No."])
                .reset_index(drop=True))
    print(f"  Not returned: {len(filtered)}")
    return filtered
def build_kits_for_destruction(df):
    """Return kits eligible for destruction, sorted by Site and return date.

    Eligible = not yet in a destruction basket AND (returned by the subject
    OR never dispensed).  The destruction bookkeeping columns are dropped,
    as they are necessarily empty here.
    """
    eligible = df["Basket No."].isna() & (
        df["Date Ret"].notna() | (df["Disp Status"].str.upper() == "NOT DISPENSED")
    )
    filtered = (df[eligible]
                .copy()
                .sort_values(["Site", "Date Ret"], ascending=[True, True])
                .drop(columns=["Destroyed", "Basket No."])
                .reset_index(drop=True))
    print(f"  Kits for destruction: {len(filtered)}")
    return filtered
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Build the country IWRS overview workbook: seven sheets, then styling.

    Flow: wipe previous output -> load destruction lookup -> build all
    DataFrames -> write sheets with pandas -> reopen with openpyxl to apply
    per-sheet formatting.
    """
    # Prepare output dir, remove any previous overview file
    OUTPUT_DIR.mkdir(exist_ok=True)
    for old in OUTPUT_DIR.glob(f"*{STUDY} CZ IWRS overview.xlsx"):
        old.unlink()
        print(f"Removed old file: {old.name}")
    lookup = read_destruction_lookup()
    print(f"Loaded {len(lookup)} kits from destruction reports")
    df = build_main(lookup)
    expired_df, expired_sheet = build_expired(df)
    assigned_df = build_assigned_not_dispensed(df)
    not_returned_df = build_not_returned(df)
    destruction_df = build_kits_for_destruction(df)
    shipments_df = build_shipments()
    site_summary_df = build_site_summary(shipments_df)
    # NOTE(review): assumes "Investigator" is the first detail-level column in
    # the shipments frame — confirm against the detail report layout.
    n_ship_cols = shipments_df.columns.tolist().index("Investigator")  # first detail col index (0-based)
    # Write all sheets
    with pd.ExcelWriter(OUTPUT_FILE, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="CountryMedicationOverview")
        expired_df.to_excel(writer, index=False, sheet_name=expired_sheet)
        assigned_df.to_excel(writer, index=False, sheet_name="Assigned not dispensed")
        not_returned_df.to_excel(writer, index=False, sheet_name="Not returned")
        destruction_df.to_excel(writer, index=False, sheet_name="Shipments" if False else "Kits for destruction")
        shipments_df.to_excel(writer, index=False, sheet_name="Shipments")
        site_summary_df.to_excel(writer, index=False, sheet_name="Site Summary")
    # Format all sheets
    wb = load_workbook(OUTPUT_FILE)
    # Main sheet — dark blue, green highlight for Destroyed/Basket No.
    ws_main = wb["CountryMedicationOverview"]
    format_sheet(ws_main, header_color="1F4E79")
    # Extra: green fill for Destroyed and Basket No. columns
    new_col_fill = PatternFill("solid", start_color="E2EFDA")
    headers_main = [c.value for c in ws_main[1]]
    for row in ws_main.iter_rows(min_row=2, max_row=ws_main.max_row):
        for cell in row:
            col_name = headers_main[cell.column - 1] if cell.column <= len(headers_main) else None
            if col_name in ("Destroyed", "Basket No."):
                cell.fill = new_col_fill
    format_sheet(wb[expired_sheet], header_color="C00000", highlight_col="Exp Date", highlight_color="FFE0E0")
    format_sheet(wb["Assigned not dispensed"], header_color="833C00", highlight_col="Subject ID", highlight_color="FFF2CC")
    format_sheet(wb["Not returned"], header_color="375623", highlight_col="Max Visit Date", highlight_color="E2EFDA")
    format_sheet(wb["Kits for destruction"], header_color="595959")
    format_shipment_sheet(wb["Shipments"], "1F4E79", "375623", n_ship_cols)
    format_sheet(wb["Site Summary"], header_color="1F4E79")
    wb.save(OUTPUT_FILE)
    print(f"\nSaved: {OUTPUT_FILE} ({len(df)} rows on main sheet, {wb.sheetnames})")
# Script entry point — the guard keeps the module importable without side effects.
if __name__ == "__main__":
    main()
-163
View File
@@ -1,163 +0,0 @@
import pandas as pd
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from datetime import date
import os
# Study whose shipment reports are combined into the output workbook.
STUDY = "77242113UCO3001"
SHIPMENTS_FILE = f"xls_shipments_{STUDY}/shipments_report_{STUDY}.xlsx"
DETAILS_DIR = f"xls_shipment_details_{STUDY}"
OUTPUT_DIR = "output"
TEST_SHIPMENT = None  # None = all shipments; set a Shipment ID to restrict a test run
# Shipment-report columns excluded from the output (layout/noise columns).
DROP_COLS = {
    "Location", "Shipped Date", "Delivered Date [UTC]",
    "Delivery Recipient", "Delivery Details", "Cancelled Date",
    "Tracking #", "Total Medication IDs",
    "Shipping Category", "Study", "Destination Location", "Destination Site",
    "Medication type", "Container ID", "Quantity of Medication IDs",
    "Packaged Lot description",
}
# Import-time side effect: ensure the output directory exists before build_report runs.
os.makedirs(OUTPUT_DIR, exist_ok=True)
def read_shipments():
    """Load the shipments report, normalize headers, and drop layout columns."""
    df = pd.read_excel(SHIPMENTS_FILE, sheet_name=0, header=5)
    df.columns = df.columns.str.strip()
    df = df.dropna(how="all")
    df["Shipment ID"] = df["Shipment ID"].astype(str).str.strip()
    unwanted = [c for c in DROP_COLS if c in df.columns]
    return df.drop(columns=unwanted)
def read_details(shipment_id):
    """Load one shipment-detail workbook, or None when the file is missing."""
    path = os.path.join(DETAILS_DIR, f"shipment_details_{shipment_id}.xlsx")
    if not os.path.exists(path):
        return None
    details = pd.read_excel(path, sheet_name=0, header=5)
    details.columns = details.columns.str.strip()
    details = details.dropna(how="all")
    details["Shipment"] = details["Shipment"].astype(str).str.strip()
    return details
def build_report():
    """Join shipments with their detail workbooks and write a styled XLSX.

    Produces two sheets: "Shipments" (one row per kit, shipment columns in
    blue, detail columns in green) and "Site Summary" (per-site status
    counts).  Any previous output matching the name pattern is deleted first.
    """
    shipments = read_shipments()
    if TEST_SHIPMENT:
        shipments = shipments[shipments["Shipment ID"] == TEST_SHIPMENT]
    shipment_cols = list(shipments.columns)
    all_rows = []
    for _, s_row in shipments.iterrows():
        sid = s_row["Shipment ID"]
        details = read_details(sid)
        if details is None:
            continue
        extra_cols = [c for c in details.columns if c not in shipment_cols and c != "Shipment" and c not in DROP_COLS]
        for _, d_row in details.iterrows():
            row = {**s_row.to_dict(), **{c: d_row[c] for c in extra_cols}}
            all_rows.append(row)
        print(f"  [{sid}] {len(details)} kitu")
    result = pd.DataFrame(all_rows)
    # NOTE(review): extra_cols here is the value left from the *last* loop
    # iteration, and is undefined when no detail file was found — assumes all
    # detail workbooks share one column layout and at least one file exists.
    all_cols = shipment_cols + [c for c in extra_cols if c in result.columns]
    result = result[all_cols]
    # --- Sheet 1: Shipments, rendered manually via openpyxl ---
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Shipments"
    HEADER_FILL_SHIP = PatternFill("solid", fgColor="1F4E79")
    HEADER_FILL_DETAIL = PatternFill("solid", fgColor="375623")
    HEADER_FONT = Font(name="Arial", bold=True, color="FFFFFF", size=10)
    DATA_FONT = Font(name="Arial", size=10)
    # No top side on purpose? — looks like an omission, but kept as-is.
    BORDER = Border(
        left=Side(style="thin", color="BFBFBF"),
        right=Side(style="thin", color="BFBFBF"),
        bottom=Side(style="thin", color="BFBFBF"),
    )
    n_ship = len(shipment_cols)
    for ci, col in enumerate(all_cols, 1):
        cell = ws.cell(row=1, column=ci, value=col)
        cell.font = HEADER_FONT
        # Shipment-level columns blue, detail-level columns green.
        cell.fill = HEADER_FILL_SHIP if ci <= n_ship else HEADER_FILL_DETAIL
        cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
        cell.border = BORDER
    ws.row_dimensions[1].height = 30
    for ri, (_, row) in enumerate(result.iterrows(), 2):
        for ci, col in enumerate(all_cols, 1):
            val = row[col]
            if pd.isna(val):
                val = None
            elif hasattr(val, "date"):
                # Timestamps are written as plain dates.
                val = val.date()
            cell = ws.cell(row=ri, column=ci, value=val)
            cell.font = DATA_FONT
            cell.border = BORDER
            cell.alignment = Alignment(horizontal="center", vertical="center")
            if isinstance(val, date):
                cell.number_format = "DD-MMM-YYYY"
    ws.auto_filter.ref = ws.dimensions
    ws.freeze_panes = "A2"
    # Column widths sized to the longest value (capped at 35).
    for ci, col in enumerate(all_cols, 1):
        vals = [col] + [str(result.iloc[r][col]) for r in range(len(result)) if pd.notna(result.iloc[r][col])]
        ws.column_dimensions[get_column_letter(ci)].width = min(max((len(v) for v in vals), default=10) + 2, 35)
    # --- Sheet 2: Site Summary ---
    STATUS_COLS = ["Available", "Assigned", "Dispensed", "Returned by Subject"]
    pivot = result.groupby("Ship To:")["Status"].value_counts().unstack(fill_value=0)
    for s in STATUS_COLS:
        if s not in pivot.columns:
            pivot[s] = 0
    pivot = pivot[STATUS_COLS].reset_index().rename(columns={"Ship To:": "Site", "Returned by Subject": "Returned"})
    pivot = pivot.sort_values("Site").reset_index(drop=True)
    pivot["Total"] = pivot[["Available", "Assigned", "Dispensed", "Returned"]].sum(axis=1)
    ws2 = wb.create_sheet("Site Summary")
    summary_cols = ["Site", "Available", "Assigned", "Dispensed", "Returned", "Total"]
    HEADER_FILL_SUMM = PatternFill("solid", fgColor="1F4E79")
    for ci, col in enumerate(summary_cols, 1):
        cell = ws2.cell(row=1, column=ci, value=col)
        cell.font = HEADER_FONT
        cell.fill = HEADER_FILL_SUMM
        cell.alignment = Alignment(horizontal="center", vertical="center")
        cell.border = BORDER
    ws2.row_dimensions[1].height = 25
    for ri, (_, row) in enumerate(pivot.iterrows(), 2):
        for ci, col in enumerate(summary_cols, 1):
            cell = ws2.cell(row=ri, column=ci, value=row[col])
            cell.font = DATA_FONT
            cell.border = BORDER
            cell.alignment = Alignment(horizontal="center", vertical="center")
    for ci, col in enumerate(summary_cols, 1):
        vals = [col] + [str(pivot.iloc[r][col]) for r in range(len(pivot))]
        ws2.column_dimensions[get_column_letter(ci)].width = min(max(len(v) for v in vals) + 4, 35)
    ws2.freeze_panes = "A2"
    # Remove earlier outputs of the same report; keep going if a file is locked.
    suffix = f"_{TEST_SHIPMENT}" if TEST_SHIPMENT else ""
    pattern = f"{STUDY} CZ Shipments{suffix}.xlsx"
    for old in os.listdir(OUTPUT_DIR):
        if old.endswith(pattern):
            try:
                os.remove(os.path.join(OUTPUT_DIR, old))
                print(f"Smazan -> {old}")
            except OSError:
                print(f"Preskakuji smazani (soubor otevren?) -> {old}")
    outfile = os.path.join(OUTPUT_DIR, f"{date.today()} {STUDY} CZ Shipments{suffix}.xlsx")
    wb.save(outfile)
    print(f"\nUlozeno -> {outfile}")
# Entry-point guard: importing this module must not regenerate the report.
# Running the script directly behaves exactly as before.
if __name__ == "__main__":
    build_report()
-76
View File
@@ -1,76 +0,0 @@
from playwright.sync_api import sync_playwright
import os
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
# SECURITY NOTE(review): plaintext credentials committed to source control —
# move to environment variables or a secrets store and rotate this password.
PASSWORD = "Vlado123++-+"
# STUDY = "42847922MDD3003"
# Active study; switch the comment above to run the MDD3003 study instead.
STUDY = "77242113UCO3001"
# Per-study output folder for the downloaded destruction-basket XLSX files.
OUTPUT_DIR = f"xls_ip_destruction_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def run(page, study):
    """Download the IP-destruction form XLS for every destruction basket.

    Opens the ip_destruction_form report, enumerates all basket options in
    the search dropdown, then downloads one XLSX per basket into
    ``xls_ip_destruction_<study>``. Baskets whose file already exists on
    disk are skipped, so the function is safe to re-run.

    Args:
        page: Logged-in Playwright page with the study already selected.
        study: Study identifier used to build the output directory name.
    """
    output_dir = f"xls_ip_destruction_{study}"
    os.makedirs(output_dir, exist_ok=True)
    page.goto(f"{BASE_URL}/report/ip_destruction_form")
    page.wait_for_load_state("networkidle", timeout=120000)
    # Click the search box to open the dropdown and list every basket option.
    page.locator('input[placeholder="search"], input[type="text"]').first.click()
    page.wait_for_timeout(1000)
    baskets = [b.strip() for b in page.locator('mat-option').all_inner_texts()
               if b.strip() and b.strip() != "No results found"]
    print(f" Nalezeno {len(baskets)} kosiku: {baskets}")
    # Escape closes the dropdown before we start filtering per basket.
    page.keyboard.press("Escape")
    page.wait_for_timeout(500)
    if not baskets:
        # NOTE(review): "kosite" in the message looks like a typo for "kosiky".
        print(" Zadne destruction kosite — preskakuji.")
        return
    for basket in baskets:
        filename = os.path.join(output_dir, f"ip_destruction_basket_{basket}.xlsx")
        if os.path.exists(filename):
            # Idempotency: keep previously downloaded files.
            print(f" [{basket}] Preskakuji — existuje.")
            continue
        print(f" [{basket}] Stahuji...")
        input_field = page.locator('input[placeholder="search"], input[type="text"]').first
        input_field.click()
        input_field.fill(basket)
        page.wait_for_timeout(500)
        # dispatch_event('click') instead of .click() — presumably to bypass
        # an overlay intercepting the pointer; confirm before changing.
        page.locator('mat-option').first.dispatch_event('click')
        page.wait_for_load_state("networkidle", timeout=120000)
        with page.expect_download(timeout=120000) as dl:
            page.get_by_role("button", name="Download XLS").click()
        dl.value.save_as(filename)
        print(f" [{basket}] OK")
        # Clear resets the report filter for the next basket.
        page.get_by_role("button", name="Clear").click()
        page.wait_for_load_state("networkidle", timeout=120000)
    print(" Destruction hotovo.")
if __name__ == "__main__":
    # Standalone entry point: log in, select the study, then download the
    # destruction reports. Fixes: removed the duplicate sync_playwright
    # import (already imported at module top) and added try/finally so the
    # browser is always closed, even when a download step raises.
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(accept_downloads=True)
        page = context.new_page()
        try:
            page.goto(BASE_URL)
            page.wait_for_load_state("networkidle")
            page.get_by_label("Email *").fill(EMAIL)
            page.get_by_label("Password *").fill(PASSWORD)
            page.locator('#login__submit').click()
            page.wait_for_load_state("networkidle")
            page.get_by_label("Study *").click()
            page.get_by_role("option", name=STUDY).click()
            page.get_by_role("button", name="SELECT").click()
            page.wait_for_load_state("networkidle")
            run(page, STUDY)
        finally:
            browser.close()
-83
View File
@@ -1,83 +0,0 @@
from playwright.sync_api import sync_playwright
import os
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
# SECURITY NOTE(review): plaintext credentials committed to source control —
# move to environment variables or a secrets store and rotate this password.
PASSWORD = "Vlado123++-+"
# STUDY = "42847922MDD3003"
# Active study; switch the comment above to run the MDD3003 study instead.
STUDY = "77242113UCO3001"
# Czech site identifiers per study; run() downloads one inventory XLS each.
SITES = {
    "42847922MDD3003": [
        "S10-CZ10002",
        "S10-CZ10004",
        "S10-CZ10005",
        "S10-CZ10008",
        "S10-CZ10011",
        "S10-CZ10012",
    ],
    "77242113UCO3001": [
        "DD5-CZ10001",
        "DD5-CZ10003",
        "DD5-CZ10006",
        "DD5-CZ10009",
        "DD5-CZ10010",
        "DD5-CZ10012",
        "DD5-CZ10013",
        "DD5-CZ10015",
        "DD5-CZ10016",
        "DD5-CZ10020",
        "DD5-CZ10021",
        "DD5-CZ10022",
    ],
}
# Per-study output folder for the downloaded onsite-inventory XLSX files.
OUTPUT_DIR = f"xls_reports_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def run(page, study):
    """Download the onsite inventory detail XLS for every CZ site of *study*.

    Iterates the site list configured in SITES and saves one XLSX per site
    into ``xls_reports_<study>``.

    Args:
        page: Logged-in Playwright page with the study already selected.
        study: Study identifier; must be a key of SITES.
    """
    target_dir = f"xls_reports_{study}"
    os.makedirs(target_dir, exist_ok=True)
    page.goto(f"{BASE_URL}/report/onsite_inventory_detail")
    page.wait_for_load_state("networkidle", timeout=120000)
    # Same search-box selector is reused for every site, so hoist it.
    search_box = 'input[placeholder="search"], input[type="text"]'
    for site_id in SITES[study]:
        print(f" [{site_id}] Stahuji...")
        page.locator(search_box).first.click()
        page.get_by_role("option", name=site_id).click()
        page.wait_for_load_state("networkidle", timeout=120000)
        destination = os.path.join(target_dir, f"onsite_inventory_detail_{site_id}.xlsx")
        with page.expect_download(timeout=120000) as dl:
            page.get_by_role("button", name="Download XLS").click()
        dl.value.save_as(destination)
        print(f" [{site_id}] OK")
        # Reset the filter before moving on to the next site.
        page.get_by_role("button", name="Clear").click()
        page.wait_for_load_state("networkidle", timeout=120000)
    print(" Inventory hotovo.")
if __name__ == "__main__":
    # Standalone entry point: log in, select the study, then download the
    # inventory reports. Fixes: removed the duplicate sync_playwright import
    # (already imported at module top) and added try/finally so the browser
    # is always closed, even when a download step raises.
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(accept_downloads=True)
        page = context.new_page()
        try:
            page.goto(BASE_URL)
            page.wait_for_load_state("networkidle")
            page.get_by_label("Email *").fill(EMAIL)
            page.get_by_label("Password *").fill(PASSWORD)
            page.locator('#login__submit').click()
            page.wait_for_load_state("networkidle")
            page.get_by_label("Study *").click()
            page.get_by_role("option", name=STUDY).click()
            page.get_by_role("button", name="SELECT").click()
            page.wait_for_load_state("networkidle")
            run(page, STUDY)
        finally:
            browser.close()
-95
View File
@@ -1,95 +0,0 @@
from playwright.sync_api import sync_playwright
import os
import pandas as pd
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
# SECURITY NOTE(review): plaintext credentials committed to source control —
# move to environment variables or a secrets store and rotate this password.
PASSWORD = "Vlado123++-+"
# Active study; switch the comment below to run the UCO3001 study instead.
STUDY = "42847922MDD3003"
#STUDY = "77242113UCO3001"
# Per-study output folder for the downloaded shipment-detail XLSX files.
OUTPUT_DIR = f"xls_shipment_details_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def get_cz_shipment_ids(study):
    """Return the Shipment IDs whose Location mentions "Czech".

    Reads the previously downloaded shipments report for *study*. Returns
    ``None`` when that report file does not exist yet, so the caller can
    fall back to enumerating the report dropdown instead.
    """
    report_path = f"xls_shipments_{study}/shipments_report_{study}.xlsx"
    if not os.path.exists(report_path):
        return None
    # Header row of the export sits on spreadsheet row 6 (header=5).
    table = pd.read_excel(report_path, header=5)
    table.columns = table.columns.str.strip()
    table = table.dropna(how="all")
    table["Shipment ID"] = table["Shipment ID"].astype(str).str.strip()
    is_czech = table["Location"].str.contains("Czech", na=False, case=False)
    return table.loc[is_czech, "Shipment ID"].tolist()
def run(page, study):
    """Download the shipment-details XLS for every CZ shipment of *study*.

    Prefers the shipment IDs extracted from the previously downloaded
    shipments report; if that report is missing, falls back to enumerating
    all shipments in the report's search dropdown. One XLSX per shipment is
    saved into ``xls_shipment_details_<study>``; already-downloaded files
    are skipped so the function is safe to re-run.

    Args:
        page: Logged-in Playwright page with the study already selected.
        study: Study identifier used to build the output directory name.
    """
    output_dir = f"xls_shipment_details_{study}"
    os.makedirs(output_dir, exist_ok=True)
    page.goto(f"{BASE_URL}/report/shipment_details_report")
    page.wait_for_load_state("networkidle", timeout=120000)
    cz_ids = get_cz_shipment_ids(study)
    if cz_ids is not None:
        # Shipments report exists: use its CZ-filtered ID list.
        shipments = cz_ids
        print(f" Filtrovano ze shipments reportu: {len(shipments)} CZ shipmentu")
    else:
        # Fallback: open the search dropdown and scrape every option.
        page.locator('input[placeholder="search"], input[type="text"]').first.click()
        page.wait_for_timeout(1000)
        shipments = [s.strip() for s in page.locator('mat-option').all_inner_texts()
                     if s.strip() and s.strip() != "No results found"]
        print(f" Nalezeno {len(shipments)} shipmentu z dropdownu")
        page.keyboard.press("Escape")
        page.wait_for_timeout(500)
    if not shipments:
        print(" Zadne shipments — preskakuji.")
        return
    for shipment in shipments:
        filename = os.path.join(output_dir, f"shipment_details_{shipment}.xlsx")
        if os.path.exists(filename):
            # Idempotency: keep previously downloaded files.
            print(f" [{shipment}] Preskakuji — existuje.")
            continue
        print(f" [{shipment}] Stahuji...")
        input_field = page.locator('input[placeholder="search"], input[type="text"]').first
        input_field.click()
        input_field.fill(shipment)
        page.wait_for_timeout(500)
        # dispatch_event('click') instead of .click() — presumably to bypass
        # an overlay intercepting the pointer; confirm before changing.
        page.locator('mat-option').first.dispatch_event('click')
        page.wait_for_load_state("networkidle", timeout=120000)
        with page.expect_download(timeout=120000) as dl:
            page.get_by_role("button", name="Download XLS").click()
        dl.value.save_as(filename)
        print(f" [{shipment}] OK")
        # Reset the filter before moving on to the next shipment.
        page.get_by_role("button", name="Clear").click()
        page.wait_for_load_state("networkidle", timeout=120000)
    print(" Shipment details hotovo.")
if __name__ == "__main__":
    # Standalone entry point: log in, select the study, then download the
    # shipment details. Fixes: removed the duplicate sync_playwright import
    # (already imported at module top) and added try/finally so the browser
    # is always closed, even when a download step raises.
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(accept_downloads=True)
        page = context.new_page()
        try:
            page.goto(BASE_URL)
            page.wait_for_load_state("networkidle")
            page.get_by_label("Email *").fill(EMAIL)
            page.get_by_label("Password *").fill(PASSWORD)
            page.locator('#login__submit').click()
            page.wait_for_load_state("networkidle")
            page.get_by_label("Study *").click()
            page.get_by_role("option", name=STUDY).click()
            page.get_by_role("button", name="SELECT").click()
            page.wait_for_load_state("networkidle")
            run(page, STUDY)
        finally:
            browser.close()
-47
View File
@@ -1,47 +0,0 @@
from playwright.sync_api import sync_playwright
import os
# ── CONFIG ──────────────────────────────────────────────────────────────────
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
# SECURITY NOTE(review): plaintext credentials committed to source control —
# move to environment variables or a secrets store and rotate this password.
PASSWORD = "Vlado123++-+"
# STUDY = "42847922MDD3003"
# Active study; switch the comment above to run the MDD3003 study instead.
STUDY = "77242113UCO3001"
# Per-study output folder for the downloaded shipments-report XLSX file.
OUTPUT_DIR = f"xls_shipments_{STUDY}"
# ────────────────────────────────────────────────────────────────────────────
def run(page, study):
    """Download the full shipments report XLS for *study*.

    Saves it as ``xls_shipments_<study>/shipments_report_<study>.xlsx``,
    the file later consumed by download_shipment_details to pick out the
    Czech shipments.

    Args:
        page: Logged-in Playwright page with the study already selected.
        study: Study identifier used to build the output path.
    """
    output_dir = f"xls_shipments_{study}"
    os.makedirs(output_dir, exist_ok=True)
    page.goto(f"{BASE_URL}/report/shipments_report")
    page.wait_for_load_state("networkidle", timeout=120000)
    filename = os.path.join(output_dir, f"shipments_report_{study}.xlsx")
    with page.expect_download(timeout=120000) as dl:
        page.get_by_role("button", name="Download XLS").click()
    dl.value.save_as(filename)
    # Fix: the f-string previously had no placeholder, so the computed
    # save path was never shown; interpolate `filename` into the message.
    print(f" Shipments report OK -> {filename}")
if __name__ == "__main__":
    # Standalone entry point: log in, select the study, then download the
    # shipments report. Fixes: removed the duplicate sync_playwright import
    # (already imported at module top) and added try/finally so the browser
    # is always closed, even when the download step raises.
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(accept_downloads=True)
        page = context.new_page()
        try:
            page.goto(BASE_URL)
            page.wait_for_load_state("networkidle")
            page.get_by_label("Email *").fill(EMAIL)
            page.get_by_label("Password *").fill(PASSWORD)
            page.locator('#login__submit').click()
            page.wait_for_load_state("networkidle")
            page.get_by_label("Study *").click()
            page.get_by_role("option", name=STUDY).click()
            page.get_by_role("button", name="SELECT").click()
            page.wait_for_load_state("networkidle")
            run(page, STUDY)
        finally:
            browser.close()
-85
View File
@@ -1,85 +0,0 @@
import sys
import os
from playwright.sync_api import sync_playwright
# Sibling download scripts; each exposes run(page, study).
import download_reports
import download_ip_destruction
import download_shipments_report
import download_shipment_details
import create_accountability_report
BASE_URL = "https://janssen.4gclinical.com"
EMAIL = "vbuzalka@its.jnj.com"
# SECURITY NOTE(review): plaintext credentials committed to source control —
# move to environment variables or a secrets store and rotate this password.
PASSWORD = "Vlado123++-+"
# Menu-key -> study identifier mapping used by pick_study().
STUDIES = {
    "1": "77242113UCO3001",
    "2": "42847922MDD3003",
}
def pick_study():
    """Interactively ask the user to choose one of the configured studies.

    Prints the menu from STUDIES, then keeps prompting until a valid key
    is entered.

    Returns:
        The study identifier mapped to the chosen key.
    """
    print("Vyber studii:")
    for key, name in STUDIES.items():
        print(f" {key}) {name}")
    # Generalized: build the prompt from the actual STUDIES keys instead of
    # the hard-coded "1/2" (renders identically for the current two studies).
    prompt = f"Volba ({'/'.join(STUDIES)}): "
    while True:
        choice = input(prompt).strip()
        if choice in STUDIES:
            return STUDIES[choice]
        print(" Neplatna volba, zkus znovu.")
def login_and_select_study(page, study):
    """Log in to the IWRS portal and select *study* for the session.

    Args:
        page: Fresh Playwright page (not yet navigated).
        study: Study identifier, matched against the study-picker options.
    """
    print(f"\n[1/5] Prihlaseni a vyber studie {study}...")
    page.goto(BASE_URL)
    page.wait_for_load_state("networkidle")
    page.get_by_label("Email *").fill(EMAIL)
    page.get_by_label("Password *").fill(PASSWORD)
    page.locator('#login__submit').click()
    page.wait_for_load_state("networkidle")
    page.get_by_label("Study *").click()
    page.get_by_role("option", name=study).click()
    page.get_by_role("button", name="SELECT").click()
    page.wait_for_load_state("networkidle")
    print(" OK")
def main():
    """Run the whole pipeline: pick a study, download every report set,
    then build the accountability overview workbook.

    Fixes: the `__import__("pathlib")` / `__import__('datetime')` hacks are
    replaced with ordinary local imports, and the browser is closed in a
    finally block so a failing download step no longer leaks it.
    """
    from pathlib import Path
    from datetime import date

    # Work relative to this script so the xls_* directories land next to it.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    study = pick_study()
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(accept_downloads=True)
        page = context.new_page()
        try:
            login_and_select_study(page, study)
            print(f"\n[2/5] Stahuji inventory reporty...")
            download_reports.run(page, study)
            print(f"\n[3/5] Stahuji IP destruction reporty...")
            download_ip_destruction.run(page, study)
            print(f"\n[4/5] Stahuji shipments report...")
            download_shipments_report.run(page, study)
            print(f"\n[5/5] Stahuji shipment details...")
            download_shipment_details.run(page, study)
        finally:
            browser.close()
    print(f"\n[6/6] Generuji accountability report...")
    # Re-point the report builder's module-level configuration at the
    # directories this run just populated for the chosen study.
    create_accountability_report.STUDY = study
    create_accountability_report.INVENTORY_DIR = Path(f"xls_reports_{study}")
    create_accountability_report.DESTRUCTION_DIR = Path(f"xls_ip_destruction_{study}")
    create_accountability_report.SHIPMENTS_FILE = Path(f"xls_shipments_{study}/shipments_report_{study}.xlsx")
    create_accountability_report.DETAILS_DIR = Path(f"xls_shipment_details_{study}")
    create_accountability_report.OUTPUT_FILE = create_accountability_report.OUTPUT_DIR / f"{date.today().strftime('%Y-%m-%d')} {study} CZ IWRS overview.xlsx"
    create_accountability_report.main()
    print("\nVse hotovo!")
if __name__ == "__main__":
    # Fix: the pipeline previously ran on a bare module-level main() call,
    # so merely importing this module would launch a browser. Guarding it
    # keeps script execution identical while making the module importable.
    main()

Some files were not shown because too many files have changed in this diff Show More