z230
This commit is contained in:
@@ -0,0 +1,361 @@
|
||||
# =============================================================================
|
||||
# Název: import_to_mongo_v1.5.py
|
||||
# Verze: 1.5
|
||||
# Datum: 2026-06-16
|
||||
# Popis: Import CSV reportů do MongoDB (db: covance).
|
||||
# Pipeline 1 — allSamples: kolekce allsamples, klíč Container Barcode No.
|
||||
# Zdroj: Source (study 36940 + 35472)
|
||||
# (study NEtagováno — jeden soubor obsahuje obě studie,
|
||||
# rozlišují se polem fields."Protocol Code")
|
||||
# Pipeline 2 — kits: kolekce kits, klíč Accession
|
||||
# Zdroj: Source (study 36940 + 35472)
|
||||
# v1.5: tagováno pole study z názvu souboru (study-XXXXX),
|
||||
# takže kity lze filtrovat per studie bez regexu na sourceFile.
|
||||
# Pipeline 3 — results: kolekce results, laboratorní výsledky per centrum.
|
||||
# Zdroj: Source, soubory test-results-{SITE}-{typ}.csv
|
||||
# (1. řádek = disclaimer, hlavička je 2. řádek!)
|
||||
# Dva typy (standard / microbiology) v jedné kolekci,
|
||||
# rozlišené polem resultType. record_id:
|
||||
# standard: STD|{Accession}|{Test Group}|{Test}|{occ}
|
||||
# microbiology: MIC|{Accession}|{Test Group}|{Specimen}|
|
||||
# {Test Description}|{Drug Name/Agent}|{occ}
|
||||
# Pipeline 4 — equeries: kolekce equeries, eQuery report (study 36940 + 35472).
|
||||
# Zdroj: Source, soubory ...-equery.csv (FULL).
|
||||
# Klíč eQueryId (stabilní systémové ID, unikátní per řádek);
|
||||
# řádky footeru s parametry filtru (nečíselný eQueryId) se
|
||||
# přeskakují. History sleduje životní cyklus dotazu
|
||||
# (Open -> Response Received -> Closed).
|
||||
# Varianta ...-equery_unresponded_only.csv je jen podmnožina
|
||||
# (Status=Open) téhož reportu + footer => NEIMPORTUJE se,
|
||||
# pouze se přesune do Zpracovano/ (move-only pipeline).
|
||||
# Upsert s historií změn, zpracovaný soubor přesunut do Zpracovano/.
|
||||
# Přepínač --dry-run: nic nezapisuje do DB ani nepřesouvá soubory.
|
||||
# =============================================================================
|
||||
|
||||
import csv
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from pymongo import MongoClient, ASCENDING
|
||||
|
||||
MONGO_URI = "mongodb://192.168.1.76:27017"
|
||||
DB_NAME = "covance"
|
||||
|
||||
SOURCE = Path(__file__).parent / "Source"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Builders record_id + metadata pro jednotlivé pipeline
|
||||
# ---------------------------------------------------------------------------
|
||||
def make_keyed_record(upsert_key: str):
|
||||
"""Jednoduchý klíč = hodnota jednoho sloupce (allsamples, kits).
|
||||
Pokud má pipeline file_meta, jeho hodnoty (např. study) se přidají do extra."""
|
||||
def builder(fields: dict, fmeta: dict | None, occ: dict):
|
||||
key_val = fields.get(upsert_key)
|
||||
if not key_val:
|
||||
return None, {}
|
||||
extra = dict(fmeta) if fmeta else {}
|
||||
return key_val, extra
|
||||
return builder
|
||||
|
||||
|
||||
def _norm_subject(raw: str | None) -> str:
|
||||
"""'CZ100062001 - null' -> 'CZ100062001'."""
|
||||
s = (raw or "").strip()
|
||||
return s.split(" - null")[0].strip()
|
||||
|
||||
|
||||
def make_results_record(fields: dict, fmeta: dict, occ: dict):
|
||||
rtype = fmeta["resultType"]
|
||||
accession = fields.get("Accession")
|
||||
if not accession:
|
||||
return None, {}
|
||||
|
||||
if rtype == "standard":
|
||||
parts = (accession, fields.get("Test Group", ""), fields.get("Test", ""))
|
||||
prefix = "STD"
|
||||
else: # microbiology
|
||||
parts = (
|
||||
accession,
|
||||
fields.get("Test Group", ""),
|
||||
fields.get("Specimen", ""),
|
||||
fields.get("Test Description", ""),
|
||||
fields.get("Drug Name/Agent", ""),
|
||||
)
|
||||
prefix = "MIC"
|
||||
|
||||
occ[parts] = occ.get(parts, 0) + 1
|
||||
record_id = f"{prefix}|" + "|".join(str(p or "") for p in parts) + f"|{occ[parts]}"
|
||||
|
||||
extra = {
|
||||
"study": fmeta["study"],
|
||||
"site": fmeta["site"],
|
||||
"subject": _norm_subject(fields.get("Subject")),
|
||||
"resultType": rtype,
|
||||
}
|
||||
return record_id, extra
|
||||
|
||||
|
||||
def make_equery_record(fields: dict, fmeta: dict | None, occ: dict):
|
||||
"""Klíč = eQueryId. Footer s parametry filtru (nečíselný eQueryId) se přeskočí."""
|
||||
key_val = (fields.get("eQueryId") or "").strip()
|
||||
if not key_val.isdigit():
|
||||
return None, {}
|
||||
extra = {"study": fmeta["study"]} if fmeta else {}
|
||||
return key_val, extra
|
||||
|
||||
|
||||
def results_file_meta(filename: str) -> dict | None:
|
||||
m = re.search(r"study-(\d+)-test-results-(\d+)-(standard|microbiology)", filename, re.IGNORECASE)
|
||||
if not m:
|
||||
return None
|
||||
return {"study": m.group(1), "site": m.group(2), "resultType": m.group(3).lower()}
|
||||
|
||||
|
||||
def equery_file_meta(filename: str) -> dict | None:
|
||||
m = re.search(r"study-(\d+)-activity-reports", filename, re.IGNORECASE)
|
||||
return {"study": m.group(1)} if m else {"study": None}
|
||||
|
||||
|
||||
def kit_file_meta(filename: str) -> dict | None:
|
||||
m = re.search(r"study-(\d+)-kit-inventory", filename, re.IGNORECASE)
|
||||
return {"study": m.group(1)} if m else {"study": None}
|
||||
|
||||
|
||||
PIPELINES = [
|
||||
{
|
||||
"name": "allsamples",
|
||||
"collection": "allsamples",
|
||||
"pattern": re.compile(r".*-allSamples\.csv$", re.IGNORECASE),
|
||||
"sources": [SOURCE],
|
||||
"header_skip": 0,
|
||||
"make_record": make_keyed_record("Container Barcode No."),
|
||||
"file_meta": None,
|
||||
"indexes": [
|
||||
[("fields.Sample Status", ASCENDING)],
|
||||
[("fields.Specimen Type", ASCENDING)],
|
||||
],
|
||||
},
|
||||
{
|
||||
"name": "kits",
|
||||
"collection": "kits",
|
||||
"pattern": re.compile(r".*-kit-inventory-on-hand-expiration\.csv$", re.IGNORECASE),
|
||||
"sources": [SOURCE],
|
||||
"header_skip": 0,
|
||||
"make_record": make_keyed_record("Accession"),
|
||||
"file_meta": kit_file_meta,
|
||||
"indexes": [
|
||||
[("study", ASCENDING)],
|
||||
[("fields.Kit Type", ASCENDING)],
|
||||
[("fields.Site", ASCENDING)],
|
||||
[("fields.Expiration Date", ASCENDING)],
|
||||
],
|
||||
},
|
||||
{
|
||||
"name": "results",
|
||||
"collection": "results",
|
||||
"pattern": re.compile(r".*test-results-\d+-(standard|microbiology)\.csv$", re.IGNORECASE),
|
||||
"sources": [SOURCE],
|
||||
"header_skip": 1, # 1. řádek je disclaimer, hlavička je 2. řádek
|
||||
"make_record": make_results_record,
|
||||
"file_meta": results_file_meta,
|
||||
"indexes": [
|
||||
[("subject", ASCENDING)],
|
||||
[("study", ASCENDING)],
|
||||
[("site", ASCENDING)],
|
||||
[("resultType", ASCENDING)],
|
||||
[("fields.Accession", ASCENDING)],
|
||||
[("fields.Test Group", ASCENDING)],
|
||||
],
|
||||
},
|
||||
{
|
||||
"name": "equeries",
|
||||
"collection": "equeries",
|
||||
# FULL report; varianta _unresponded_only se sem ZÁMĚRNĚ nechytá (jiný pattern níže)
|
||||
"pattern": re.compile(r".*activity-reports-documents-equery\.csv$", re.IGNORECASE),
|
||||
"sources": [SOURCE],
|
||||
"header_skip": 0,
|
||||
"make_record": make_equery_record,
|
||||
"file_meta": equery_file_meta,
|
||||
"indexes": [
|
||||
[("study", ASCENDING)],
|
||||
[("fields.Status", ASCENDING)],
|
||||
[("fields.Site", ASCENDING)],
|
||||
[("fields.Subject", ASCENDING)],
|
||||
[("fields.Issue Type", ASCENDING)],
|
||||
],
|
||||
},
|
||||
{
|
||||
"name": "equeries_unresponded",
|
||||
"move_only": True, # podmnožina FULL reportu -> jen přesun, neimportuje se
|
||||
"pattern": re.compile(r".*activity-reports-documents-equery_unresponded_only\.csv$", re.IGNORECASE),
|
||||
"sources": [SOURCE],
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def extract_snapshot_date(filename: str) -> str:
|
||||
match = re.match(r"(\d{4}-\d{2}-\d{2})", Path(filename).name)
|
||||
return match.group(1) if match else datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
|
||||
def clean_value(val: str) -> str | None:
|
||||
val = val.strip()
|
||||
return val if val else None
|
||||
|
||||
|
||||
def import_file(csv_path: Path, collection, pipeline: dict, dry_run: bool) -> dict:
|
||||
snapshot_date = extract_snapshot_date(csv_path.name)
|
||||
inserted = changed = unchanged = skipped = 0
|
||||
|
||||
fmeta = pipeline["file_meta"](csv_path.name) if pipeline["file_meta"] else None
|
||||
|
||||
with open(csv_path, newline="", encoding="utf-8-sig") as f:
|
||||
lines = f.readlines()
|
||||
reader = csv.DictReader(lines[pipeline["header_skip"]:])
|
||||
rows = list(reader)
|
||||
|
||||
occ: dict = {} # stav pořadí výskytů (per soubor)
|
||||
|
||||
for row in rows:
|
||||
fields = {k: clean_value(v) for k, v in row.items() if k}
|
||||
|
||||
record_id, extra = pipeline["make_record"](fields, fmeta, occ)
|
||||
if not record_id:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
existing = None if dry_run else collection.find_one({"record_id": record_id})
|
||||
|
||||
if existing is None and dry_run:
|
||||
inserted += 1 # v dry-run nevíme jistě, počítáme jako kandidáty na insert
|
||||
continue
|
||||
|
||||
if existing is None:
|
||||
collection.insert_one({
|
||||
"record_id": record_id,
|
||||
"fields": fields,
|
||||
**extra,
|
||||
"sourceFile": csv_path.name,
|
||||
"firstSeen": snapshot_date,
|
||||
"lastSeen": snapshot_date,
|
||||
"history": [],
|
||||
})
|
||||
inserted += 1
|
||||
|
||||
elif existing["fields"] != fields:
|
||||
collection.update_one(
|
||||
{"_id": existing["_id"]},
|
||||
{
|
||||
"$push": {"history": {"date": existing["lastSeen"], "fields": existing["fields"]}},
|
||||
"$set": {"fields": fields, **extra, "sourceFile": csv_path.name, "lastSeen": snapshot_date},
|
||||
},
|
||||
)
|
||||
changed += 1
|
||||
|
||||
else:
|
||||
collection.update_one(
|
||||
{"_id": existing["_id"]},
|
||||
{"$set": {"lastSeen": snapshot_date, "sourceFile": csv_path.name}},
|
||||
)
|
||||
unchanged += 1
|
||||
|
||||
total_rows = len(rows)
|
||||
db_count = "-" if dry_run else collection.count_documents({})
|
||||
tag = "[DRY] " if dry_run else ""
|
||||
print(f" {tag}[{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{skipped} bez klice")
|
||||
print(f" Radku v CSV: {total_rows}, dokumentu v DB: {db_count}")
|
||||
|
||||
if inserted + changed + unchanged + skipped != total_rows:
|
||||
print(f" !!! VAROVANI: soucet ({inserted+changed+unchanged+skipped}) != radku v CSV ({total_rows})")
|
||||
|
||||
return {"inserted": inserted, "changed": changed, "unchanged": unchanged}
|
||||
|
||||
|
||||
def collect_files(pipeline: dict, cli_args: list[str]) -> list[Path]:
|
||||
if cli_args:
|
||||
paths = []
|
||||
for arg in cli_args:
|
||||
p = Path(arg)
|
||||
if p.is_file() and pipeline["pattern"].match(p.name):
|
||||
paths.append(p)
|
||||
return paths
|
||||
|
||||
paths = []
|
||||
for src_dir in pipeline["sources"]:
|
||||
if src_dir.exists():
|
||||
paths.extend(sorted(p for p in src_dir.glob("*.csv") if pipeline["pattern"].match(p.name)))
|
||||
return paths
|
||||
|
||||
|
||||
def move_to_processed(csv_path: Path, dry_run: bool):
|
||||
if dry_run:
|
||||
print(f" [DRY] -> presunul by do Zpracovano/\n")
|
||||
return
|
||||
dest = csv_path.parent / "Zpracovano" / csv_path.name
|
||||
shutil.move(str(csv_path), str(dest))
|
||||
print(f" -> presunut do Zpracovano/\n")
|
||||
|
||||
|
||||
def run_pipeline(pipeline: dict, client, cli_args: list[str], dry_run: bool):
|
||||
paths = collect_files(pipeline, cli_args)
|
||||
if not paths:
|
||||
print(f"[{pipeline['name']}] Zadne soubory k importu.")
|
||||
return
|
||||
|
||||
print(f"\n=== Pipeline: {pipeline['name']} ({len(paths)} souboru){' [DRY-RUN]' if dry_run else ''} ===")
|
||||
|
||||
# Move-only pipeline (např. unresponded podmnožina) — jen přesun, žádný import
|
||||
if pipeline.get("move_only"):
|
||||
if not dry_run:
|
||||
for src_dir in pipeline["sources"]:
|
||||
(src_dir / "Zpracovano").mkdir(exist_ok=True)
|
||||
for csv_path in paths:
|
||||
print(f"Move-only: {csv_path.name} [{csv_path.parent.parent.name}]")
|
||||
move_to_processed(csv_path, dry_run)
|
||||
print(f"[{pipeline['name']}] Presunuto {len(paths)} souboru (neimportuje se).")
|
||||
return
|
||||
|
||||
col = None
|
||||
if not dry_run:
|
||||
col = client[DB_NAME][pipeline["collection"]]
|
||||
col.create_index([("record_id", ASCENDING)], unique=True)
|
||||
for idx in pipeline["indexes"]:
|
||||
col.create_index(idx)
|
||||
for src_dir in pipeline["sources"]:
|
||||
(src_dir / "Zpracovano").mkdir(exist_ok=True)
|
||||
|
||||
total = {"inserted": 0, "changed": 0, "unchanged": 0}
|
||||
|
||||
for csv_path in paths:
|
||||
print(f"Import: {csv_path.name} [{csv_path.parent.parent.name}]")
|
||||
stats = import_file(csv_path, col, pipeline, dry_run)
|
||||
for k in total:
|
||||
total[k] += stats[k]
|
||||
move_to_processed(csv_path, dry_run)
|
||||
|
||||
print(f"[{pipeline['name']}] Celkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")
|
||||
|
||||
|
||||
def main():
|
||||
args = sys.argv[1:]
|
||||
dry_run = "--dry-run" in args
|
||||
cli_args = [a for a in args if a != "--dry-run"]
|
||||
|
||||
client = None
|
||||
if not dry_run:
|
||||
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
|
||||
client.admin.command("ping")
|
||||
|
||||
for pipeline in PIPELINES:
|
||||
run_pipeline(pipeline, client, cli_args, dry_run)
|
||||
|
||||
if client:
|
||||
client.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user