Files

452 lines
16 KiB
Python

"""
Import EDC CSV reportů do MongoDB.
Použití:
python edc_import.py report.csv
python edc_import.py reports/*.csv
python edc_import.py report.csv --host mongodb://192.168.1.100:27017 --db klinicka_studie
"""
import argparse
import csv
import glob
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from pymongo import MongoClient, ASCENDING
from pymongo.errors import PyMongoError
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Configured at import time: every record is written both to the file
# "edc_import.log" (relative path, i.e. the current working directory) and
# to standard output.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[
        logging.FileHandler("edc_import.log", encoding="utf-8"),
        # The stdout descriptor is re-opened as UTF-8 text, presumably so that
        # non-ASCII log messages do not depend on the console's default
        # encoding; closefd=False leaves the real stdout descriptor open when
        # this wrapper object is closed.
        # NOTE(review): sys.stdout.fileno() raises when stdout has been
        # replaced by an object without a file descriptor — confirm this
        # module is only ever run as a script.
        logging.StreamHandler(open(sys.stdout.fileno(), mode="w", encoding="utf-8", closefd=False)),
    ],
)
# Module-level logger used by every function below.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Mapping of fixed CSV columns to MongoDB paths
# ---------------------------------------------------------------------------
# Key: CSV column name; value: dotted path inside the document that map_row()
# builds (each dot opens a nested sub-document, see set_nested()).
FIXED_FIELDS = {
    "StudyName": "study",
    "SiteGroupName": "site.group",
    "SiteID": "site.id",
    "SiteNumber": "site.number",
    "Site": "site.name",
    "SubjectID": "subject.id",
    "Subject": "subject.label",
    "CRFVersionID": "form.crfVersionId",
    "InstanceID": "form.instanceId",
    "InstanceName": "form.instanceName",
    "FolderSeq": "form.folderSeq",  # converted to int by map_row() when possible
    "Page": "form.page",
    "RecordID": "form.recordId",  # upsert key used by import_file()
    "RecordPosition": "form.recordPosition",  # converted to int by map_row() when possible
    "LastModifiedDate": "lastModified",  # run through parse_date() by map_row()
    "PrintDateTime": "importedAt",  # run through parse_date() by map_row()
}
# Columns that go to _meta (remaining administrative ones)
META_FIELDS = {"RunUser", "VersionNumber", "FilterField"}
# Field labels whose value is converted to int
INT_FIELDS = {"Elapsed days"}
# Date formats we try to parse.
# parse_date() tries them in this order and the first match wins, so an
# ambiguous value such as "05/06/2026 10:00:00" is read day-first because the
# %d/%m/%Y entry precedes the %m/%d/%Y one.
# NOTE(review): %b (month abbreviation) and %p (AM/PM) are matched according
# to the process locale — confirm the importer runs under an English/C locale.
DATE_FORMATS = [
    "%d %b %Y %H:%M:%S", # 20 MAY 2026 12:06:18
    "%d %b %Y %H:%M:%S:%f", # 10 Aug 2025 18:13:22:080 (EDC query dates)
    "%Y%m%d %H:%M:%S.%f", # 20250810 18:13:22.080 (sortable query dates)
    "%Y-%m-%d %H:%M:%S", # 2026-05-20 12:06:28
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%d/%m/%Y %H:%M:%S",
    "%m/%d/%Y %H:%M:%S",
    "%m/%d/%Y %I:%M:%S %p", # 5/20/2026 1:23:27 PM
]
# ---------------------------------------------------------------------------
# QueryDetails — detection and mapping
# ---------------------------------------------------------------------------
# A CSV whose header contains this column is treated as a QueryDetails report
# (see is_query_details()); the same column supplies the document's queryId.
QUERY_DETAIL_MARKER = "QueryID(ReQry)"
# QueryDetails columns that map_query_row() copies verbatim (stripped, only
# when non-empty) into the document's _meta sub-document.
QUERY_META_FIELDS = {
    "StudyParameter", "SiteGroupParameter", "SiteNumberParameter", "SiteParameter",
    "SubjectParameter", "SubjectStatusParameter", "FolderParameter", "FormParameter",
    "FieldParameter", "MarkingGroupParameter", "QueryStatusParameter",
    "IncludeInactivePagesParameter", "PageSDVParameter", "PageFrozenParameter",
    "PageLockedParameter", "StartDateParameter", "EndDateParameter",
    "MilestoneParameter", "ReportTypeParameter", "VersionNumber", "TimeZone",
    "RunUser", "ErrorString",
    # Sortable dates — redundant, the dates are parsed from the main columns
    "OpenedDateSrtble", "AnsweredDateSrtble", "ClosedDateSrtble",
    # Aggregate counts — these go to meta
    "VisitSiteLevel", "VisitCountryLevel", "VisitStudyLevel",
    "PageSubjectLevel", "PageSiteLevel", "PageCountryLevel", "PageStudyLevel",
    "Queries (Op/Ans/SDV)",
}
def is_query_details(fieldnames: list[str]) -> bool:
    """Tell whether a CSV header belongs to a QueryDetails report.

    Args:
        fieldnames: Column names read from the CSV header.

    Returns:
        True when the ``QUERY_DETAIL_MARKER`` column is among ``fieldnames``.
    """
    has_marker = QUERY_DETAIL_MARKER in fieldnames
    return has_marker
def map_query_row(row: dict, source_file: str) -> dict:
    """Map one QueryDetails report row onto a MongoDB document.

    Args:
        row: One ``csv.DictReader`` row. Cells missing from a short line may
            be ``None``; every lookup below tolerates that.
        source_file: File name recorded in the document's ``sourceFile``.

    Returns:
        The document. Top-level keys whose value is ``None`` are dropped;
        nested sub-documents (``site``, ``subject``, ``_meta``) are kept as
        built. ``queryId`` is always present (possibly as an empty string).
    """

    def val(col: str) -> str:
        # Stripped cell text; a missing or None cell becomes "".
        return (row.get(col) or "").strip()

    def int_or_none(col: str) -> int | str | None:
        # None for an empty cell, int when the text is numeric, otherwise the
        # raw text is kept so that nothing is lost.
        text = val(col)
        if text == "":
            return None
        try:
            return int(text)
        except ValueError:
            return text

    def date_or_str(col: str) -> str | None:
        # None for an empty cell, the parse_date() result when the text
        # matches a known format, otherwise the raw text.
        text = val(col)
        if not text:
            return None
        return parse_date(text) or text

    # Use val() so a None cell (short row) cannot raise, and sort the column
    # names so the key order of the stored _meta sub-document is the same on
    # every run instead of following set iteration order.
    meta = {}
    for col in sorted(QUERY_META_FIELDS):
        text = val(col)
        if text:
            meta[col] = text

    doc = {
        "study": val("StudyParameter"),
        "site": {
            "group": val("Country/Region"),
            "number": val("Site Number"),
            "name": val("Sites"),
        },
        "subject": {
            "label": val("Subjects"),
            "status": val("Subject Status"),
        },
        "visit": val("Visits"),
        "page": val("Pages"),
        "recordPosition": int_or_none("RecordPosition"),
        "field": val("Field"),
        "queryGroup": val("Query Group"),
        "queryId": val(QUERY_DETAIL_MARKER),
        "queryStatus": val("QueryStatus"),
        "openedBy": val("Opened By"),
        "openedDate": date_or_str("Opened Date"),
        "answeredBy": val("Answered By") or None,
        "answeredDate": date_or_str("Answered Date"),
        "closedBy": val("Closed By") or None,
        "closedDate": date_or_str("Closed Date"),
        "daysNotYetClosed": int_or_none("DaysNotYetClosed"),
        "daysToAnswer": int_or_none("Days to Answer"),
        "daysToClose": int_or_none("Days to Close"),
        "queryText": val("QueryText"),
        "answerText": val("Answer Text (if any)") or None,
        "importedAt": date_or_str("PrintDateTime"),
        "sourceFile": source_file,
        "_meta": meta,
    }
    # Drop None values from the top level only (nested dicts are left alone).
    # queryId comes from val(), which always returns a string, so it survives
    # this filter without needing a special case.
    return {key: value for key, value in doc.items() if value is not None}
def ensure_query_indexes(collection) -> None:
    """Create the indexes for the collection that holds QueryDetails rows.

    Args:
        collection: Object exposing ``create_index`` (main() passes the
            ``queries`` collection).
    """
    # queryId is the upsert key: unique, and sparse as in the other
    # *_indexes helpers of this module.
    collection.create_index([("queryId", ASCENDING)], unique=True, sparse=True)
    # Plain single-field ascending indexes, created in this order.
    for field in ("subject.label", "site.number", "queryStatus", "openedDate"):
        collection.create_index([(field, ASCENDING)])
def ensure_snapshot_indexes(collection) -> None:
    """Create the indexes for queries_snapshots.

    The compound (queryId, snapshotDate) index is unique, so each query has
    at most one document per snapshot date.

    Args:
        collection: Object exposing ``create_index``.
    """
    snapshot_key = [("queryId", ASCENDING), ("snapshotDate", ASCENDING)]
    collection.create_index(snapshot_key, unique=True)
    # Plain single-field ascending indexes, created in this order.
    for field in ("snapshotDate", "queryStatus", "site.number", "subject.label"):
        collection.create_index([(field, ASCENDING)])
def extract_snapshot_date(filename: str) -> str:
    """Return the ``YYYY-MM-DD`` prefix of a file's base name.

    Example: '2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv' gives
    '2026-05-20'. Only the shape of the prefix is checked (digits and
    dashes), not whether it is a real calendar date.

    Args:
        filename: File name or path; any directory part is ignored.

    Returns:
        The prefix when the base name starts with one, otherwise today's
        date in UTC formatted the same way.
    """
    base_name = Path(filename).name
    leading_date = re.match(r"(\d{4}-\d{2}-\d{2})", base_name)
    if leading_date is None:
        # Fallback: no date at the start of the name.
        today_utc = datetime.now(timezone.utc)
        return today_utc.strftime("%Y-%m-%d")
    return leading_date.group(1)
def parse_date(value: str) -> str | None:
    """Convert a date/time string to ISO 8601, or return None.

    The stripped text is matched against each entry of ``DATE_FORMATS`` in
    order; the first format that parses wins. The parsed value carries no
    zone of its own and is labelled as UTC before formatting.

    Args:
        value: Text to parse.

    Returns:
        The ``isoformat()`` string of the parsed date-time, or None when no
        format matches.
    """
    text = value.strip()
    for pattern in DATE_FORMATS:
        try:
            moment = datetime.strptime(text, pattern)
        except ValueError:
            # Not this format; try the next one.
            continue
        return moment.replace(tzinfo=timezone.utc).isoformat()
    return None
def set_nested(doc: dict, path: str, value: str) -> None:
    """Store ``value`` in a nested dict under a dotted path such as 'site.id'.

    Intermediate dicts are created on demand; ``doc`` is modified in place.

    Args:
        doc: Dictionary to update.
        path: Keys separated by dots; the last one receives the value.
        value: Value to store.
    """
    *parents, leaf = path.split(".")
    node = doc
    for key in parents:
        # Reuse the existing sub-dict or create an empty one.
        node = node.setdefault(key, {})
    node[leaf] = value
def collection_name_from_filename(filename: str) -> str:
    """Derive a collection name from a report file name.

    Examples:
        '2026-05-20_15-09_EDC_MDD3003_InterimInvestigatorSignature_DataListing.csv'
            gives 'MDD3003_InterimInvestigatorSignature'
        '2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv'
            gives 'MDD3003_QueryDetails'

    Args:
        filename: File name or path; directory and extension are ignored.

    Returns:
        The text between 'EDC_' and '_DataListing' when both are present,
        otherwise everything after 'EDC_', otherwise the bare file stem.
        Matching is case-insensitive.
    """
    base = Path(filename).stem
    patterns = (
        r"EDC_(.+?)_DataListing",  # with the _DataListing suffix
        r"EDC_(.+)$",  # without it (e.g. QueryDetails)
    )
    for pattern in patterns:
        if (found := re.search(pattern, base, re.IGNORECASE)) is not None:
            return found.group(1)
    return base
def map_row(row: dict, source_file: str) -> dict:
    """Map one Data Listing CSV row onto a MongoDB document.

    Columns listed in ``FIXED_FIELDS`` are written to their dotted path,
    ``Field<N>Label`` / ``Field<N>Value`` pairs become entries of ``fields``,
    and every other non-empty column that does not start with ``Field<N>``
    is kept under ``_meta``.

    Args:
        row: One ``csv.DictReader`` row. Cells missing from a short line may
            be ``None``; surplus cells of a long line arrive as a list under
            the ``None`` key.
        source_file: File name recorded in the document's ``sourceFile``.

    Returns:
        The document, always containing ``fields`` and ``sourceFile``;
        ``_meta`` is present only when at least one value ended up there.

    Raises:
        ValueError: The line has more cells than the header and at least one
            surplus cell is not empty (the columns are probably shifted).
    """
    int_paths = ("form.folderSeq", "form.recordPosition")
    date_paths = ("lastModified", "importedAt")
    doc: dict = {}
    meta: dict = {}
    fields: dict = {}

    for col, raw in row.items():
        if col is None:
            # Surplus cells: tolerate trailing empty delimiters, but refuse
            # real data that has no header instead of failing obscurely.
            surplus = [cell for cell in raw if (cell or "").strip()]
            if surplus:
                raise ValueError(f"řádek má více hodnot než hlavička: {surplus!r}")
            continue

        value = raw.strip() if raw else ""

        # Fixed columns
        if col in FIXED_FIELDS:
            path = FIXED_FIELDS[col]
            if path in int_paths:
                try:
                    value = int(value)
                except ValueError:
                    pass  # keep the raw text when it is not a number
            elif path in date_paths:
                value = parse_date(value) or value
            set_nested(doc, path, value)
            continue

        # Declared meta columns and any unknown column go to _meta; every
        # Field<N>... column is left out here (the Label/Value pairs are
        # handled below).
        if col in META_FIELDS or not re.match(r"^Field\d+", col):
            if value:
                meta[col] = value

    # Walk the pairs Field1Value/Field1Label, Field2..., stopping at the
    # first N for which neither column exists.
    n = 1
    while f"Field{n}Value" in row or f"Field{n}Label" in row:
        label = (row.get(f"Field{n}Label") or "").strip()
        value = (row.get(f"Field{n}Value") or "").strip()
        if label and value:
            if label in INT_FIELDS:
                # Try a numeric conversion; keep the text when it fails.
                try:
                    fields[label] = int(value)
                except ValueError:
                    fields[label] = value
            else:
                # Try a date; keep the text when no format matches.
                fields[label] = parse_date(value) or value
        n += 1

    doc["fields"] = fields
    doc["sourceFile"] = source_file
    if meta:
        doc["_meta"] = meta
    return doc
def ensure_indexes(collection) -> None:
    """Create the indexes for a Data Listing collection.

    Args:
        collection: Object exposing ``create_index``.
    """
    # form.recordId is the upsert key: unique, and sparse because rows
    # without a record id are stored without being matched on it.
    collection.create_index([("form.recordId", ASCENDING)], unique=True, sparse=True)
    # Plain single-field ascending indexes, created in this order.
    for field in ("subject.id", "site.id", "study", "lastModified"):
        collection.create_index([(field, ASCENDING)])
def import_file(
    csv_path: str,
    collection,
    snapshot_col=None,
    snapshot_date: str | None = None,
) -> tuple[int, int, int]:
    """Import one CSV file into ``collection``.

    QueryDetails files (detected from the header) are upserted on
    ``queryId``; other files are upserted on ``form.recordId`` and rows
    without a record id are plainly inserted. A failure on one row is logged
    and counted, and the import continues with the next row.

    Args:
        csv_path: Path of the CSV file.
        collection: Target collection.
        snapshot_col: When given together with ``snapshot_date``, each
            QueryDetails row is also upserted there on
            (queryId, snapshotDate). Snapshot writes are not counted.
        snapshot_date: Value stored as ``snapshotDate`` in the snapshot.

    Returns:
        ``(inserted, updated, errors)`` for the target collection.
    """
    inserted = updated = errors = 0
    source_file = Path(csv_path).name
    # utf-8-sig drops a leading byte-order mark and otherwise reads plain
    # UTF-8 unchanged; with "utf-8" the mark would stay glued to the first
    # header name and that column would no longer match its mapping.
    with open(csv_path, encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f, delimiter=",", quotechar='"')
        query_mode = is_query_details(reader.fieldnames or [])
        for row in reader:
            # Physical line on which the record ends; unlike a running row
            # counter it stays correct when quoted cells contain newlines.
            line_no = reader.line_num
            try:
                if query_mode:
                    doc = map_query_row(row, source_file)
                    query_id = doc["queryId"]
                    if not query_id:
                        # An empty id would make every such row overwrite
                        # the same document, so the row is rejected instead.
                        errors += 1
                        log.error(
                            "Řádek %d v %s: prázdný sloupec %s, řádek přeskočen",
                            line_no, csv_path, QUERY_DETAIL_MARKER,
                        )
                        continue
                    upsert_key = {"queryId": query_id}
                    # Snapshot — upsert on (queryId, snapshotDate)
                    if snapshot_col is not None and snapshot_date:
                        snap_doc = {**doc, "snapshotDate": snapshot_date}
                        snapshot_col.update_one(
                            {"queryId": query_id, "snapshotDate": snapshot_date},
                            {"$set": snap_doc},
                            upsert=True,
                        )
                else:
                    doc = map_row(row, source_file)
                    record_id = doc.get("form", {}).get("recordId")
                    upsert_key = {"form.recordId": record_id} if record_id else None

                if upsert_key:
                    result = collection.update_one(
                        upsert_key,
                        {"$set": doc},
                        upsert=True,
                    )
                    # upserted_id is set only when a new document was created.
                    if result.upserted_id:
                        inserted += 1
                    else:
                        updated += 1
                else:
                    collection.insert_one(doc)
                    inserted += 1
            except PyMongoError as e:
                errors += 1
                log.error("Řádek %d v %s: MongoDB chyba: %s", line_no, csv_path, e)
            except Exception as e:
                # Per-row boundary: a malformed row must not abort the file.
                errors += 1
                log.error("Řádek %d v %s: %s", line_no, csv_path, e)
    return inserted, updated, errors
def main() -> None:
    """Command-line entry point: import the given CSV reports into MongoDB.

    Parses the arguments, expands glob patterns, checks the connection and
    then imports each file. QueryDetails reports go to the ``queries``
    collection plus a dated copy in ``queries_snapshots``; any other report
    goes to a collection named after the file. Totals are logged at the end.

    Exits with status 1 when no file is given or the server cannot be
    reached.
    """
    parser = argparse.ArgumentParser(description="Import EDC CSV reportů do MongoDB")
    parser.add_argument("files", nargs="+", help="CSV soubory nebo glob vzor")
    parser.add_argument("--host", default="mongodb://192.168.1.76:27017", help="MongoDB URI")
    parser.add_argument("--db", default="edc", help="Název databáze")
    args = parser.parse_args()

    # Expand glob patterns ourselves (needed on Windows, where the shell does
    # not). glob returns names in arbitrary order, so they are sorted: files
    # are then imported in a reproducible order and, for date-prefixed names,
    # the newest report is the one written last.
    paths: list[str] = []
    for pattern in args.files:
        expanded = sorted(glob.glob(pattern))
        # An unmatched pattern is kept so it is reported below as missing.
        paths.extend(expanded if expanded else [pattern])
    if not paths:
        log.error("Žádné soubory nenalezeny.")
        sys.exit(1)

    client = MongoClient(args.host, serverSelectionTimeoutMS=5000)
    # try/finally so the client is closed on every exit path, including
    # sys.exit() and an exception raised while importing.
    try:
        try:
            client.admin.command("ping")
        except Exception as e:
            log.error("Nelze se připojit k MongoDB (%s): %s", args.host, e)
            sys.exit(1)

        db = client[args.db]
        total_inserted = total_updated = total_errors = 0
        for csv_path in paths:
            if not os.path.isfile(csv_path):
                log.warning("Soubor neexistuje, přeskakuji: %s", csv_path)
                continue

            # Detect the report type from the header and pick collection and
            # indexes. utf-8-sig ignores a byte-order mark that would
            # otherwise be glued to the first column name.
            with open(csv_path, encoding="utf-8-sig", newline="") as f:
                fieldnames = csv.DictReader(f).fieldnames or []

            if is_query_details(fieldnames):
                col_name = "queries"
                collection = db[col_name]
                ensure_query_indexes(collection)
                snapshot_col = db["queries_snapshots"]
                ensure_snapshot_indexes(snapshot_col)
                snapshot_date = extract_snapshot_date(csv_path)
                # Source path and target are separated so the line is legible.
                log.info("Importuji: %s -> %s.%s + queries_snapshots [%s]",
                         csv_path, args.db, col_name, snapshot_date)
            else:
                col_name = collection_name_from_filename(csv_path)
                collection = db[col_name]
                ensure_indexes(collection)
                snapshot_col = None
                snapshot_date = None
                log.info("Importuji: %s -> %s.%s", csv_path, args.db, col_name)

            inserted, updated, errors = import_file(
                csv_path, collection, snapshot_col, snapshot_date
            )
            total_inserted += inserted
            total_updated += updated
            total_errors += errors
            log.info(" nové: %d aktualizované: %d chyby: %d", inserted, updated, errors)

        log.info("=" * 60)
        log.info("Celkem — nové: %d aktualizované: %d chyby: %d",
                 total_inserted, total_updated, total_errors)
    finally:
        client.close()
if __name__ == "__main__":
main()