452 lines
16 KiB
Python
452 lines
16 KiB
Python
"""
|
|
Import EDC CSV reportů do MongoDB.
|
|
|
|
Použití:
|
|
python edc_import.py report.csv
|
|
python edc_import.py reports/*.csv
|
|
python edc_import.py report.csv --host mongodb://192.168.1.100:27017 --db klinicka_studie
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import glob
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from pymongo import MongoClient, ASCENDING
|
|
from pymongo.errors import PyMongoError
|
|
|
|
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Configured at import time: records at INFO level and above are written both
# to the file "edc_import.log" and to standard output.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[
        logging.FileHandler("edc_import.log", encoding="utf-8"),
        # Wrap the stdout file descriptor in a UTF-8 text stream; closefd=False
        # leaves the descriptor itself open when the wrapper is closed.
        logging.StreamHandler(open(sys.stdout.fileno(), mode="w", encoding="utf-8", closefd=False)),
    ],
)
log = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
# Mapping of fixed CSV columns to MongoDB document paths
# ---------------------------------------------------------------------------
# Keys are CSV column names; values are dotted paths inside the document.
FIXED_FIELDS = {
    "StudyName": "study",
    "SiteGroupName": "site.group",
    "SiteID": "site.id",
    "SiteNumber": "site.number",
    "Site": "site.name",
    "SubjectID": "subject.id",
    "Subject": "subject.label",
    "CRFVersionID": "form.crfVersionId",
    "InstanceID": "form.instanceId",
    "InstanceName": "form.instanceName",
    "FolderSeq": "form.folderSeq",
    "Page": "form.page",
    "RecordID": "form.recordId",
    "RecordPosition": "form.recordPosition",
    "LastModifiedDate": "lastModified",
    "PrintDateTime": "importedAt",
}

# Columns that are stored under _meta (other administrative columns)
META_FIELDS = {"RunUser", "VersionNumber", "FilterField"}

# Field labels whose values are converted to int
INT_FIELDS = {"Elapsed days"}

# Date formats we try to parse. parse_date() tries them in list order and the
# first match wins.
# NOTE(review): "%d/%m/%Y" precedes "%m/%d/%Y", so an ambiguous value such as
# 05/06/2026 10:00:00 is read day-first — confirm this matches the exports.
DATE_FORMATS = [
    "%d %b %Y %H:%M:%S",  # 20 MAY 2026 12:06:18
    "%d %b %Y %H:%M:%S:%f",  # 10 Aug 2025 18:13:22:080 (EDC query dates)
    "%Y%m%d %H:%M:%S.%f",  # 20250810 18:13:22.080 (sortable query dates)
    "%Y-%m-%d %H:%M:%S",  # 2026-05-20 12:06:28
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%d/%m/%Y %H:%M:%S",
    "%m/%d/%Y %H:%M:%S",
    "%m/%d/%Y %I:%M:%S %p",  # 5/20/2026 1:23:27 PM
]
|
|
|
|
# ---------------------------------------------------------------------------
# QueryDetails — detection and mapping
# ---------------------------------------------------------------------------
# A CSV whose header contains this column is treated as a QueryDetails report.
QUERY_DETAIL_MARKER = "QueryID(ReQry)"

# QueryDetails columns that are copied into the document's _meta sub-document.
QUERY_META_FIELDS = {
    "StudyParameter", "SiteGroupParameter", "SiteNumberParameter", "SiteParameter",
    "SubjectParameter", "SubjectStatusParameter", "FolderParameter", "FormParameter",
    "FieldParameter", "MarkingGroupParameter", "QueryStatusParameter",
    "IncludeInactivePagesParameter", "PageSDVParameter", "PageFrozenParameter",
    "PageLockedParameter", "StartDateParameter", "EndDateParameter",
    "MilestoneParameter", "ReportTypeParameter", "VersionNumber", "TimeZone",
    "RunUser", "ErrorString",
    # Sortable dates — redundant, the dates are parsed from the main columns
    "OpenedDateSrtble", "AnsweredDateSrtble", "ClosedDateSrtble",
    # Aggregate counts — stored in meta
    "VisitSiteLevel", "VisitCountryLevel", "VisitStudyLevel",
    "PageSubjectLevel", "PageSiteLevel", "PageCountryLevel", "PageStudyLevel",
    "Queries (Op/Ans/SDV)",
}
|
|
|
|
|
|
def is_query_details(fieldnames: list[str]) -> bool:
    """Return True when the header contains the QueryDetails marker column."""
    return any(name == QUERY_DETAIL_MARKER for name in fieldnames)
|
|
|
|
|
|
def map_query_row(row: dict, source_file: str) -> dict:
    """Map one row of a QueryDetails report to a MongoDB document.

    Args:
        row: One row as produced by ``csv.DictReader``. Cells may be ``None``
            when the row has fewer cells than the header.
        source_file: File name stored under the ``sourceFile`` key.

    Returns:
        The mapped document. Top-level keys whose value is ``None`` are
        dropped, except ``queryId``, which is always kept. Nested
        sub-documents are left untouched.
    """

    def val(col: str) -> str:
        # Missing column and None cell both collapse to an empty string.
        return (row.get(col) or "").strip()

    def int_or_none(col: str):
        # None for an empty cell, int when parseable, otherwise the raw text.
        text = val(col)
        if text == "":
            return None
        try:
            return int(text)
        except ValueError:
            return text

    def date_or_str(col: str):
        # None for an empty cell, parsed date when recognised, otherwise the raw text.
        text = val(col)
        if not text:
            return None
        parsed = parse_date(text)
        return parsed if parsed else text

    # Go through val() so a None cell (short CSV row) cannot raise
    # AttributeError; sorted() keeps the key order of _meta stable between runs.
    meta = {}
    for key in sorted(QUERY_META_FIELDS):
        text = val(key)
        if text:
            meta[key] = text

    doc = {
        "study": val("StudyParameter"),
        "site": {
            "group": val("Country/Region"),
            "number": val("Site Number"),
            "name": val("Sites"),
        },
        "subject": {
            "label": val("Subjects"),
            "status": val("Subject Status"),
        },
        "visit": val("Visits"),
        "page": val("Pages"),
        "recordPosition": int_or_none("RecordPosition"),
        "field": val("Field"),
        "queryGroup": val("Query Group"),
        "queryId": val(QUERY_DETAIL_MARKER),
        "queryStatus": val("QueryStatus"),
        "openedBy": val("Opened By"),
        "openedDate": date_or_str("Opened Date"),
        "answeredBy": val("Answered By") or None,
        "answeredDate": date_or_str("Answered Date"),
        "closedBy": val("Closed By") or None,
        "closedDate": date_or_str("Closed Date"),
        "daysNotYetClosed": int_or_none("DaysNotYetClosed"),
        "daysToAnswer": int_or_none("Days to Answer"),
        "daysToClose": int_or_none("Days to Close"),
        "queryText": val("QueryText"),
        "answerText": val("Answer Text (if any)") or None,
        "importedAt": date_or_str("PrintDateTime"),
        "sourceFile": source_file,
        "_meta": meta,
    }

    # Drop None values at the top level only (not inside nested documents).
    return {k: v for k, v in doc.items() if v is not None or k == "queryId"}
|
|
|
|
|
|
def ensure_query_indexes(collection) -> None:
    """Create the indexes used by the queries collection.

    ``queryId`` gets a unique sparse index; the remaining fields get plain
    ascending single-field indexes.
    """
    collection.create_index([("queryId", ASCENDING)], unique=True, sparse=True)
    for field in ("subject.label", "site.number", "queryStatus", "openedDate"):
        collection.create_index([(field, ASCENDING)])
|
|
|
|
|
|
def ensure_snapshot_indexes(collection) -> None:
    """Create indexes for queries_snapshots — (queryId, snapshotDate) is unique."""
    compound_key = [("queryId", ASCENDING), ("snapshotDate", ASCENDING)]
    collection.create_index(compound_key, unique=True)
    for field in ("snapshotDate", "queryStatus", "site.number", "subject.label"):
        collection.create_index([(field, ASCENDING)])
|
|
|
|
|
|
def extract_snapshot_date(filename: str) -> str:
    """
    Pull the snapshot date out of a file name.

    '2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv' → '2026-05-20'

    Only a YYYY-MM-DD prefix at the very start of the base name counts; when
    there is none, today's date (UTC) is returned instead.
    """
    base_name = Path(filename).name
    found = re.match(r"(\d{4}-\d{2}-\d{2})", base_name)
    if found is None:
        return datetime.now(timezone.utc).strftime("%Y-%m-%d")
    return found.group(1)
|
|
|
|
|
|
def parse_date(value: str) -> str | None:
    """Convert a date string to ISO 8601, or return None when no format fits.

    The formats in DATE_FORMATS are tried in order; the first one that parses
    wins. The parsed value is tagged as UTC before formatting.
    """
    text = value.strip()
    for pattern in DATE_FORMATS:
        try:
            moment = datetime.strptime(text, pattern)
        except ValueError:
            continue
        return moment.replace(tzinfo=timezone.utc).isoformat()
    return None
|
|
|
|
|
|
def set_nested(doc: dict, path: str, value: str) -> None:
    """Store *value* in a nested dict at a dotted path such as 'site.id'.

    Intermediate dicts are created on demand; the input dict is modified in place.
    """
    *parents, leaf = path.split(".")
    node = doc
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = value
|
|
|
|
|
|
def collection_name_from_filename(filename: str) -> str:
    """
    Derive the collection name from a file name.

    '2026-05-20_15-09_EDC_MDD3003_InterimInvestigatorSignature_DataListing.csv' → 'MDD3003_InterimInvestigatorSignature'
    '2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv' → 'MDD3003_QueryDetails'

    When the name contains no 'EDC_' part, the file stem is returned unchanged.
    """
    base = Path(filename).stem
    patterns = (
        r"EDC_(.+?)_DataListing",  # with the _DataListing suffix
        r"EDC_(.+)$",  # without the suffix (e.g. QueryDetails)
    )
    for pattern in patterns:
        hit = re.search(pattern, base, re.IGNORECASE)
        if hit:
            return hit.group(1)
    return base
|
|
|
|
|
|
def map_row(row: dict, source_file: str) -> dict:
    """Map one data-listing CSV row to a MongoDB document.

    Columns are routed as follows:

    * columns listed in FIXED_FIELDS go to their dotted path in the document
      (folder sequence and record position as int when parseable, the two
      date columns as ISO 8601 when parseable);
    * columns listed in META_FIELDS, and any other non-empty column whose
      name does not start with ``Field<number>``, go to ``_meta``;
    * ``Field<N>Label`` / ``Field<N>Value`` pairs go to ``fields`` keyed by
      the label, when both label and value are non-empty.

    Args:
        row: One row as produced by ``csv.DictReader``.
        source_file: File name stored under the ``sourceFile`` key.

    Returns:
        The mapped document. ``_meta`` is present only when non-empty, and
        ``form.recordId`` only when the RecordID cell is non-empty.
    """
    doc: dict = {}
    meta: dict = {}
    fields: dict = {}
    # Number parts of every FieldNValue/FieldNLabel column actually present,
    # so gaps in the numbering do not cut the scan short.
    pair_numbers: set[str] = set()

    for col, raw in row.items():
        value = raw.strip() if raw else ""

        # Fixed columns
        if col in FIXED_FIELDS:
            path = FIXED_FIELDS[col]

            # An empty record id must stay absent: the collection has a
            # unique sparse index on form.recordId, and "" would be indexed,
            # making the second such row a duplicate-key error.
            if path == "form.recordId" and not value:
                continue

            if path in ("form.folderSeq", "form.recordPosition"):
                try:
                    value = int(value)
                except ValueError:
                    pass  # keep the raw text when it is not an integer
            elif path in ("lastModified", "importedAt"):
                value = parse_date(value) or value

            set_nested(doc, path, value)
            continue

        # Meta columns
        if col in META_FIELDS:
            if value:
                meta[col] = value
            continue

        # FieldNLabel / FieldNValue pairs are handled after this loop
        pair = re.match(r"^Field(\d+)(Value|Label)$", col)
        if pair:
            pair_numbers.add(pair.group(1))
            continue

        # Remaining unknown columns also go to meta
        if value and not re.match(r"^Field\d+", col):
            meta[col] = value

    # Process the Field<N>Value / Field<N>Label pairs in numeric order
    for number in sorted(pair_numbers, key=int):
        label = (row.get(f"Field{number}Label") or "").strip()
        value = (row.get(f"Field{number}Value") or "").strip()
        if not (label and value):
            continue
        if label in INT_FIELDS:
            try:
                fields[label] = int(value)
            except ValueError:
                fields[label] = value
        else:
            # Store a parsed date when the value looks like one
            fields[label] = parse_date(value) or value

    doc["fields"] = fields
    doc["sourceFile"] = source_file
    if meta:
        doc["_meta"] = meta

    return doc
|
|
|
|
|
|
def ensure_indexes(collection) -> None:
    """Create the indexes used by a data-listing collection.

    ``form.recordId`` gets a unique sparse index; the remaining fields get
    plain ascending single-field indexes.
    """
    collection.create_index([("form.recordId", ASCENDING)], unique=True, sparse=True)
    for field in ("subject.id", "site.id", "study", "lastModified"):
        collection.create_index([(field, ASCENDING)])
|
|
|
|
|
|
def import_file(
    csv_path: str,
    collection,
    snapshot_col=None,
    snapshot_date: str | None = None,
) -> tuple[int, int, int]:
    """Import one CSV file into *collection*.

    QueryDetails files (detected from the header) are upserted by
    ``queryId``; other files are upserted by ``form.recordId``, or inserted
    when the row has no record id.

    Args:
        csv_path: Path of the CSV file to read.
        collection: Target collection (needs ``update_one`` and ``insert_one``).
        snapshot_col: When given together with *snapshot_date*, every
            QueryDetails row is additionally upserted there by
            (queryId, snapshotDate).
        snapshot_date: Snapshot date stored with each snapshot document.

    Returns:
        ``(inserted, updated, errors)``. ``updated`` counts upserts that did
        not create a new document. Rows that fail are logged and counted in
        ``errors``; processing continues with the next row.

    Raises:
        OSError, UnicodeError, csv.Error: when the file itself cannot be
            opened, decoded or parsed (these are not per-row errors).
    """
    inserted = updated = errors = 0
    source_file = Path(csv_path).name

    # utf-8-sig reads plain UTF-8 as well, but strips a leading BOM that would
    # otherwise become part of the first column name.
    with open(csv_path, encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f, delimiter=",", quotechar='"')
        query_mode = is_query_details(reader.fieldnames or [])

        for row in reader:
            # Physical line number; stays correct when quoted cells span lines.
            line_no = reader.line_num
            try:
                if query_mode:
                    doc = map_query_row(row, source_file)
                    query_id = doc["queryId"]
                    if not query_id:
                        # Upserting on an empty id would merge every such row
                        # into one document, so reject the row instead.
                        errors += 1
                        log.error(
                            "Řádek %d v %s: prázdné QueryID, řádek přeskočen",
                            line_no, csv_path,
                        )
                        continue
                    upsert_key = {"queryId": query_id}

                    # Snapshot — upsert on (queryId, snapshotDate)
                    if snapshot_col is not None and snapshot_date:
                        snap_doc = {**doc, "snapshotDate": snapshot_date}
                        snapshot_col.update_one(
                            {"queryId": query_id, "snapshotDate": snapshot_date},
                            {"$set": snap_doc},
                            upsert=True,
                        )
                else:
                    doc = map_row(row, source_file)
                    record_id = doc.get("form", {}).get("recordId")
                    upsert_key = {"form.recordId": record_id} if record_id else None

                if upsert_key:
                    result = collection.update_one(
                        upsert_key,
                        {"$set": doc},
                        upsert=True,
                    )
                    if result.upserted_id is not None:
                        inserted += 1
                    else:
                        updated += 1
                else:
                    collection.insert_one(doc)
                    inserted += 1

            except PyMongoError as e:
                errors += 1
                log.error("Řádek %d v %s: MongoDB chyba: %s", line_no, csv_path, e)
            except Exception as e:
                # Per-row boundary: one bad row must not abort the whole file.
                errors += 1
                log.error("Řádek %d v %s: %s", line_no, csv_path, e)

    return inserted, updated, errors
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: import the given CSV files into MongoDB.

    Each argument is expanded as a glob pattern (falling back to the literal
    argument when nothing matches). QueryDetails files go to the ``queries``
    and ``queries_snapshots`` collections; every other file goes to a
    collection named after the file. Exits with status 1 when no files are
    given or the server cannot be reached.
    """
    parser = argparse.ArgumentParser(description="Import EDC CSV reportů do MongoDB")
    parser.add_argument("files", nargs="+", help="CSV soubory nebo glob vzor")
    parser.add_argument("--host", default="mongodb://192.168.1.76:27017", help="MongoDB URI")
    parser.add_argument("--db", default="edc", help="Název databáze")
    args = parser.parse_args()

    # Expand glob patterns (matters on Windows, where the shell does not).
    # Matches are sorted so date-prefixed reports are applied oldest first and
    # the newest state wins in the upserted collections.
    paths: list[str] = []
    for pattern in args.files:
        expanded = sorted(glob.glob(pattern))
        paths.extend(expanded if expanded else [pattern])

    if not paths:
        log.error("Žádné soubory nenalezeny.")
        sys.exit(1)

    client = MongoClient(args.host, serverSelectionTimeoutMS=5000)
    try:
        try:
            client.admin.command("ping")
        except Exception as e:
            log.error("Nelze se připojit k MongoDB (%s): %s", args.host, e)
            sys.exit(1)

        db = client[args.db]

        total_inserted = total_updated = total_errors = 0

        for csv_path in paths:
            if not os.path.isfile(csv_path):
                log.warning("Soubor neexistuje, přeskakuji: %s", csv_path)
                continue

            try:
                # Detect the file type and pick the collection + indexes.
                # utf-8-sig strips a BOM so the first header name stays intact.
                with open(csv_path, encoding="utf-8-sig", newline="") as f:
                    fieldnames = csv.DictReader(f).fieldnames or []

                if is_query_details(fieldnames):
                    col_name = "queries"
                    collection = db[col_name]
                    ensure_query_indexes(collection)
                    snapshot_col = db["queries_snapshots"]
                    ensure_snapshot_indexes(snapshot_col)
                    snapshot_date = extract_snapshot_date(csv_path)
                    log.info("Importuji: %s → %s.%s + queries_snapshots [%s]",
                             csv_path, args.db, col_name, snapshot_date)
                else:
                    col_name = collection_name_from_filename(csv_path)
                    collection = db[col_name]
                    ensure_indexes(collection)
                    snapshot_col = None
                    snapshot_date = None
                    log.info("Importuji: %s → %s.%s", csv_path, args.db, col_name)

                inserted, updated, errors = import_file(
                    csv_path, collection, snapshot_col, snapshot_date
                )
            except (OSError, UnicodeError, csv.Error, PyMongoError) as e:
                # A file that cannot be read, or whose indexes cannot be
                # built, must not abort the remaining files in the batch.
                total_errors += 1
                log.error("Soubor %s se nepodařilo importovat: %s", csv_path, e)
                continue

            total_inserted += inserted
            total_updated += updated
            total_errors += errors
            log.info(" nové: %d aktualizované: %d chyby: %d", inserted, updated, errors)

        log.info("=" * 60)
        log.info("Celkem — nové: %d aktualizované: %d chyby: %d",
                 total_inserted, total_updated, total_errors)
    finally:
        # Release the connection on every exit path, including sys.exit().
        client.close()


if __name__ == "__main__":
    main()
|