# =============================================================================
# Name:        import_to_mongo_v1.2.py
# Version:     1.2
# Date:        2026-05-28
# Description: Import CSV reports into MongoDB (db: covance).
#   Pipeline 1 — allSamples: collection allsamples, key Container Barcode No.
#                Source: UCO3001/Source + MDD3003/Source
#   Pipeline 2 — kits: collection kits, key Accession
#                Source: UCO3001/Source (both studies 36940 + 35472)
#   Upsert with change history; each processed file is moved to Zpracovano/.
# =============================================================================
|
|
|
|
import csv
|
|
import re
|
|
import shutil
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from pymongo import MongoClient, ASCENDING
|
|
|
|
# --- MongoDB target ----------------------------------------------------------
DB_NAME = "covance"
MONGO_URI = "mongodb://192.168.1.76:27017"

# --- Input directories -------------------------------------------------------
# Both locations are derived from where this script lives: the UCO3001 reports
# sit in "Source" next to the script, the MDD3003 reports in
# "Covance_MDD3003/Source" one directory level above it.
_SCRIPT_DIR = Path(__file__).parent
UCO3001_SOURCE = _SCRIPT_DIR / "Source"
MDD3003_SOURCE = _SCRIPT_DIR.parent / "Covance_MDD3003" / "Source"
|
|
|
|
def _field_indexes(*field_names: str) -> list:
    """Build one single-key ascending index spec per name under ``fields.``."""
    return [[(f"fields.{field_name}", ASCENDING)] for field_name in field_names]


# Reports keyed by the sample container barcode; read from both study folders.
_ALLSAMPLES_PIPELINE = {
    "name": "allsamples",
    "collection": "allsamples",
    "upsert_key": "Container Barcode No.",
    "pattern": re.compile(r".*-allSamples\.csv$", re.IGNORECASE),
    "sources": [UCO3001_SOURCE, MDD3003_SOURCE],
    "indexes": _field_indexes("Sample Status", "Specimen Type"),
}

# Kit inventory reports keyed by accession; read from the UCO3001 folder only.
_KITS_PIPELINE = {
    "name": "kits",
    "collection": "kits",
    "upsert_key": "Accession",
    "pattern": re.compile(r".*-kit-inventory-on-hand-expiration\.csv$", re.IGNORECASE),
    "sources": [UCO3001_SOURCE],
    "indexes": _field_indexes("Kit Type", "Site", "Expiration Date"),
}

# Each entry describes one import pipeline:
#   name        label used in console output
#   collection  target MongoDB collection
#   upsert_key  CSV column whose value identifies a record
#   pattern     file-name filter selecting the CSV reports for this pipeline
#   sources     directories scanned when no files are given on the command line
#   indexes     secondary index specs created on the collection
PIPELINES = [_ALLSAMPLES_PIPELINE, _KITS_PIPELINE]
|
|
|
|
|
|
def extract_snapshot_date(filename: str) -> str:
    """Return the snapshot date encoded at the start of a report file name.

    Only the last path component of *filename* is inspected. When it begins
    with a ``YYYY-MM-DD`` shaped prefix, that prefix is returned verbatim (the
    digits are not checked against the calendar). Otherwise the current local
    date is returned in the same format.
    """
    base_name = Path(filename).name
    found = re.match(r"(\d{4}-\d{2}-\d{2})", base_name)
    if found is None:
        return datetime.now().strftime("%Y-%m-%d")
    return found.group(1)
|
|
|
|
|
|
def clean_value(val: str) -> str | None:
|
|
val = val.strip()
|
|
return val if val else None
|
|
|
|
|
|
def import_file(csv_path: Path, collection, upsert_key: str) -> dict:
    """Upsert every row of one CSV report into *collection*, keeping history.

    Each row becomes a document identified by ``record_id`` (the cleaned value
    of the *upsert_key* column):

    * unknown ``record_id``  -> a new document is inserted with empty history;
    * known, fields differ   -> the stored fields (dated with the stored
      ``lastSeen``) are pushed onto ``history`` and replaced by the new ones;
    * known, fields equal    -> only ``lastSeen`` / ``sourceFile`` are updated.

    Rows whose key column is missing or blank are counted as skipped and not
    written. A per-file summary is printed to stdout.

    Args:
        csv_path: CSV file to read (decoded as UTF-8, a leading BOM is dropped).
        collection: Collection-like object providing ``find_one``,
            ``insert_one``, ``update_one`` and ``count_documents``.
        upsert_key: Name of the CSV column that identifies a record.

    Returns:
        Dict with the counters ``inserted``, ``changed`` and ``unchanged``.
    """
    source_name = csv_path.name
    snapshot_date = extract_snapshot_date(source_name)
    counts = {"inserted": 0, "changed": 0, "unchanged": 0, "skipped": 0}

    # The whole file is read before any database write happens.
    with open(csv_path, newline="", encoding="utf-8-sig") as handle:
        rows = list(csv.DictReader(handle))

    for row in rows:
        # Falsy column names (e.g. the overflow key of over-long rows) are dropped.
        fields = {column: clean_value(raw) for column, raw in row.items() if column}

        record_id = fields.get(upsert_key)
        if not record_id:
            counts["skipped"] += 1
            continue

        existing = collection.find_one({"record_id": record_id})

        if existing is None:
            new_document = {
                "record_id": record_id,
                "fields": fields,
                "sourceFile": source_name,
                "firstSeen": snapshot_date,
                "lastSeen": snapshot_date,
                "history": [],
            }
            collection.insert_one(new_document)
            counts["inserted"] += 1
            continue

        by_id = {"_id": existing["_id"]}
        if existing["fields"] == fields:
            # Nothing changed: just record that the row was seen again.
            refresh = {"$set": {"lastSeen": snapshot_date, "sourceFile": source_name}}
            collection.update_one(by_id, refresh)
            counts["unchanged"] += 1
        else:
            # Archive the previous state before overwriting it.
            previous_state = {"date": existing["lastSeen"], "fields": existing["fields"]}
            replace = {
                "$push": {"history": previous_state},
                "$set": {"fields": fields, "sourceFile": source_name, "lastSeen": snapshot_date},
            }
            collection.update_one(by_id, replace)
            counts["changed"] += 1

    total_rows = len(rows)
    db_count = collection.count_documents({})
    print(
        f" [{snapshot_date}]: +{counts['inserted']} new, ~{counts['changed']} changed, "
        f"={counts['unchanged']} same, -{counts['skipped']} bez klice"
    )
    print(f" Radku v CSV: {total_rows}, dokumentu v DB: {db_count}")

    # Sanity check: every row must have landed in exactly one counter.
    accounted = sum(counts.values())
    if accounted != total_rows:
        print(f" !!! VAROVANI: soucet ({accounted}) != radku v CSV ({total_rows})")

    return {name: counts[name] for name in ("inserted", "changed", "unchanged")}
|
|
|
|
|
|
def collect_files(pipeline: dict, cli_args: list[str]) -> list[Path]:
    """Return the CSV files one pipeline should import, without duplicates.

    Args:
        pipeline: Pipeline description; ``pipeline["pattern"]`` is a compiled
            regex matched against the file name and ``pipeline["sources"]``
            is the list of directories to scan.
        cli_args: Paths given on the command line. When non-empty, only these
            are considered (existing files whose name matches the pattern, in
            the order given) and the source directories are not scanned.

    Returns:
        Matching paths. In directory mode each source directory contributes
        its matches in sorted order; missing directories are ignored. A file
        reachable more than once (same path repeated on the command line, or
        a directory listed twice) is returned only the first time, because
        the caller moves each file after importing it and a second attempt
        would fail on the now-missing path.
    """
    name_matches = pipeline["pattern"].match

    if cli_args:
        candidates = [Path(arg) for arg in cli_args]
        selected = [p for p in candidates if p.is_file() and name_matches(p.name)]
    else:
        selected = []
        for src_dir in pipeline["sources"]:
            if src_dir.is_dir():
                selected.extend(
                    sorted(p for p in src_dir.glob("*.csv") if name_matches(p.name))
                )

    # De-duplicate on the resolved location but hand back the path as given.
    seen = set()
    unique = []
    for path in selected:
        location = path.resolve()
        if location not in seen:
            seen.add(location)
            unique.append(path)
    return unique
|
|
|
|
|
|
def run_pipeline(pipeline: dict, client, cli_args: list[str]):
    """Import all pending files of one pipeline and archive each afterwards.

    Steps: resolve the files to import, make sure the target collection has a
    unique index on ``record_id`` plus the pipeline's secondary indexes, then
    import every file and move it into a ``Zpracovano`` directory next to it.
    Progress and totals are printed to stdout.

    Args:
        pipeline: Pipeline description (keys ``name``, ``collection``,
            ``upsert_key``, ``indexes`` plus whatever ``collect_files`` reads).
        client: Connected MongoDB client; the database ``DB_NAME`` is used.
        cli_args: Optional explicit file paths forwarded to ``collect_files``.
    """
    paths = collect_files(pipeline, cli_args)
    if not paths:
        print(f"[{pipeline['name']}] Zadne soubory k importu.")
        return

    print(f"\n=== Pipeline: {pipeline['name']} ({len(paths)} souboru) ===")

    col = client[DB_NAME][pipeline["collection"]]
    col.create_index([("record_id", ASCENDING)], unique=True)
    for idx in pipeline["indexes"]:
        col.create_index(idx)

    total = {"inserted": 0, "changed": 0, "unchanged": 0}

    for csv_path in paths:
        # Create the archive directory beside the file that is actually being
        # processed, and do it BEFORE the import. Creating it under every
        # configured source instead would crash on a missing source directory
        # and would leave files passed on the command line from elsewhere
        # without a destination once their rows are already in the database.
        archive_dir = csv_path.parent / "Zpracovano"
        archive_dir.mkdir(exist_ok=True)

        print(f"Import: {csv_path.name} [{csv_path.parent.parent.name}]")
        stats = import_file(csv_path, col, pipeline["upsert_key"])
        for k in total:
            total[k] += stats[k]

        shutil.move(str(csv_path), str(archive_dir / csv_path.name))
        print(" -> presunut do Zpracovano/\n")

    print(f"[{pipeline['name']}] Celkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")
|
|
|
|
|
|
def main():
    """Connect to MongoDB and run every configured pipeline.

    Command-line arguments, if any, are passed to each pipeline as explicit
    file paths. The connection is verified with a ``ping`` before any work
    starts (server selection is limited to 5000 ms), and the client is closed
    even when the ping or one of the pipelines raises.
    """
    cli_args = sys.argv[1:]

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        # Verify the server is reachable before touching any file.
        client.admin.command("ping")

        for pipeline in PIPELINES:
            run_pipeline(pipeline, client, cli_args)
    finally:
        client.close()


if __name__ == "__main__":
    main()
|