Files
janssen/Clario/import_to_mongo.py
T
2026-06-15 16:10:47 +02:00

315 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
import_to_mongo.py
Verze: 1.3
Datum: 2026-06-15
Import Clario CSV do MongoDB (databáze: Clario).
Kolekce: Clario.MayoDiary / Clario.MayoScore / Clario.eCOA_DCRs / Clario.ECG_DCRs
Filtr: pouze řádky s Country == "Czech Republic"
Klíč: MayoDiary → Subject ID + Form Number
MayoScore → Participant ID + Visit
eCOA_DCRs → Data Correction ID
ECG_DCRs → Data Correction ID
Historie: při změně jakéhokoliv datového sloupce (fields + outcome cols) se stará
verze uloží do pole history[] spolu s outcome poli
Po importu přesune zpracované CSV do downloads/Zpracovano/
Použití:
python import_to_mongo.py # importuje všechny CSV z downloads/
python import_to_mongo.py downloads/konkretni.csv # jeden soubor
"""
import csv
import re
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path
from pymongo import MongoClient, ASCENDING
# Connection string passed to MongoClient in main().
MONGO_URI = "mongodb://192.168.1.76:27017"
# Name of the database that receives every imported collection.
DB_NAME = "Clario"
# Directory next to this script that main() scans for *.csv files when no
# paths are given on the command line.
DOWNLOADS_DIR = Path(__file__).parent / "downloads"
# main() moves CSV files here after importing them.
PROCESSED_DIR = DOWNLOADS_DIR / "Zpracovano"
# import_file() keeps a row only when its Country column contains this text.
COUNTRY_FILTER = "Czech Republic"
# ---------------------------------------------------------------------------
# Collection configuration
# ---------------------------------------------------------------------------
# Per-export import settings. Each top-level key is the text that
# detect_collection_type() looks for inside the CSV file stem; the first key
# (in definition order) contained in the stem is used.
#
# Entry keys:
#   "collection"   - name of the MongoDB collection the rows are written to
#   "subject_col"  - CSV column copied into doc["subject"]["id"]
#   "key_cols"     - CSV columns joined with "_" to build doc["recordKey"]
#   "outcome_cols" - optional; CSV columns stored as top-level document keys
#                    (not inside doc["fields"]) and included in change detection
COLLECTION_CONFIG = {
    "MayoDiary": {
        "collection": "Clario.MayoDiary",
        "subject_col": "Subject ID",
        "key_cols": ("Subject ID", "Form Number"),
    },
    "MayoScore": {
        "collection": "Clario.MayoScore",
        "subject_col": "Participant ID",
        "key_cols": ("Participant ID", "Visit"),
        "outcome_cols": (
            "Site Action",
            "Last Mayo Score Submission",
            "Week I-12 Clinical Responder",
            "Week I-12 Clinical Remission",
            "Clinical Flare",
            "Loss of Response",
            "Partial Mayo Response Post Loss of Response",
            "Partial Mayo Response for Clinical Non-Responders",
        ),
    },
    "eCOA DCRs": {
        "collection": "Clario.eCOA_DCRs",
        "subject_col": "Subject ID",
        "key_cols": ("Data Correction ID",),
    },
    "ECG DCRs": {
        "collection": "Clario.ECG_DCRs",
        "subject_col": "Subject Number",
        "key_cols": ("Data Correction ID",),
    },
}
# strptime() patterns that parse_date() tries in this order; the first pattern
# that matches the whole (stripped) value wins.
DATE_FORMATS = [
    # parse_date() strips the pattern before use, so this entry behaves exactly
    # like the next one.
    "%d-%b-%Y ",
    "%d-%b-%Y",
    "%d-%b-%Y %H:%M:%S",
    "%d %b %Y %H:%M:%S",
    # Fractional seconds separated from the seconds by a colon.
    "%d %b %Y %H:%M:%S:%f",
    "%d %b %Y",
    "%d %B %Y",
    "%Y%m%d %H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S",
    # US-style month/day/year with a 12-hour clock and AM/PM marker.
    "%m/%d/%Y %I:%M:%S %p",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def clean_colname(name: str) -> str:
    """Return a CSV header name without BOM, surrounding whitespace or quotes.

    The byte-order mark is spelled as the escape ``\\ufeff`` rather than as the
    literal (invisible) character, so the intent is visible in the source and
    cannot be lost when an editor or tool normalizes invisible characters.

    Args:
        name: Raw column name as read from the CSV header.

    Returns:
        The name with any leading BOM removed, then surrounding whitespace
        stripped, then surrounding double quotes stripped.
    """
    return name.lstrip("\ufeff").strip().strip('"')
def parse_date(value: str) -> str | None:
v = value.strip()
for fmt in DATE_FORMATS:
try:
dt = datetime.strptime(v, fmt.strip())
return dt.replace(tzinfo=timezone.utc).isoformat()
except ValueError:
continue
return None
def extract_snapshot_date(filename: str) -> str:
    """Return the snapshot date encoded at the start of a file name.

    Only the final path component is inspected. When it does not begin with
    a ``YYYY-MM-DD`` prefix, today's local date is returned in that format.
    """
    base_name = Path(filename).name
    found = re.match(r"(\d{4}-\d{2}-\d{2})", base_name)
    if found is None:
        return datetime.now().strftime("%Y-%m-%d")
    return found.group(1)
def detect_collection_type(filename: str) -> str | None:
    """Return the first COLLECTION_CONFIG key contained in the file stem, or None."""
    base = Path(filename).stem
    # Keys are checked in definition order; a plain substring test decides.
    return next((name for name in COLLECTION_CONFIG if name in base), None)
def data_snapshot(doc: dict, outcome_cols: tuple) -> dict:
    """Build the comparable view of a document: its fields{} plus outcome columns.

    A missing ``fields`` entry is represented as an empty dict and a missing
    outcome column as ``None``, so two snapshots can be compared with ``==``.
    """
    outcome_values = {name: doc.get(name) for name in outcome_cols}
    return {"fields": doc.get("fields", {}), **outcome_values}
# ---------------------------------------------------------------------------
# CSV → document
# ---------------------------------------------------------------------------
def map_row(row: dict, col_type: str) -> dict:
    """Convert one CSV row into the document that is stored in MongoDB.

    Resulting document layout:
        subject.id   - value of the configured subject column
        site.name    - "Site", falling back to "Site ID"
        country      - "Country"
        study        - "Protocol"
        recordKey    - configured key columns joined with "_"; empty string
                       when every key column is empty
        <outcome>    - one top-level entry per configured outcome column
                       (None when empty or "-")
        fields       - every other non-empty column

    Values that parse_date() recognizes are stored as ISO-8601 strings,
    everything else as the stripped original text.

    Args:
        row: Mapping of raw CSV header -> raw cell value, as produced by
            csv.DictReader.
        col_type: Key into COLLECTION_CONFIG.

    Returns:
        The mapped document (without import bookkeeping such as lastSeen).
    """
    cfg = COLLECTION_CONFIG[col_type]
    subject_col = cfg["subject_col"]
    # Keep the configured order so that documents are built deterministically.
    outcome_cols = tuple(cfg.get("outcome_cols", ()))

    # csv.DictReader stores the surplus cells of an over-long row as a list
    # under the key None; that entry has no column name and cannot be mapped.
    cleaned = {
        clean_colname(k): v.strip() if v else ""
        for k, v in row.items()
        if k is not None
    }

    doc: dict = {"subject": {"id": cleaned.get(subject_col, "")}}
    # ECG DCRs use "Site ID" instead of "Site"
    doc["site"] = {"name": cleaned.get("Site") or cleaned.get("Site ID", "")}
    doc["country"] = cleaned.get("Country", "")
    doc["study"] = cleaned.get("Protocol", "")

    key_parts = [cleaned.get(c, "") for c in cfg["key_cols"]]
    # Joining all-empty parts of a composite key yields "_", which is truthy;
    # return "" instead so callers can recognize a row without any key.
    doc["recordKey"] = "_".join(key_parts) if any(key_parts) else ""

    for col in outcome_cols:
        value = cleaned.get(col, "")
        if value and value != "-":
            doc[col] = parse_date(value) or value
        else:
            doc[col] = None

    # NOTE: "Site ID" is not skipped here, so it is also kept inside fields.
    skip_top = {"Protocol", "Country", "Site", subject_col, *outcome_cols}
    fields: dict = {}
    for col, value in cleaned.items():
        # "-" is treated the same as an empty cell.
        if col in skip_top or not value or value == "-":
            continue
        fields[col] = parse_date(value) or value
    doc["fields"] = fields
    return doc
# ---------------------------------------------------------------------------
# Import of a single file
# ---------------------------------------------------------------------------
def _ensure_indexes(collection, col_type: str) -> None:
    """Create the indexes used by the import lookup and by later queries."""
    collection.create_index([("recordKey", ASCENDING)], unique=True)
    collection.create_index([("subject.id", ASCENDING)])
    collection.create_index([("site.name", ASCENDING)])
    if col_type == "MayoScore":
        collection.create_index([("Site Action", ASCENDING)])
    if col_type in ("eCOA DCRs", "ECG DCRs"):
        collection.create_index([("fields.Status", ASCENDING)])
        collection.create_index([("fields.Type", ASCENDING)])


def import_file(csv_path: str, db) -> dict:
    """Import one CSV export into its MongoDB collection.

    The target collection is chosen from the file name. Rows whose Country
    column does not contain COUNTRY_FILTER are ignored. For every remaining
    row the document is looked up by recordKey and then either inserted,
    updated (the previous data is appended to history[]) or only has its
    lastSeen / sourceFile refreshed when nothing changed.

    Args:
        csv_path: Path of the CSV file to import.
        db: Database object; indexed by collection name to obtain the
            target collection.

    Returns:
        ``{"skipped": True}`` when the file name matches no configured type,
        otherwise a dict with the collection name, snapshot date and the
        counters inserted / changed / unchanged / filtered_out / no_key.
    """
    filename = Path(csv_path).name
    col_type = detect_collection_type(filename)
    if col_type is None:
        print(f" Preskakuji (neznamy typ): {filename}")
        return {"skipped": True}

    cfg = COLLECTION_CONFIG[col_type]
    col_name = cfg["collection"]
    outcome_cols = tuple(cfg.get("outcome_cols", ()))
    snapshot_date = extract_snapshot_date(filename)
    collection = db[col_name]

    # Create the indexes up front: the per-row find_one() on recordKey would
    # otherwise scan the whole collection during the very first import.
    _ensure_indexes(collection, col_type)

    inserted = changed = unchanged = filtered_out = no_key = 0
    with open(csv_path, encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f, delimiter=",", quotechar='"')
        for raw_row in reader:
            # An over-long row puts its surplus cells under the key None;
            # drop that entry because it has no column name.
            row = {k: v for k, v in raw_row.items() if k is not None}

            cleaned_row = {clean_colname(k): v for k, v in row.items()}
            # A short row yields None for the missing cells.
            country = (cleaned_row.get("Country") or "").strip()
            if COUNTRY_FILTER not in country:
                filtered_out += 1
                continue

            doc = map_row(row, col_type)
            record_key = doc.get("recordKey")
            if not record_key:
                # Without a key the row cannot be matched to a document.
                no_key += 1
                continue
            doc["sourceFile"] = filename

            existing = collection.find_one({"recordKey": record_key})
            if existing is None:
                doc["firstSeen"] = snapshot_date
                doc["lastSeen"] = snapshot_date
                doc["history"] = []
                collection.insert_one(doc)
                inserted += 1
            elif data_snapshot(existing, outcome_cols) != data_snapshot(doc, outcome_cols):
                # Keep a complete snapshot of the old state (fields + outcome cols)
                old_entry = {"date": existing.get("lastSeen", snapshot_date)}
                for col in outcome_cols:
                    old_entry[col] = existing.get(col)
                old_entry["fields"] = existing.get("fields", {})

                update_doc = dict(doc)
                update_doc["lastSeen"] = snapshot_date
                collection.update_one(
                    {"_id": existing["_id"]},
                    {
                        "$push": {"history": old_entry},
                        "$set": update_doc,
                    },
                )
                changed += 1
            else:
                collection.update_one(
                    {"_id": existing["_id"]},
                    {"$set": {"lastSeen": snapshot_date, "sourceFile": filename}},
                )
                unchanged += 1

    stats = {
        "collection": col_name,
        "snapshot": snapshot_date,
        "inserted": inserted,
        "changed": changed,
        "unchanged": unchanged,
        "filtered_out": filtered_out,
        "no_key": no_key,
    }
    print(f" {col_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ")
    return stats
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point.

    Imports the CSV files named on the command line, or every ``*.csv`` in
    DOWNLOADS_DIR when no argument is given. Each imported file is moved to
    PROCESSED_DIR; files of an unknown type are left where they are.
    Prints per-file statistics and a grand total.
    """
    paths: list[Path] = []
    if len(sys.argv) > 1:
        for arg in sys.argv[1:]:
            p = Path(arg)
            if p.is_file():
                paths.append(p)
            else:
                print(f"Soubor nenalezen: {arg}")
    else:
        paths = sorted(DOWNLOADS_DIR.glob("*.csv"))

    if not paths:
        print("Zadne CSV soubory k importu.")
        return
    print(f"Nalezeno {len(paths)} souboru.\n")

    total = {"inserted": 0, "changed": 0, "unchanged": 0}
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        # Fail fast when the server is unreachable, before touching any file.
        client.admin.command("ping")
        db = client[DB_NAME]
        # parents=True: downloads/ may not exist when files are passed as
        # command-line arguments.
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

        for csv_path in paths:
            print(f"Import: {csv_path.name}")
            stats = import_file(str(csv_path), db)
            if not stats.get("skipped"):
                for k in total:
                    total[k] += stats.get(k, 0)
                dest = PROCESSED_DIR / csv_path.name
                shutil.move(str(csv_path), str(dest))
                print(" -> presunut do Zpracovano/")
    finally:
        # Release the connection even when the ping or an import fails.
        client.close()

    print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")


if __name__ == "__main__":
    main()