# File: janssen/IWRS/import_notifications_to_mongo.py
# Snapshot: 2026-06-10 11:59:03 +02:00 | 218 lines | 6.9 KiB | Python

"""
Import IWRS notifications (PDF + JSON metadata) directly into MongoDB.

Replaces the original two-step flow:
1) import_to_mysql.py: PDF/JSON → MySQL iwrs_notifications
2) parse_notifications_to_mongo.py: MySQL → Mongo iwrs_notifications

Now everything happens in one step: PDF/JSON → Mongo iwrs_notifications (text
parsed per type, PDF stored as BinData). After a successful import the files
are moved to the Zpracovano/ ("processed") directory.

Idempotent: _id = pk (the IWRS unique identifier).
"""
import os
import sys
import re
import json
import shutil
import datetime
from bson.binary import Binary
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common.mongo_writer import get_db, to_date
from notification_parsers import (
parse_kv_lines, parse_medication_table, to_date as parse_to_date,
to_datetime as parse_to_datetime,
)
# Directory that contains this script; DETAILS_DIR is resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Root of the incoming notification files; find_pairs() scans
# DETAILS_DIR/<study>/ for *.json files.
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")
# Study identifiers processed by main() when the caller supplies none.
STUDIES = ["77242113UCO3001", "42847922MDD3003"]
def find_pairs(study, details_dir=None):
    """Return the notification file pairs waiting in a study's directory.

    Args:
        study: Study identifier; names the sub-directory that is scanned.
        details_dir: Root directory holding one sub-directory per study.
            Defaults to the module-level ``DETAILS_DIR``.

    Returns:
        A list of ``(json_path, pdf_path)`` tuples sorted by JSON path.
        ``pdf_path`` is ``None`` when no PDF with the same base name exists.
        The list is empty when the study directory does not exist.
    """
    # Imported locally: glob is not among the module-level imports.
    import glob

    root = DETAILS_DIR if details_dir is None else details_dir
    out_dir = os.path.join(root, study)
    if not os.path.isdir(out_dir):
        return []

    pairs = []
    for json_path in sorted(glob.glob(os.path.join(out_dir, "*.json"))):
        # Swap only the trailing extension. A plain
        # str.replace(".json", ".pdf") would also rewrite ".json" occurring
        # in a directory name or earlier in the file name, so an existing
        # PDF would be reported as missing.
        pdf_path = os.path.splitext(json_path)[0] + ".pdf"
        pairs.append((json_path, pdf_path if os.path.exists(pdf_path) else None))
    return pairs
def build_document(meta, pdf_bytes):
    """Assemble the MongoDB document for a single notification.

    Args:
        meta: Metadata mapping loaded from the notification's JSON file. The
            keys read are ``text``, ``pk``, ``study``, ``subject``, ``title``,
            ``label``, ``event`` and ``actual_date``.
        pdf_bytes: Raw PDF content, or ``None`` when there is no PDF.

    Returns:
        A dict whose ``_id`` is the notification ``pk``. It carries the
        metadata values, the key/value pairs and medication table parsed from
        the notification text, the raw text itself and, when ``pdf_bytes`` is
        given, the PDF wrapped in ``Binary`` under ``pdf``.
    """
    raw_text = meta.get("text") or ""
    kv = parse_kv_lines(raw_text)
    medications = parse_medication_table(raw_text)

    site_local = parse_to_datetime(kv.get("Transaction Date/Time (site local)"))
    system_local = parse_to_datetime(kv.get("Transaction Date/Time (system local)"))

    # Only labels that are present and yield a truthy parsed value are kept.
    parsed_dates = {}
    for date_label in (
        "Informed Consent Date",
        "Informed Consent Date at Screening",
        "Informed Consent Date at Subject Creation",
        "Date of Subject Creation in IRT",
        "Date of Screening in IRT",
        "Screenfail Date",
        "Discontinuation date",
        "Dispensation date",
        "Returned Date",
    ):
        if date_label not in kv:
            continue
        parsed = parse_to_date(kv[date_label])
        if parsed:
            parsed_dates[date_label] = parsed

    pk = meta.get("pk")
    actual_date = to_date(meta.get("actual_date"))

    # Labels left out of the generic "fields" sub-document. All but "Subject"
    # are stored as dedicated top-level keys; the document's subject value
    # comes from meta instead.
    excluded_labels = {
        "Site", "Investigator", "Location", "Cohort", "IRT Subject Status",
        "Subject",
        "Transaction Date/Time (site local)",
        "Transaction Date/Time (system local)",
        "Transaction performed by",
    }

    # Keys are inserted in a fixed order so the stored field order is stable.
    document = {"_id": pk, "pk": pk}
    for meta_key in ("study", "subject", "title", "label", "event"):
        document[meta_key] = meta.get(meta_key)
    document["actual_date"] = actual_date
    document["site"] = kv.get("Site")
    document["investigator"] = kv.get("Investigator")
    document["location"] = kv.get("Location")
    document["cohort"] = kv.get("Cohort")
    document["irt_subject_status"] = kv.get("IRT Subject Status")
    document["transaction_site_local"] = site_local
    document["transaction_system_local"] = system_local
    document["transaction_by"] = kv.get("Transaction performed by")
    document["medications"] = medications
    document["fields"] = {
        label: value for label, value in kv.items() if label not in excluded_labels
    }
    document["parsed_dates"] = parsed_dates
    document["raw_text"] = raw_text

    if pdf_bytes is not None:
        document["pdf"] = Binary(pdf_bytes)
    return document
def import_study(study):
    """Import every pending notification of one study into MongoDB.

    For each ``(json, pdf)`` pair returned by ``find_pairs`` the JSON metadata
    is loaded, a document is built with ``build_document`` and upserted into
    the ``iwrs_notifications`` collection by ``_id``. The files are then moved
    to the study's ``Zpracovano`` sub-directory.

    Args:
        study: Study identifier (sub-directory name under ``DETAILS_DIR``).

    Returns:
        Number of pairs that were both upserted and moved successfully.
        Per-file errors are printed and counted, never raised.
    """
    pairs = find_pairs(study)
    if not pairs:
        print(f" [{study}] zadne nove notifikace")
        return 0

    db = get_db()
    coll = db.iwrs_notifications
    done_dir = os.path.join(DETAILS_DIR, study, "Zpracovano")
    os.makedirs(done_dir, exist_ok=True)

    imported = 0
    failed = 0
    for json_path, pdf_path in pairs:
        json_name = os.path.basename(json_path)
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                meta = json.load(f)

            # Validate the key before reading the PDF so a file that will be
            # rejected anyway does not cost a full PDF read.
            if not meta.get("pk"):
                print(f" CHYBI pk: {json_name}")
                failed += 1
                continue

            pdf_bytes = None
            if pdf_path:
                with open(pdf_path, "rb") as f:
                    pdf_bytes = f.read()

            doc = build_document(meta, pdf_bytes)
            # NOTE(review): this is a naive local timestamp; PyMongo stores
            # naive datetimes as if they were UTC. Confirm whether
            # datetime.now(datetime.timezone.utc) is wanted here.
            doc["last_imported_at"] = datetime.datetime.now()
            coll.replace_one({"_id": doc["_id"]}, doc, upsert=True)

            # Move the processed pair out of the incoming directory. The JSON
            # goes first: if the PDF move fails, the pair is not re-imported
            # without its PDF on the next run.
            shutil.move(json_path, os.path.join(done_dir, json_name))
            if pdf_path:
                shutil.move(pdf_path, os.path.join(done_dir, os.path.basename(pdf_path)))
        except Exception as e:
            print(f" CHYBA {json_name}: {e}")
            failed += 1
        else:
            # Counted only once the upsert and the move both succeeded, so a
            # pair is never reported as imported and failed at the same time.
            imported += 1

    print(f" [{study}] importovano={imported} selhalo={failed}")
    return imported
def import_from_dir(incoming_dir, done_dir, studies=None):
    """Import notifications from a flat incoming directory into MongoDB.

    Flat layout: each ``*.json`` file and its matching ``*.pdf`` lie directly
    in ``incoming_dir``. The study is taken from the JSON metadata. After a
    successful import the pair is moved to ``done_dir``.

    Args:
        incoming_dir: Directory scanned for ``*.json`` files, which are
            processed in order of modification time.
        done_dir: Directory that receives processed files; created if missing.
        studies: Optional iterable of study identifiers. When given, files
            whose ``study`` is not in it are skipped and left in place.

    Returns:
        Number of pairs that were both upserted and moved successfully.
        Returns 0 when ``incoming_dir`` does not exist or holds no JSON file.
        Per-file errors are printed and counted, never raised.
    """
    # Imported locally: glob is not among the module-level imports.
    import glob

    studies = set(studies) if studies else None
    if not os.path.isdir(incoming_dir):
        print(f" Incoming neexistuje: {incoming_dir}")
        return 0
    os.makedirs(done_dir, exist_ok=True)

    json_paths = sorted(
        glob.glob(os.path.join(incoming_dir, "*.json")),
        key=os.path.getmtime,
    )
    if not json_paths:
        print(" [notifikace] zadne nove (Incoming prazdny)")
        return 0

    db = get_db()
    coll = db.iwrs_notifications
    imported = 0
    skipped = 0
    failed = 0
    for json_path in json_paths:
        json_name = os.path.basename(json_path)
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                meta = json.load(f)

            if studies and meta.get("study") not in studies:
                skipped += 1
                continue

            # Validate the key before reading the PDF so a file that will be
            # rejected anyway does not cost a full PDF read.
            if not meta.get("pk"):
                print(f" CHYBI pk: {json_name}")
                failed += 1
                continue

            pdf_path = os.path.splitext(json_path)[0] + ".pdf"
            # Check once and reuse the answer. A PDF that shows up only after
            # this point was not stored, so it must not be moved to done_dir
            # as if it had been imported.
            has_pdf = os.path.exists(pdf_path)
            pdf_bytes = None
            if has_pdf:
                with open(pdf_path, "rb") as f:
                    pdf_bytes = f.read()

            doc = build_document(meta, pdf_bytes)
            # NOTE(review): this is a naive local timestamp; PyMongo stores
            # naive datetimes as if they were UTC. Confirm whether
            # datetime.now(datetime.timezone.utc) is wanted here.
            doc["last_imported_at"] = datetime.datetime.now()
            coll.replace_one({"_id": doc["_id"]}, doc, upsert=True)

            shutil.move(json_path, os.path.join(done_dir, json_name))
            if has_pdf:
                shutil.move(pdf_path, os.path.join(done_dir, os.path.basename(pdf_path)))
        except Exception as e:
            print(f" CHYBA {json_name}: {e}")
            failed += 1
        else:
            # Counted only once the upsert and the move both succeeded, so a
            # pair is never reported as imported and failed at the same time.
            imported += 1

    print(f" [notifikace] importovano={imported} preskoceno={skipped} selhalo={failed}")
    return imported
def main(studies=None):
    """Run the per-study import for each requested study.

    Args:
        studies: Sequence of study identifiers. When ``None`` or empty, the
            module-level ``STUDIES`` list is used instead.
    """
    selected = studies if studies else STUDIES
    for study in selected:
        import_study(study)
if __name__ == "__main__":
    # Command-line arguments, if any, are passed to main() as the studies to
    # import; with no arguments main() receives None.
    cli_studies = sys.argv[1:]
    main(cli_studies if cli_studies else None)