Files
janssen/Covance/import_to_mysql.py
T
2026-05-05 14:11:50 +02:00

180 lines
7.0 KiB
Python

"""
Import the Covance "All Samples" CSV into the MySQL table covance_samples.
Strategy: versioning — every import gets a new import_id.
Current data: WHERE import_id = (SELECT MAX(import_id) FROM iwrs_import
WHERE study = '...' AND report_type = 'covance_samples')
"""
import os
import glob
import datetime
import numpy as np
import pandas as pd
import mysql.connector
import db_config
# Folder searched for the CSV export: "SourceData" next to this script.
SOURCE_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "SourceData",
)
# Study identifier: used to build the CSV file name and stored on the import record.
STUDY = "42847922MDD3003"
# ── type converters ──────────────────────────────────────────────────────────
def _py(val):
    """Unwrap a NumPy scalar into the equivalent built-in Python object.

    Anything that is not an ``np.generic`` instance is returned unchanged.
    """
    return val.item() if isinstance(val, np.generic) else val
def to_str(val):
    """Convert a cell value to a stripped string, or ``None`` when it is empty.

    ``None``, float NaN, and values whose stripped text is empty or equals
    "nan", "nat" or "none" (case-insensitive) are all mapped to ``None``.
    """
    val = _py(val)
    if val is None:
        return None
    # NaN is the only float that compares unequal to itself.
    if isinstance(val, float) and val != val:
        return None
    text = str(val).strip()
    if text.lower() in ("nan", "nat", "none", ""):
        return None
    return text
def to_int(val):
    """Convert a cell value to ``int``, or ``None`` when that is not possible.

    The value is first parsed as a float so that text such as "5.0" is
    accepted; the fractional part is truncated by ``int()``.

    Returns:
        The integer value, or ``None`` for ``None``, non-numeric text, NaN,
        and infinite values (including text that overflows to infinity,
        e.g. "1e400").
    """
    val = _py(val)
    try:
        number = float(val)
        if number != number:  # NaN never equals itself
            return None
        # int() raises OverflowError for +/-infinity; treat it as "no value"
        # instead of aborting the whole import.
        return int(number)
    except (TypeError, ValueError, OverflowError):
        return None
def to_date(val):
    """Convert a cell value to ``datetime.date``, or ``None`` when unusable.

    Missing values (``None``, NaN, anything ``pd.isna`` reports as missing)
    give ``None``. Datetime-like objects are reduced to their date part.
    Any other value is converted to text and tried against the formats
    ``%d-%b-%Y``, ``%Y-%m-%d``, ``%d-%m-%Y`` and ``%Y-%m-%d %H:%M:%S`` in
    that order; ``None`` is returned when none of them matches.
    """
    val = _py(val)
    if val is None:
        return None
    if isinstance(val, float) and val != val:
        return None
    try:
        if pd.isna(val):
            return None
    except (TypeError, ValueError):
        # pd.isna may return an array for container inputs, whose truth
        # value is ambiguous; such values fall through to the checks below.
        pass

    # pd.Timestamp is a datetime subclass; a missing timestamp (NaT) was
    # already rejected by the pd.isna check above.
    if isinstance(val, (pd.Timestamp, datetime.datetime)):
        return val.date()
    if isinstance(val, datetime.date):
        return val

    text = str(val).strip()
    if text.lower() in ("nat", "nan", "none", ""):
        return None

    known_formats = ("%d-%b-%Y", "%Y-%m-%d", "%d-%m-%Y", "%Y-%m-%d %H:%M:%S")
    for date_format in known_formats:
        try:
            parsed = datetime.datetime.strptime(text, date_format)
        except ValueError:
            continue
        return parsed.date()
    return None
# ── DB helpers ───────────────────────────────────────────────────────────────
def get_conn():
    """Open a new MySQL connection using the settings in ``db_config``."""
    settings = {
        "host": db_config.DB_HOST,
        "port": db_config.DB_PORT,
        "user": db_config.DB_USER,
        "password": db_config.DB_PASSWORD,
        "database": db_config.DB_NAME,
    }
    return mysql.connector.connect(**settings)
def insert_import(cursor, study, source_file):
    """Insert one ``iwrs_import`` row for this run and return its id.

    The row records the study, the current local time, the source file name
    and the fixed report type "covance_samples".

    Returns:
        ``cursor.lastrowid`` as reported by the cursor after the INSERT.
    """
    statement = (
        "INSERT INTO iwrs_import (study, imported_at, source_file, report_type) "
        "VALUES (%s, %s, %s, %s)"
    )
    params = (study, datetime.datetime.now(), source_file, "covance_samples")
    cursor.execute(statement, params)
    return cursor.lastrowid
# ── parser ───────────────────────────────────────────────────────────────────
def find_csv(study):
    """Return the path of the "All Samples" CSV for *study* in ``SOURCE_DIR``.

    Raises:
        FileNotFoundError: if no file matches the expected name.
    """
    pattern = os.path.join(SOURCE_DIR, f"Protocol {study} - All Samples.csv")
    matches = glob.glob(pattern)
    if matches:
        return matches[0]
    raise FileNotFoundError(f"Nenalezen CSV soubor: {pattern}")
def parse_csv(path):
    """Read the samples CSV and return one dict per data row.

    Every column is read as text and then passed through the converter
    listed for it below. A column that is absent from the file yields
    ``None`` for that field, because ``Series.get`` returns ``None`` for a
    missing label.

    Returns:
        A list of dicts keyed by ``covance_samples`` column name.
    """
    # (target key, CSV header, converter) in the order the keys are emitted.
    field_map = (
        ("study", "Protocol Code", to_str),
        ("investigator_no", "Investigator No.", to_str),
        ("investigator_name", "Investigator Name", to_str),
        ("patient_no", "Patient No.", to_str),
        ("collection_date", "Collection Date", to_date),
        ("protocol_visit_code", "Protocol Visit Code", to_str),
        ("kit_receipt_date", "Kit Receipt Date", to_date),
        ("container_receipt_date", "Container Receipt Date", to_date),
        ("accession", "Accession", to_str),
        ("container_no", "Container No.", to_int),
        ("container_barcode", "Container Barcode No.", to_str),
        ("specimen_type", "Specimen Type", to_str),
        ("sample_status", "Sample Status", to_str),
        ("expected_receipt_condition", "Expected Receipt Condition", to_str),
        ("actual_receipt_condition", "Actual Receipt Condition", to_str),
        ("label_line1", "Container Label Line 1", to_str),
        ("label_line2", "Container Label Line 2", to_str),
        ("sm_sample_status", "SM Sample Status", to_str),
        ("smart_class_description", "SMART Specimen Class Description", to_str),
        ("parent_barcode", "Parent Barcode", to_str),
        ("children_barcode", "Children Barcode", to_str),
    )
    frame = pd.read_csv(path, dtype=str)
    return [
        {key: convert(record.get(header)) for key, header, convert in field_map}
        for _, record in frame.iterrows()
    ]
# ── insert ───────────────────────────────────────────────────────────────────
# Row-dict keys in the order they are written to covance_samples; import_id is
# prepended separately because it comes from the import record, not the CSV.
_SAMPLE_COLUMNS = (
    "study", "investigator_no", "investigator_name", "patient_no",
    "collection_date", "protocol_visit_code",
    "kit_receipt_date", "container_receipt_date",
    "accession", "container_no", "container_barcode",
    "specimen_type", "sample_status",
    "expected_receipt_condition", "actual_receipt_condition",
    "label_line1", "label_line2",
    "sm_sample_status", "smart_class_description",
    "parent_barcode", "children_barcode",
)


def insert_samples(cursor, import_id, rows, *, batch_size=1000):
    """Insert parsed sample rows into ``covance_samples``.

    Rows are sent with ``cursor.executemany`` in chunks of *batch_size*
    instead of one ``execute`` call per row, so the driver can batch them
    (MySQL Connector/Python documents a multi-row INSERT optimisation for
    ``executemany``) while each statement stays bounded in size.

    Args:
        cursor: DB-API cursor providing ``executemany``.
        import_id: id of the ``iwrs_import`` row these samples belong to.
        rows: iterable of dicts containing every key in ``_SAMPLE_COLUMNS``.
        batch_size: maximum number of rows per ``executemany`` call.

    Raises:
        ValueError: if *batch_size* is smaller than 1.
        KeyError: if a row lacks one of the expected keys.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Column list and placeholders come from the same tuple so their counts
    # cannot drift apart.
    column_list = ", ".join(("import_id",) + _SAMPLE_COLUMNS)
    placeholders = ", ".join(["%s"] * (len(_SAMPLE_COLUMNS) + 1))
    sql = f"INSERT INTO covance_samples ({column_list}) VALUES ({placeholders})"

    batch = []
    for row in rows:
        batch.append((import_id,) + tuple(row[name] for name in _SAMPLE_COLUMNS))
        if len(batch) >= batch_size:
            cursor.executemany(sql, batch)
            batch = []
    if batch:  # flush the final, partially filled chunk
        cursor.executemany(sql, batch)
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Run one import: locate the CSV, parse it and store it as a new version.

    The import record and all sample rows are committed together. If
    anything fails after the connection is opened, the transaction is rolled
    back and the error re-raised; the cursor and connection are closed in
    every case.
    """
    csv_path = find_csv(STUDY)
    source_name = os.path.basename(csv_path)
    print(f"Soubor: {source_name}")
    rows = parse_csv(csv_path)
    print(f"Načteno řádků: {len(rows)}")

    conn = get_conn()
    try:
        cursor = conn.cursor()
        try:
            import_id = insert_import(cursor, STUDY, source_name)
            print(f"import_id = {import_id}")
            insert_samples(cursor, import_id, rows)
            conn.commit()
        except Exception:
            # Do not leave an import record without its samples.
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()
    print(f"Hotovo — {len(rows)} vzorků importováno.")
# Run the import only when executed as a script, so that importing this module
# (e.g. to reuse the converters) does not touch the database.
if __name__ == "__main__":
    main()