218 lines
6.9 KiB
Python
218 lines
6.9 KiB
Python
"""
|
|
Import IWRS notifikací (PDF + JSON metadata) přímo do MongoDB.

Nahrazuje původní 2-krokový flow:
1) import_to_mysql.py: PDF/JSON → MySQL iwrs_notifications
2) parse_notifications_to_mongo.py: MySQL → Mongo iwrs_notifications

Nyní vše v jednom: PDF/JSON → Mongo iwrs_notifications (text parsovaný per typ,
PDF jako BinData). Po úspěšném importu se soubory přesunou do Zpracovano/.

Idempotentní: _id = pk (IWRS unique identifier).
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
import json
|
|
import shutil
|
|
import datetime
|
|
|
|
from bson.binary import Binary
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from common.mongo_writer import get_db, to_date
|
|
|
|
from notification_parsers import (
|
|
parse_kv_lines, parse_medication_table, to_date as parse_to_date,
|
|
to_datetime as parse_to_datetime,
|
|
)
|
|
|
|
# Absolute directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Root of the per-study folders; each study has its own sub-directory here.
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")

# Studies processed when no study names are supplied explicitly.
STUDIES = [
    "77242113UCO3001",
    "42847922MDD3003",
]
|
|
|
|
|
|
def find_pairs(study):
    """Collect the notification files waiting in one study's folder.

    Looks for ``*.json`` metadata files directly inside
    ``DETAILS_DIR/<study>`` and pairs each one with the PDF that has the
    same base name.

    Args:
        study: Name of the study sub-directory under ``DETAILS_DIR``.

    Returns:
        A list of ``(json_path, pdf_path)`` tuples sorted by JSON path.
        ``pdf_path`` is ``None`` when no matching PDF exists. The list is
        empty when the study directory does not exist.
    """
    # Function-scope import, as in the original implementation.
    import glob

    out_dir = os.path.join(DETAILS_DIR, study)
    if not os.path.isdir(out_dir):
        return []

    # Escape the directory part so glob metacharacters in it match literally.
    pattern = os.path.join(glob.escape(out_dir), "*.json")

    pairs = []
    for json_path in sorted(glob.glob(pattern)):
        # Swap only the trailing extension. str.replace would also rewrite a
        # ".json" occurring earlier in the path (e.g. in a directory name),
        # producing a wrong PDF path and silently dropping the PDF.
        pdf_path = os.path.splitext(json_path)[0] + ".pdf"
        pairs.append((json_path, pdf_path if os.path.exists(pdf_path) else None))
    return pairs
|
|
|
|
|
|
def build_document(meta, pdf_bytes):
    """Assemble the MongoDB document for one notification.

    Args:
        meta: Metadata mapping loaded from the notification's JSON file. The
            keys read are ``text``, ``pk``, ``actual_date``, ``study``,
            ``subject``, ``title``, ``label`` and ``event``.
        pdf_bytes: Raw PDF content, or ``None`` when there is no PDF.

    Returns:
        A dict whose ``_id`` is the notification ``pk``. It carries the
        metadata values, selected key/value entries parsed from the text,
        the parsed medication table, the remaining key/value entries under
        ``fields``, every successfully parsed date under ``parsed_dates``
        and the unmodified text under ``raw_text``. A ``pdf`` entry wrapped
        in ``Binary`` is present only when ``pdf_bytes`` is not ``None``.
    """
    raw_text = meta.get("text") or ""
    pairs = parse_kv_lines(raw_text)
    medications = parse_medication_table(raw_text)

    site_local = parse_to_datetime(pairs.get("Transaction Date/Time (site local)"))
    system_local = parse_to_datetime(pairs.get("Transaction Date/Time (system local)"))

    # Labels whose values are additionally stored as parsed dates.
    date_labels = (
        "Informed Consent Date",
        "Informed Consent Date at Screening",
        "Informed Consent Date at Subject Creation",
        "Date of Subject Creation in IRT",
        "Date of Screening in IRT",
        "Screenfail Date",
        "Discontinuation date",
        "Dispensation date",
        "Returned Date",
    )
    parsed_dates = {}
    for label in date_labels:
        if label not in pairs:
            continue
        parsed = parse_to_date(pairs[label])
        if parsed:
            parsed_dates[label] = parsed

    pk = meta.get("pk")
    actual_date = to_date(meta.get("actual_date"))

    # Labels left out of the generic "fields" sub-document.
    excluded = {
        "Site", "Investigator", "Location", "Cohort", "IRT Subject Status",
        "Subject",
        "Transaction Date/Time (site local)",
        "Transaction Date/Time (system local)",
        "Transaction performed by",
    }

    # Built step by step in the same key order as the stored document layout.
    doc = {"_id": pk, "pk": pk}
    for key in ("study", "subject", "title", "label", "event"):
        doc[key] = meta.get(key)
    doc["actual_date"] = actual_date
    for key, label in (
        ("site", "Site"),
        ("investigator", "Investigator"),
        ("location", "Location"),
        ("cohort", "Cohort"),
        ("irt_subject_status", "IRT Subject Status"),
    ):
        doc[key] = pairs.get(label)
    doc["transaction_site_local"] = site_local
    doc["transaction_system_local"] = system_local
    doc["transaction_by"] = pairs.get("Transaction performed by")
    doc["medications"] = medications
    doc["fields"] = {
        label: value for label, value in pairs.items() if label not in excluded
    }
    doc["parsed_dates"] = parsed_dates
    doc["raw_text"] = raw_text

    if pdf_bytes is not None:
        doc["pdf"] = Binary(pdf_bytes)
    return doc
|
|
|
|
|
|
def import_study(study):
    """Import all pending notifications of one study into MongoDB.

    Every JSON/PDF pair returned by ``find_pairs(study)`` is converted with
    ``build_document``, upserted into the ``iwrs_notifications`` collection
    by ``_id`` and then moved into the study's ``Zpracovano`` sub-directory.
    A failure on one pair is reported and does not stop the remaining pairs.

    Args:
        study: Name of the study sub-directory under ``DETAILS_DIR``.

    Returns:
        Number of notifications that were upserted and whose files were
        moved to ``Zpracovano``. Each pair is counted exactly once, either
        as imported or as failed.
    """
    pairs = find_pairs(study)
    if not pairs:
        print(f" [{study}] zadne nove notifikace")
        return 0

    db = get_db()
    coll = db.iwrs_notifications
    done_dir = os.path.join(DETAILS_DIR, study, "Zpracovano")
    os.makedirs(done_dir, exist_ok=True)

    imported = 0
    failed = 0
    for json_path, pdf_path in pairs:
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                meta = json.load(f)

            # Validate before touching the PDF so that a rejected record
            # does not cost a full binary read.
            if not meta.get("pk"):
                print(f" CHYBI pk: {os.path.basename(json_path)}")
                failed += 1
                continue

            pdf_bytes = None
            if pdf_path:
                with open(pdf_path, "rb") as f:
                    pdf_bytes = f.read()

            doc = build_document(meta, pdf_bytes)
            # NOTE(review): naive local time; PyMongo treats naive datetimes
            # as UTC when storing — confirm this is the intended value.
            doc["last_imported_at"] = datetime.datetime.now()
            coll.replace_one({"_id": doc["_id"]}, doc, upsert=True)

            # Move the processed pair out of the incoming folder.
            shutil.move(json_path, os.path.join(done_dir, os.path.basename(json_path)))
            if pdf_path:
                shutil.move(pdf_path, os.path.join(done_dir, os.path.basename(pdf_path)))

            # Count only after the move succeeded. Otherwise a pair that was
            # upserted but could not be moved would be reported as both
            # imported and failed; the upsert makes the retry harmless.
            imported += 1
        except Exception as e:
            # Per-file boundary: report and carry on with the next pair.
            print(f" CHYBA {os.path.basename(json_path)}: {e}")
            failed += 1

    print(f" [{study}] importovano={imported} selhalo={failed}")
    return imported
|
|
|
|
|
|
def import_from_dir(incoming_dir, done_dir, studies=None):
    """Import notifications from a flat incoming directory into MongoDB.

    ``*.json`` files and their same-named ``*.pdf`` files lie directly in
    ``incoming_dir``. The study is taken from the JSON metadata. After a
    successful upsert into ``iwrs_notifications`` the pair is moved to
    ``done_dir``. Files are processed in order of modification time, and a
    failure on one file does not stop the remaining files.

    Args:
        incoming_dir: Directory holding the files to import.
        done_dir: Directory that receives processed files; created if needed.
        studies: Optional iterable of study names. When given and non-empty,
            notifications of other studies are skipped and left in place.

    Returns:
        Number of notifications that were upserted and whose files were
        moved to ``done_dir``. Returns 0 when ``incoming_dir`` does not
        exist or contains no JSON files.
    """
    # Function-scope import, as in the original implementation.
    import glob

    studies = set(studies) if studies else None
    if not os.path.isdir(incoming_dir):
        print(f" Incoming neexistuje: {incoming_dir}")
        return 0
    os.makedirs(done_dir, exist_ok=True)

    json_paths = sorted(
        glob.glob(os.path.join(incoming_dir, "*.json")),
        key=os.path.getmtime,
    )
    if not json_paths:
        print(" [notifikace] zadne nove (Incoming prazdny)")
        return 0

    db = get_db()
    coll = db.iwrs_notifications
    imported = 0
    skipped = 0
    failed = 0
    for json_path in json_paths:
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                meta = json.load(f)
            if studies and meta.get("study") not in studies:
                skipped += 1
                continue

            # Validate before touching the PDF so that a rejected record
            # does not cost a full binary read.
            if not meta.get("pk"):
                print(f" CHYBI pk: {os.path.basename(json_path)}")
                failed += 1
                continue

            # Strip the 5-character ".json" suffix to derive the PDF name.
            pdf_path = json_path[:-5] + ".pdf"
            pdf_bytes = None
            if os.path.exists(pdf_path):
                with open(pdf_path, "rb") as f:
                    pdf_bytes = f.read()

            doc = build_document(meta, pdf_bytes)
            # NOTE(review): naive local time; PyMongo treats naive datetimes
            # as UTC when storing — confirm this is the intended value.
            doc["last_imported_at"] = datetime.datetime.now()
            coll.replace_one({"_id": doc["_id"]}, doc, upsert=True)

            shutil.move(json_path, os.path.join(done_dir, os.path.basename(json_path)))
            if os.path.exists(pdf_path):
                shutil.move(pdf_path, os.path.join(done_dir, os.path.basename(pdf_path)))

            # Count only after the move succeeded. Otherwise a file that was
            # upserted but could not be moved would be reported as both
            # imported and failed; the upsert makes the retry harmless.
            imported += 1
        except Exception as e:
            # Per-file boundary: report and carry on with the next file.
            print(f" CHYBA {os.path.basename(json_path)}: {e}")
            failed += 1
    print(f" [notifikace] importovano={imported} preskoceno={skipped} selhalo={failed}")
    return imported
|
|
|
|
|
|
def main(studies=None):
    """Run ``import_study`` for every requested study.

    Args:
        studies: Iterable of study names. When ``None`` or empty, the
            module-level ``STUDIES`` list is used instead.
    """
    selected = studies if studies else STUDIES
    for study in selected:
        import_study(study)
|
|
|
|
|
|
if __name__ == "__main__":
    # Study names may be given as command-line arguments; pass None when
    # there are none.
    cli_studies = sys.argv[1:]
    main(cli_studies if cli_studies else None)
|