This commit is contained in:
2026-05-20 18:15:56 +02:00
parent b8543d1cab
commit b7d85f8d9b
10 changed files with 423 additions and 2876 deletions
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+23
View File
@@ -0,0 +1,23 @@
2026-05-20 17:56:21,647 ERROR Nelze se připojit k MongoDB (mongodb://localhost:27017): localhost:27017: [WinError 10061] No connection could be made because the target machine actively refused it (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 5.0s, Topology Description: <TopologyDescription id: 6a0dd9a0ce7e4c93f3399a61, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [WinError 10061] No connection could be made because the target machine actively refused it (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>
2026-05-20 17:56:45,268 INFO Importuji: downloads/2026-05-20_15-09_EDC_MDD3003_DateofVisit_DataListing.csv → edc.MDD3003_DateofVisit
2026-05-20 17:56:48,052 INFO nové: 381 aktualizované: 0 chyby: 0
2026-05-20 17:56:48,052 INFO ============================================================
2026-05-20 17:56:48,052 INFO Celkem — nové: 381 aktualizované: 0 chyby: 0
2026-05-20 18:12:48,691 INFO Importuji: downloads/2026-05-20_15-21_EDC_MDD3003_QueryDetails.csv → edc.2026-05-20_15-21_EDC_MDD3003_QueryDetails
2026-05-20 18:12:48,739 INFO nové: 4 aktualizované: 0 chyby: 0
2026-05-20 18:12:48,801 INFO Importuji: downloads/2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv → edc.2026-05-20_15-23_EDC_MDD3003_QueryDetails
2026-05-20 18:13:03,331 INFO nové: 2091 aktualizované: 0 chyby: 0
2026-05-20 18:13:03,332 INFO ============================================================
2026-05-20 18:13:03,332 INFO Celkem — nové: 2095 aktualizované: 0 chyby: 0
2026-05-20 18:13:31,267 INFO Importuji: downloads/2026-05-20_15-21_EDC_MDD3003_QueryDetails.csv → edc.MDD3003_QueryDetails
2026-05-20 18:13:31,306 INFO nové: 4 aktualizované: 0 chyby: 0
2026-05-20 18:13:31,354 INFO Importuji: downloads/2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv → edc.MDD3003_QueryDetails
2026-05-20 18:13:45,497 INFO nové: 2087 aktualizované: 4 chyby: 0
2026-05-20 18:13:45,497 INFO ============================================================
2026-05-20 18:13:45,497 INFO Celkem — nové: 2091 aktualizované: 4 chyby: 0
2026-05-20 18:14:06,652 INFO Importuji: downloads/2026-05-20_15-21_EDC_MDD3003_QueryDetails.csv → edc.queries
2026-05-20 18:14:06,683 INFO nové: 4 aktualizované: 0 chyby: 0
2026-05-20 18:14:06,727 INFO Importuji: downloads/2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv → edc.queries
2026-05-20 18:14:22,340 INFO nové: 2087 aktualizované: 4 chyby: 0
2026-05-20 18:14:22,340 INFO ============================================================
2026-05-20 18:14:22,340 INFO Celkem — nové: 2091 aktualizované: 4 chyby: 0
+400
View File
@@ -0,0 +1,400 @@
"""
Import EDC CSV reportů do MongoDB.
Použití:
python edc_import.py report.csv
python edc_import.py reports/*.csv
python edc_import.py report.csv --host mongodb://192.168.1.100:27017 --db klinicka_studie
"""
import argparse
import csv
import glob
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from pymongo import MongoClient, ASCENDING
from pymongo.errors import PyMongoError
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[
logging.FileHandler("edc_import.log", encoding="utf-8"),
logging.StreamHandler(open(sys.stdout.fileno(), mode="w", encoding="utf-8", closefd=False)),
],
)
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Mapování pevných CSV sloupců na MongoDB cesty
# ---------------------------------------------------------------------------
FIXED_FIELDS = {
"StudyName": "study",
"SiteGroupName": "site.group",
"SiteID": "site.id",
"SiteNumber": "site.number",
"Site": "site.name",
"SubjectID": "subject.id",
"Subject": "subject.label",
"CRFVersionID": "form.crfVersionId",
"InstanceID": "form.instanceId",
"InstanceName": "form.instanceName",
"FolderSeq": "form.folderSeq",
"Page": "form.page",
"RecordID": "form.recordId",
"RecordPosition": "form.recordPosition",
"LastModifiedDate": "lastModified",
"PrintDateTime": "importedAt",
}
# Sloupce, které jdou do _meta (ostatní administrativní)
META_FIELDS = {"RunUser", "VersionNumber", "FilterField"}
# Pole, která se převedou na int
INT_FIELDS = {"Elapsed days"}
# Formáty datumů, které zkusíme parsovat
DATE_FORMATS = [
"%d %b %Y %H:%M:%S", # 20 MAY 2026 12:06:18
"%d %b %Y %H:%M:%S:%f", # 10 Aug 2025 18:13:22:080 (EDC query dates)
"%Y%m%d %H:%M:%S.%f", # 20250810 18:13:22.080 (sortable query dates)
"%Y-%m-%d %H:%M:%S", # 2026-05-20 12:06:28
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%d/%m/%Y %H:%M:%S",
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %I:%M:%S %p", # 5/20/2026 1:23:27 PM
]
# ---------------------------------------------------------------------------
# QueryDetails — detekce a mapování
# ---------------------------------------------------------------------------
QUERY_DETAIL_MARKER = "QueryID(ReQry)"
QUERY_META_FIELDS = {
"StudyParameter", "SiteGroupParameter", "SiteNumberParameter", "SiteParameter",
"SubjectParameter", "SubjectStatusParameter", "FolderParameter", "FormParameter",
"FieldParameter", "MarkingGroupParameter", "QueryStatusParameter",
"IncludeInactivePagesParameter", "PageSDVParameter", "PageFrozenParameter",
"PageLockedParameter", "StartDateParameter", "EndDateParameter",
"MilestoneParameter", "ReportTypeParameter", "VersionNumber", "TimeZone",
"RunUser", "ErrorString",
# Sortable dates — redundantní, parsujeme z hlavních sloupců
"OpenedDateSrtble", "AnsweredDateSrtble", "ClosedDateSrtble",
# Agregátní počty — jdou do meta
"VisitSiteLevel", "VisitCountryLevel", "VisitStudyLevel",
"PageSubjectLevel", "PageSiteLevel", "PageCountryLevel", "PageStudyLevel",
"Queries (Op/Ans/SDV)",
}
def is_query_details(fieldnames: list[str]) -> bool:
return QUERY_DETAIL_MARKER in fieldnames
def map_query_row(row: dict, source_file: str) -> dict:
"""Přemapuje řádek QueryDetails reportu na MongoDB dokument."""
def val(col: str) -> str:
return (row.get(col) or "").strip()
def int_or_none(col: str):
v = val(col)
if v == "":
return None
try:
return int(v)
except ValueError:
return v
def date_or_str(col: str):
v = val(col)
if not v:
return None
parsed = parse_date(v)
return parsed if parsed else v
meta = {k: row[k].strip() for k in QUERY_META_FIELDS if row.get(k, "").strip()}
doc = {
"study": val("StudyParameter"),
"site": {
"group": val("Country/Region"),
"number": val("Site Number"),
"name": val("Sites"),
},
"subject": {
"label": val("Subjects"),
"status": val("Subject Status"),
},
"visit": val("Visits"),
"page": val("Pages"),
"recordPosition": int_or_none("RecordPosition"),
"field": val("Field"),
"queryGroup": val("Query Group"),
"queryId": val(QUERY_DETAIL_MARKER),
"queryStatus": val("QueryStatus"),
"openedBy": val("Opened By"),
"openedDate": date_or_str("Opened Date"),
"answeredBy": val("Answered By") or None,
"answeredDate": date_or_str("Answered Date"),
"closedBy": val("Closed By") or None,
"closedDate": date_or_str("Closed Date"),
"daysNotYetClosed": int_or_none("DaysNotYetClosed"),
"daysToAnswer": int_or_none("Days to Answer"),
"daysToClose": int_or_none("Days to Close"),
"queryText": val("QueryText"),
"answerText": val("Answer Text (if any)") or None,
"importedAt": date_or_str("PrintDateTime"),
"sourceFile": source_file,
"_meta": meta,
}
# Odstraň None hodnoty z top-level (ne z nested)
return {k: v for k, v in doc.items() if v is not None or k in ("queryId",)}
def ensure_query_indexes(collection) -> None:
collection.create_index([("queryId", ASCENDING)], unique=True, sparse=True)
collection.create_index([("subject.label", ASCENDING)])
collection.create_index([("site.number", ASCENDING)])
collection.create_index([("queryStatus", ASCENDING)])
collection.create_index([("openedDate", ASCENDING)])
def parse_date(value: str) -> str | None:
"""Pokusí se převést string na ISO 8601; jinak vrátí None."""
value = value.strip()
for fmt in DATE_FORMATS:
try:
dt = datetime.strptime(value, fmt)
return dt.replace(tzinfo=timezone.utc).isoformat()
except ValueError:
continue
return None
def set_nested(doc: dict, path: str, value: str) -> None:
"""Nastaví hodnotu v nested dict podle tečkové cesty, např. 'site.id'."""
parts = path.split(".")
for part in parts[:-1]:
doc = doc.setdefault(part, {})
doc[parts[-1]] = value
def collection_name_from_filename(filename: str) -> str:
"""
Odvodí název kolekce z názvu souboru.
'2026-05-20_15-09_EDC_MDD3003_InterimInvestigatorSignature_DataListing.csv''MDD3003_InterimInvestigatorSignature'
'2026-05-20_15-23_EDC_MDD3003_QueryDetails.csv''MDD3003_QueryDetails'
"""
stem = Path(filename).stem
# Se suffixem _DataListing
match = re.search(r"EDC_(.+?)_DataListing", stem, re.IGNORECASE)
if match:
return match.group(1)
# Bez suffixu _DataListing (např. QueryDetails)
match = re.search(r"EDC_(.+)$", stem, re.IGNORECASE)
if match:
return match.group(1)
return stem
def map_row(row: dict, source_file: str) -> dict:
"""Přemapuje jeden CSV řádek na MongoDB dokument."""
doc: dict = {}
meta: dict = {}
fields: dict = {}
# Zjisti všechny klíče pro FieldNValue/FieldNLabel
field_keys = set(row.keys())
for col, value in row.items():
value = value.strip() if value else ""
# Pevná pole
if col in FIXED_FIELDS:
path = FIXED_FIELDS[col]
if path == "form.folderSeq":
try:
value = int(value)
except (ValueError, TypeError):
pass
elif path == "form.recordPosition":
try:
value = int(value)
except (ValueError, TypeError):
pass
elif path in ("lastModified", "importedAt"):
parsed = parse_date(value)
value = parsed if parsed else value
set_nested(doc, path, value)
continue
# Meta pole
if col in META_FIELDS:
if value:
meta[col] = value
continue
# FieldNLabel / FieldNValue jsou zpracovány níže
if re.match(r"^Field\d+(Value|Label)$", col):
continue
# Zbývající neznámé pevné sloupce také do meta
if not re.match(r"^Field\d+", col):
if value:
meta[col] = value
# Zpracuj páry Field1Value/Field1Label ... Field300Value/Field300Label
n = 1
while True:
val_key = f"Field{n}Value"
lbl_key = f"Field{n}Label"
if val_key not in field_keys and lbl_key not in field_keys:
break
label = (row.get(lbl_key) or "").strip()
value = (row.get(val_key) or "").strip()
if label and value:
# Pokus o převod čísel
if label in INT_FIELDS:
try:
fields[label] = int(value)
except ValueError:
fields[label] = value
else:
# Pokus o datum
parsed = parse_date(value)
fields[label] = parsed if parsed else value
n += 1
doc["fields"] = fields
doc["sourceFile"] = source_file
if meta:
doc["_meta"] = meta
return doc
def ensure_indexes(collection) -> None:
collection.create_index([("form.recordId", ASCENDING)], unique=True, sparse=True)
collection.create_index([("subject.id", ASCENDING)])
collection.create_index([("site.id", ASCENDING)])
collection.create_index([("study", ASCENDING)])
collection.create_index([("lastModified", ASCENDING)])
def import_file(csv_path: str, collection) -> tuple[int, int, int]:
"""Importuje jeden CSV soubor. Vrátí (inserted, updated, errors)."""
inserted = updated = errors = 0
source_file = Path(csv_path).name
with open(csv_path, encoding="utf-8", newline="") as f:
reader = csv.DictReader(f, delimiter=",", quotechar='"')
query_mode = is_query_details(reader.fieldnames or [])
for line_no, row in enumerate(reader, start=2):
try:
if query_mode:
doc = map_query_row(row, source_file)
upsert_key = {"queryId": doc["queryId"]}
else:
doc = map_row(row, source_file)
record_id = doc.get("form", {}).get("recordId")
upsert_key = {"form.recordId": record_id} if record_id else None
if upsert_key:
result = collection.update_one(
upsert_key,
{"$set": doc},
upsert=True,
)
if result.upserted_id:
inserted += 1
else:
updated += 1
else:
collection.insert_one(doc)
inserted += 1
except PyMongoError as e:
errors += 1
log.error("Řádek %d v %s: MongoDB chyba: %s", line_no, csv_path, e)
except Exception as e:
errors += 1
log.error("Řádek %d v %s: %s", line_no, csv_path, e)
return inserted, updated, errors
def main() -> None:
parser = argparse.ArgumentParser(description="Import EDC CSV reportů do MongoDB")
parser.add_argument("files", nargs="+", help="CSV soubory nebo glob vzor")
parser.add_argument("--host", default="mongodb://192.168.1.76:27017", help="MongoDB URI")
parser.add_argument("--db", default="edc", help="Název databáze")
args = parser.parse_args()
# Rozbal glob vzory (důležité na Windows kde shell sám neglobuje)
paths: list[str] = []
for pattern in args.files:
expanded = glob.glob(pattern)
paths.extend(expanded if expanded else [pattern])
if not paths:
log.error("Žádné soubory nenalezeny.")
sys.exit(1)
client = MongoClient(args.host, serverSelectionTimeoutMS=5000)
try:
client.admin.command("ping")
except Exception as e:
log.error("Nelze se připojit k MongoDB (%s): %s", args.host, e)
sys.exit(1)
db = client[args.db]
total_inserted = total_updated = total_errors = 0
for csv_path in paths:
if not os.path.isfile(csv_path):
log.warning("Soubor neexistuje, přeskakuji: %s", csv_path)
continue
# Detekuj typ souboru a vyber kolekci + indexy
with open(csv_path, encoding="utf-8", newline="") as f:
fieldnames = csv.DictReader(f).fieldnames or []
if is_query_details(fieldnames):
col_name = "queries"
collection = db[col_name]
ensure_query_indexes(collection)
else:
col_name = collection_name_from_filename(csv_path)
collection = db[col_name]
ensure_indexes(collection)
log.info("Importuji: %s%s.%s", csv_path, args.db, col_name)
inserted, updated, errors = import_file(csv_path, collection)
total_inserted += inserted
total_updated += updated
total_errors += errors
log.info(" nové: %d aktualizované: %d chyby: %d", inserted, updated, errors)
log.info("=" * 60)
log.info("Celkem — nové: %d aktualizované: %d chyby: %d",
total_inserted, total_updated, total_errors)
client.close()
if __name__ == "__main__":
main()