Add EDC MongoDB import with change history tracking
New import_to_mongo.py: imports Medidata EDC Data Listing CSVs into MongoDB (db: edc), filters CZE-only rows, uses dot-notation collections (e.g. UCO3001.ConcomitantTherapy), and tracks field changes over time in a history[] array per document. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+5082
File diff suppressed because one or more lines are too long
+85
File diff suppressed because one or more lines are too long
@@ -0,0 +1,284 @@
|
||||
"""
|
||||
Import EDC Data Listing CSV do MongoDB (databáze: edc).
|
||||
|
||||
Kolekce: {STUDY}.{FormName} (např. UCO3001.ConcomitantTherapy)
|
||||
Filtr: pouze řádky s SiteGroupName == "CZE"
|
||||
Historie: při změně fields se stará verze uloží do pole history[]
|
||||
|
||||
Použití:
|
||||
python import_to_mongo.py # importuje všechny CSV z downloads/
|
||||
python import_to_mongo.py downloads/konkretni.csv # jeden soubor
|
||||
"""
|
||||
|
||||
import csv
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from pymongo import MongoClient, ASCENDING
|
||||
|
||||
MONGO_URI = "mongodb://192.168.1.76:27017"
|
||||
DB_NAME = "edc"
|
||||
DOWNLOADS_DIR = Path(__file__).parent / "downloads"
|
||||
|
||||
COUNTRY_FILTER = "CZE"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mapování pevných CSV sloupců
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
FIXED_FIELDS = {
|
||||
"SiteGroupName": "site.group",
|
||||
"SiteID": "site.id",
|
||||
"SiteNumber": "site.number",
|
||||
"Site": "site.name",
|
||||
"SubjectID": "subject.id",
|
||||
"Subject": "subject.label",
|
||||
"CRFVersionID": "form.crfVersionId",
|
||||
"InstanceID": "form.instanceId",
|
||||
"InstanceName": "form.instanceName",
|
||||
"FolderSeq": "form.folderSeq",
|
||||
"Page": "form.page",
|
||||
"RecordID": "form.recordId",
|
||||
"RecordPosition": "form.recordPosition",
|
||||
"LastModifiedDate": "lastModified",
|
||||
}
|
||||
|
||||
INT_CAST = {"form.folderSeq", "form.recordPosition"}
|
||||
|
||||
META_FIELDS = {
|
||||
"StudyName", "SiteGroupParameter", "SiteNumberParameter", "SiteParameter",
|
||||
"SubjectParameter", "FormParameter", "FieldParameter", "FilterField",
|
||||
"FilterValue", "StartDateParameter", "EndDateParameter", "RunUser",
|
||||
"VersionNumber", "PrintDateTime", "TimeZone", "LastModifiedDateSortable",
|
||||
"StartDateSortable", "EndDateSortable", "ErrorMsg",
|
||||
}
|
||||
|
||||
DATE_FORMATS = [
|
||||
"%d %b %Y %H:%M:%S",
|
||||
"%d %b %Y %H:%M:%S:%f",
|
||||
"%d %b %Y",
|
||||
"%d %B %Y",
|
||||
"%Y%m%d %H:%M:%S.%f",
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
"%m/%d/%Y %I:%M:%S %p",
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_date(value: str) -> str | None:
|
||||
value = value.strip()
|
||||
for fmt in DATE_FORMATS:
|
||||
try:
|
||||
dt = datetime.strptime(value, fmt)
|
||||
return dt.replace(tzinfo=timezone.utc).isoformat()
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def set_nested(doc: dict, path: str, value) -> None:
|
||||
parts = path.split(".")
|
||||
for part in parts[:-1]:
|
||||
doc = doc.setdefault(part, {})
|
||||
doc[parts[-1]] = value
|
||||
|
||||
|
||||
def extract_snapshot_date(filename: str) -> str:
|
||||
match = re.match(r"(\d{4}-\d{2}-\d{2})", Path(filename).name)
|
||||
return match.group(1) if match else datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
|
||||
def parse_collection_name(filename: str) -> str | None:
|
||||
"""
|
||||
Z názvu souboru odvodí kolekci ve formátu STUDY.FormName.
|
||||
Vrátí None pro QueryDetails (mají vlastní flow).
|
||||
"""
|
||||
stem = Path(filename).stem
|
||||
if "QueryDetails" in stem:
|
||||
return None
|
||||
match = re.search(
|
||||
r"EDC_(\w+?)_(?:ALL_|CZE_|[A-Z]{2,3}_)?(.+?)_DataListing",
|
||||
stem, re.IGNORECASE,
|
||||
)
|
||||
if match:
|
||||
study, form = match.group(1), match.group(2)
|
||||
return f"{study}.{form}"
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CSV → dokument
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def map_row(row: dict) -> dict:
|
||||
doc: dict = {}
|
||||
fields: dict = {}
|
||||
field_keys = set(row.keys())
|
||||
|
||||
for col, value in row.items():
|
||||
value = value.strip() if value else ""
|
||||
|
||||
if col in FIXED_FIELDS:
|
||||
path = FIXED_FIELDS[col]
|
||||
if path in INT_CAST:
|
||||
try:
|
||||
value = int(value)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
elif path == "lastModified":
|
||||
parsed = parse_date(value)
|
||||
value = parsed if parsed else value
|
||||
set_nested(doc, path, value)
|
||||
continue
|
||||
|
||||
if col in META_FIELDS:
|
||||
continue
|
||||
|
||||
if re.match(r"^Field\d+(Value|Label)$", col):
|
||||
continue
|
||||
|
||||
n = 1
|
||||
while True:
|
||||
val_key = f"Field{n}Value"
|
||||
lbl_key = f"Field{n}Label"
|
||||
if val_key not in field_keys and lbl_key not in field_keys:
|
||||
break
|
||||
label = (row.get(lbl_key) or "").strip()
|
||||
value = (row.get(val_key) or "").strip()
|
||||
if label and value:
|
||||
parsed = parse_date(value)
|
||||
fields[label] = parsed if parsed else value
|
||||
n += 1
|
||||
|
||||
doc["fields"] = fields
|
||||
return doc
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Import jednoho souboru
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def import_file(csv_path: str, db) -> dict:
|
||||
filename = Path(csv_path).name
|
||||
col_name = parse_collection_name(filename)
|
||||
if col_name is None:
|
||||
print(f" Preskakuji (QueryDetails): {filename}")
|
||||
return {"skipped": True}
|
||||
|
||||
snapshot_date = extract_snapshot_date(filename)
|
||||
collection = db[col_name]
|
||||
|
||||
inserted = changed = unchanged = filtered_out = 0
|
||||
|
||||
with open(csv_path, encoding="utf-8", newline="") as f:
|
||||
reader = csv.DictReader(f, delimiter=",", quotechar='"')
|
||||
|
||||
for row in reader:
|
||||
site_group = (row.get("SiteGroupName") or "").strip()
|
||||
if site_group != COUNTRY_FILTER:
|
||||
filtered_out += 1
|
||||
continue
|
||||
|
||||
doc = map_row(row)
|
||||
record_id = doc.get("form", {}).get("recordId")
|
||||
if not record_id:
|
||||
continue
|
||||
|
||||
doc["sourceFile"] = filename
|
||||
|
||||
existing = collection.find_one({"form.recordId": record_id})
|
||||
|
||||
if existing is None:
|
||||
doc["firstSeen"] = snapshot_date
|
||||
doc["lastSeen"] = snapshot_date
|
||||
doc["history"] = []
|
||||
collection.insert_one(doc)
|
||||
inserted += 1
|
||||
|
||||
elif existing.get("fields") != doc["fields"]:
|
||||
old_entry = {
|
||||
"date": existing.get("lastSeen", snapshot_date),
|
||||
"fields": existing["fields"],
|
||||
}
|
||||
update_doc = {k: v for k, v in doc.items()}
|
||||
update_doc["lastSeen"] = snapshot_date
|
||||
collection.update_one(
|
||||
{"_id": existing["_id"]},
|
||||
{
|
||||
"$push": {"history": old_entry},
|
||||
"$set": update_doc,
|
||||
},
|
||||
)
|
||||
changed += 1
|
||||
|
||||
else:
|
||||
collection.update_one(
|
||||
{"_id": existing["_id"]},
|
||||
{"$set": {"lastSeen": snapshot_date, "sourceFile": filename}},
|
||||
)
|
||||
unchanged += 1
|
||||
|
||||
collection.create_index([("form.recordId", ASCENDING)], unique=True)
|
||||
collection.create_index([("subject.label", ASCENDING)])
|
||||
collection.create_index([("site.number", ASCENDING)])
|
||||
|
||||
stats = {
|
||||
"collection": col_name,
|
||||
"snapshot": snapshot_date,
|
||||
"inserted": inserted,
|
||||
"changed": changed,
|
||||
"unchanged": unchanged,
|
||||
"filtered_out": filtered_out,
|
||||
}
|
||||
print(f" {col_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZE")
|
||||
return stats
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
paths: list[Path] = []
|
||||
|
||||
if len(sys.argv) > 1:
|
||||
for arg in sys.argv[1:]:
|
||||
p = Path(arg)
|
||||
if p.is_file():
|
||||
paths.append(p)
|
||||
else:
|
||||
print(f"Soubor nenalezen: {arg}")
|
||||
else:
|
||||
paths = sorted(DOWNLOADS_DIR.glob("*_DataListing.csv"))
|
||||
|
||||
if not paths:
|
||||
print("Zadne CSV soubory k importu.")
|
||||
return
|
||||
|
||||
print(f"Nalezeno {len(paths)} souboru.\n")
|
||||
|
||||
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
|
||||
client.admin.command("ping")
|
||||
db = client[DB_NAME]
|
||||
|
||||
total = {"inserted": 0, "changed": 0, "unchanged": 0}
|
||||
|
||||
for csv_path in paths:
|
||||
print(f"Import: {csv_path.name}")
|
||||
stats = import_file(str(csv_path), db)
|
||||
if not stats.get("skipped"):
|
||||
for k in total:
|
||||
total[k] += stats.get(k, 0)
|
||||
|
||||
client.close()
|
||||
|
||||
print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -3,8 +3,9 @@ from download_uco3001 import download_datalisting_reports_3001
|
||||
# === Vyber jeden řádek, odkomentuj ho a spusť ===
|
||||
|
||||
# --- Trial Disposition ---
|
||||
download_datalisting_reports_3001("Trial Disposition (Completion / Discontinuation)")
|
||||
download_datalisting_reports_3001("Date of Visit")
|
||||
# download_datalisting_reports_3001("Trial Disposition (Completion / Discontinuation)")
|
||||
# download_datalisting_reports_3001("Date of Visit")
|
||||
download_datalisting_reports_3001("Concomitant Therapy", country="CZE")
|
||||
# download_datalisting_reports_3001("Trial Disposition (Completion / Discontinuation)", country="CZE")
|
||||
|
||||
# --- Date of Visit ---
|
||||
|
||||
Reference in New Issue
Block a user