"""
import_to_mongo.py
Verze: 1.3
Datum: 2026-06-15

Import Clario CSV do MongoDB (databáze: Clario).

Kolekce:  Clario.MayoDiary / Clario.MayoScore / Clario.eCOA_DCRs / Clario.ECG_DCRs
Filtr:    pouze řádky, jejichž sloupec Country obsahuje "Czech Republic"
Klíč:     MayoDiary → Subject ID + Form Number
          MayoScore → Participant ID + Visit
          eCOA_DCRs → Data Correction ID
          ECG_DCRs  → Data Correction ID
Historie: při změně jakéhokoliv datového sloupce (fields + outcome cols) se stará
          verze uloží do pole history[] spolu s outcome poli.
Po importu přesune zpracované CSV do downloads/Zpracovano/.

Použití:
    python import_to_mongo.py                           # importuje všechny CSV z downloads/
    python import_to_mongo.py downloads/konkretni.csv   # jeden soubor
"""
import csv
import os
import re
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path

from pymongo import MongoClient, ASCENDING
# MongoDB connection string. Can be overridden with the CLARIO_MONGO_URI
# environment variable so the server address need not be edited in source;
# without the variable the original address is used.
MONGO_URI = os.environ.get("CLARIO_MONGO_URI", "mongodb://192.168.1.76:27017")
DB_NAME = "Clario"

# CSV exports are read from downloads/ next to this script; after import each
# file is moved into downloads/Zpracovano/.
DOWNLOADS_DIR = Path(__file__).parent / "downloads"
PROCESSED_DIR = DOWNLOADS_DIR / "Zpracovano"

# A row is imported only when its Country column contains this text.
COUNTRY_FILTER = "Czech Republic"
# ---------------------------------------------------------------------------
# Collection configuration
# ---------------------------------------------------------------------------

# Maps a file-type marker to its import settings. The marker is searched for as
# a substring of the CSV file-name stem, in the order listed here.
#   collection    - target MongoDB collection name
#   subject_col   - CSV column holding the subject / participant identifier
#   key_cols      - CSV columns joined with "_" into the unique recordKey
#   outcome_cols  - (optional) columns stored as top-level document fields
#                   rather than inside fields{}
COLLECTION_CONFIG = {
    "MayoDiary": dict(
        collection="Clario.MayoDiary",
        subject_col="Subject ID",
        key_cols=("Subject ID", "Form Number"),
    ),
    "MayoScore": dict(
        collection="Clario.MayoScore",
        subject_col="Participant ID",
        key_cols=("Participant ID", "Visit"),
        outcome_cols=(
            "Site Action",
            "Last Mayo Score Submission",
            "Week I-12 Clinical Responder",
            "Week I-12 Clinical Remission",
            "Clinical Flare",
            "Loss of Response",
            "Partial Mayo Response Post Loss of Response",
            "Partial Mayo Response for Clinical Non-Responders",
        ),
    ),
    "eCOA DCRs": dict(
        collection="Clario.eCOA_DCRs",
        subject_col="Subject ID",
        key_cols=("Data Correction ID",),
    ),
    "ECG DCRs": dict(
        collection="Clario.ECG_DCRs",
        subject_col="Subject Number",
        key_cols=("Data Correction ID",),
    ),
}
# strptime patterns tried in order by parse_date(); the first pattern that
# parses a value wins. parse_date() strips both the value and each pattern,
# so the trailing-space entry below behaves exactly like "%d-%b-%Y".
DATE_FORMATS = [
    # day-month-year with abbreviated month name, e.g. "05-Jan-2024"
    "%d-%b-%Y ",
    "%d-%b-%Y",
    "%d-%b-%Y %H:%M:%S",
    # space-separated day, month name, year
    "%d %b %Y %H:%M:%S",
    "%d %b %Y %H:%M:%S:%f",
    "%d %b %Y",
    "%d %B %Y",
    # purely numeric forms
    "%Y%m%d %H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S",
    "%m/%d/%Y %I:%M:%S %p",
]
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def clean_colname(name: str) -> str:
|
|
"""Odstraní BOM a okolní uvozovky/mezery z názvu sloupce."""
|
|
return name.lstrip("").strip().strip('"')
|
|
|
|
|
|
def parse_date(value: str) -> str | None:
    """Interpret ``value`` as a date/time using the patterns in DATE_FORMATS.

    Surrounding whitespace is ignored on both the value and each pattern. The
    first pattern that parses wins. The naive result is labelled UTC (no
    conversion is done) and returned as an ISO 8601 string. Returns None when
    no pattern matches.
    """
    # NOTE(review): exported timestamps are assumed to already be in UTC - confirm.
    text = value.strip()
    for pattern in (fmt.strip() for fmt in DATE_FORMATS):
        try:
            moment = datetime.strptime(text, pattern)
        except ValueError:
            continue
        return moment.replace(tzinfo=timezone.utc).isoformat()
    return None
def extract_snapshot_date(filename: str) -> str:
    """Return the YYYY-MM-DD prefix of the file's base name as the snapshot date.

    Directory components are ignored. The digits are not validated as a real
    calendar date. Without such a prefix, today's local date is returned.
    """
    base = Path(filename).name
    found = re.match(r"(\d{4}-\d{2}-\d{2})", base)
    if found is None:
        return datetime.now().strftime("%Y-%m-%d")
    return found.group(1)
def detect_collection_type(filename: str) -> str | None:
    """Return the COLLECTION_CONFIG key that occurs in the file-name stem.

    Keys are checked in COLLECTION_CONFIG order and the first substring match
    wins. None means the file type is not recognized.
    """
    stem = Path(filename).stem
    return next((kind for kind in COLLECTION_CONFIG if kind in stem), None)
def data_snapshot(doc: dict, outcome_cols: tuple) -> dict:
    """Build the comparable view of a record used for change detection.

    The result holds ``doc["fields"]`` (``{}`` when absent) under "fields",
    followed by one entry per outcome column (None when ``doc`` lacks it).
    """
    outcome_part = {col: doc.get(col) for col in outcome_cols}
    return {"fields": doc.get("fields", {}), **outcome_part}
# ---------------------------------------------------------------------------
# CSV row -> document
# ---------------------------------------------------------------------------

def map_row(row: dict, col_type: str) -> dict:
    """Convert one csv.DictReader row into a MongoDB document.

    Resulting document keys, in order:
      subject.id      - value of the configured subject column
      site.name       - "Site", falling back to "Site ID"
      country, study  - the "Country" and "Protocol" columns
      recordKey       - key columns joined with "_"; "" when every key column
                        is empty, so the caller skips the row
      <outcome cols>  - each configured outcome column at top level; values
                        that parse_date recognizes are converted, "" and "-"
                        become None
      fields          - every other non-empty, non-"-" column (dates converted)

    Args:
        row: Raw row from csv.DictReader; header names are cleaned here.
        col_type: Key into COLLECTION_CONFIG.

    Returns:
        The new document dict.
    """
    cfg = COLLECTION_CONFIG[col_type]

    # csv.DictReader puts the surplus values of an over-long row under the None
    # key as a list (default restkey); they have no column name, so drop them.
    # Missing values of short rows arrive as None and become "".
    cleaned = {
        clean_colname(k): v.strip() if v else ""
        for k, v in row.items()
        if k is not None
    }

    subject_col = cfg["subject_col"]
    doc: dict = {
        "subject": {"id": cleaned.get(subject_col, "")},
        # ECG DCRs use "Site ID" instead of "Site"
        "site": {"name": cleaned.get("Site") or cleaned.get("Site ID", "")},
        "country": cleaned.get("Country", ""),
        "study": cleaned.get("Protocol", ""),
    }

    key_parts = [cleaned.get(c, "") for c in cfg["key_cols"]]
    # For composite keys "_".join(["", ""]) == "_" is truthy and would slip past
    # the caller's "skip rows without a key" check, so an all-empty key is "".
    doc["recordKey"] = "_".join(key_parts) if any(key_parts) else ""

    # Iterate the configured tuple (not a set) so outcome fields are always
    # written in the same, configured order.
    outcome_cols = tuple(cfg.get("outcome_cols", ()))
    for col in outcome_cols:
        value = cleaned.get(col, "")
        if value and value != "-":
            doc[col] = parse_date(value) or value
        else:
            doc[col] = None

    # Columns already stored at top level are not duplicated into fields{}.
    skip_top = {"Protocol", "Country", "Site", subject_col, *outcome_cols}
    doc["fields"] = {
        col: parse_date(value) or value
        for col, value in cleaned.items()
        if col not in skip_top and value and value != "-"
    }
    return doc
# ---------------------------------------------------------------------------
# Import of a single file
# ---------------------------------------------------------------------------

def _ensure_indexes(collection, col_type: str) -> None:
    """Create the indexes this importer and typical queries rely on."""
    # recordKey is looked up once per CSV row; the unique index keeps those
    # lookups from scanning the collection and rejects duplicate keys.
    collection.create_index([("recordKey", ASCENDING)], unique=True)
    collection.create_index([("subject.id", ASCENDING)])
    collection.create_index([("site.name", ASCENDING)])
    if col_type == "MayoScore":
        collection.create_index([("Site Action", ASCENDING)])
    if col_type in ("eCOA DCRs", "ECG DCRs"):
        collection.create_index([("fields.Status", ASCENDING)])
        collection.create_index([("fields.Type", ASCENDING)])


def import_file(csv_path: str, db) -> dict:
    """Import one Clario CSV export into its MongoDB collection.

    The file type (and so the target collection) is detected from the file
    name, and the snapshot date from its YYYY-MM-DD prefix. Rows whose Country
    does not contain COUNTRY_FILTER, and rows without a recordKey, are
    skipped. For each remaining row:
      * new recordKey         -> insert with firstSeen/lastSeen and empty history
      * fields/outcomes differ -> push the previous state onto history[] and
                                  overwrite the document
      * otherwise             -> only refresh lastSeen and sourceFile

    Args:
        csv_path: Path to the CSV file.
        db: pymongo Database to write into.

    Returns:
        {"skipped": True} for an unrecognized file type, otherwise a stats dict
        with collection, snapshot, inserted, changed, unchanged, filtered_out.
    """
    filename = Path(csv_path).name
    col_type = detect_collection_type(filename)
    if col_type is None:
        print(f" Preskakuji (neznamy typ): {filename}")
        return {"skipped": True}

    cfg = COLLECTION_CONFIG[col_type]
    col_name = cfg["collection"]
    outcome_cols = tuple(cfg.get("outcome_cols", ()))
    snapshot_date = extract_snapshot_date(filename)
    collection = db[col_name]

    # Build indexes before the row loop: otherwise every find_one below is a
    # full collection scan on a first import.
    _ensure_indexes(collection, col_type)

    inserted = changed = unchanged = filtered_out = 0

    with open(csv_path, encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f, delimiter=",", quotechar='"')

        for row in reader:
            # The None key holds surplus values of over-long rows; short rows
            # carry None values, hence `or ""` before strip().
            cleaned_row = {clean_colname(k): v for k, v in row.items() if k is not None}
            country = (cleaned_row.get("Country") or "").strip()
            if COUNTRY_FILTER not in country:
                filtered_out += 1
                continue

            doc = map_row(row, col_type)
            record_key = doc.get("recordKey")
            if not record_key:
                continue

            doc["sourceFile"] = filename

            existing = collection.find_one({"recordKey": record_key})

            if existing is None:
                doc["firstSeen"] = snapshot_date
                doc["lastSeen"] = snapshot_date
                doc["history"] = []
                collection.insert_one(doc)
                inserted += 1

            elif data_snapshot(existing, outcome_cols) != data_snapshot(doc, outcome_cols):
                # Archive the complete previous state (outcome cols + fields),
                # dated with the snapshot in which it was last seen.
                old_entry = {"date": existing.get("lastSeen", snapshot_date)}
                for col in outcome_cols:
                    old_entry[col] = existing.get(col)
                old_entry["fields"] = existing.get("fields", {})

                update_doc = dict(doc)
                update_doc["lastSeen"] = snapshot_date
                collection.update_one(
                    {"_id": existing["_id"]},
                    {
                        "$push": {"history": old_entry},
                        "$set": update_doc,
                    },
                )
                changed += 1

            else:
                collection.update_one(
                    {"_id": existing["_id"]},
                    {"$set": {"lastSeen": snapshot_date, "sourceFile": filename}},
                )
                unchanged += 1

    stats = {
        "collection": col_name,
        "snapshot": snapshot_date,
        "inserted": inserted,
        "changed": changed,
        "unchanged": unchanged,
        "filtered_out": filtered_out,
    }
    print(f" {col_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ")
    return stats
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Import the CSV files named on the command line, or every CSV in DOWNLOADS_DIR.

    Each file is moved into PROCESSED_DIR once import_file() returns.
    Per-file and total statistics are printed. The MongoDB client is closed
    even when the connection check or an import raises.
    """
    paths: list[Path] = []

    if len(sys.argv) > 1:
        for arg in sys.argv[1:]:
            p = Path(arg)
            if p.is_file():
                paths.append(p)
            else:
                print(f"Soubor nenalezen: {arg}")
    else:
        paths = sorted(DOWNLOADS_DIR.glob("*.csv"))

    if not paths:
        print("Zadne CSV soubory k importu.")
        return

    print(f"Nalezeno {len(paths)} souboru.\n")

    total = {"inserted": 0, "changed": 0, "unchanged": 0}
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        # Fail fast when the server is unreachable.
        client.admin.command("ping")
        db = client[DB_NAME]

        # parents=True: downloads/ itself may be missing when files are given
        # explicitly on the command line.
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

        for csv_path in paths:
            print(f"Import: {csv_path.name}")
            stats = import_file(str(csv_path), db)
            if not stats.get("skipped"):
                for k in total:
                    total[k] += stats.get(k, 0)

            # NOTE(review): files skipped as unknown type are moved too - confirm intended.
            dest = PROCESSED_DIR / csv_path.name
            shutil.move(str(csv_path), str(dest))
            print(" -> presunut do Zpracovano/")
    finally:
        client.close()

    print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")


if __name__ == "__main__":
    main()