z230
This commit is contained in:
@@ -0,0 +1,217 @@
|
||||
"""
|
||||
Import IWRS notifikací (PDF + JSON metadata) přímo do MongoDB.
|
||||
|
||||
Nahrazuje původní 2-krokový flow:
|
||||
1) import_to_mysql.py: PDF/JSON → MySQL iwrs_notifications
|
||||
2) parse_notifications_to_mongo.py: MySQL → Mongo iwrs_notifications
|
||||
|
||||
Nyní vše v jednom: PDF/JSON → Mongo iwrs_notifications (text parsovaný per typ,
|
||||
PDF jako BinData). Po úspěšném importu se soubory přesunou do Zpracováno/.
|
||||
|
||||
Idempotentní: _id = pk (IWRS unique identifier).
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
import json
|
||||
import shutil
|
||||
import datetime
|
||||
|
||||
from bson.binary import Binary
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from common.mongo_writer import get_db, to_date
|
||||
|
||||
from notification_parsers import (
|
||||
parse_kv_lines, parse_medication_table, to_date as parse_to_date,
|
||||
to_datetime as parse_to_datetime,
|
||||
)
|
||||
|
||||
# Directory that contains this script; data directories are resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Root under which import_study() looks for one sub-directory per study.
DETAILS_DIR = os.path.join(BASE_DIR, "IncomingSourceReportsDetails")

# Study codes processed by main() when the caller does not supply any.
STUDIES = [
    "77242113UCO3001",
    "42847922MDD3003",
]
|
||||
|
||||
|
||||
def find_pairs(study):
    """Return the (json_path, pdf_path) pairs waiting in one study's directory.

    Scans ``DETAILS_DIR/<study>`` (non-recursively) for ``*.json`` files, in
    sorted path order. For each one, ``pdf_path`` is the sibling file with the
    same stem and a ``.pdf`` extension, or ``None`` when that file does not
    exist.

    Args:
        study: Name of the sub-directory of ``DETAILS_DIR`` to scan.

    Returns:
        A list of ``(json_path, pdf_path_or_None)`` tuples; an empty list when
        the study directory does not exist.
    """
    import glob  # function-local import, as in the original block

    out_dir = os.path.join(DETAILS_DIR, study)
    if not os.path.isdir(out_dir):
        return []

    pairs = []
    for json_path in sorted(glob.glob(os.path.join(out_dir, "*.json"))):
        # Swap only the final extension. str.replace(".json", ".pdf") would also
        # rewrite a ".json" occurring earlier in the path (e.g. in a directory
        # name) and would miss an upper-case ".JSON" matched by glob on
        # case-insensitive filesystems, silently dropping the PDF.
        pdf_path = os.path.splitext(json_path)[0] + ".pdf"
        pairs.append((json_path, pdf_path if os.path.exists(pdf_path) else None))
    return pairs
|
||||
|
||||
|
||||
def build_document(meta, pdf_bytes):
    """Build the Mongo document for a single notification.

    Args:
        meta: Mapping loaded from the notification's JSON file. The keys read
            here are ``text``, ``pk``, ``study``, ``subject``, ``title``,
            ``label``, ``event`` and ``actual_date``.
        pdf_bytes: Raw PDF content, or ``None`` when there is no PDF.

    Returns:
        A dict with ``_id`` and ``pk`` both set to ``meta["pk"]``, the metadata
        fields copied from ``meta``, values extracted from the notification
        text, and a ``pdf`` entry (wrapped in ``Binary``) only when
        ``pdf_bytes`` is not ``None``.
    """
    raw_text = meta.get("text") or ""
    # presumably a label -> value mapping parsed from "Label: value" lines;
    # verify against notification_parsers.parse_kv_lines
    labelled = parse_kv_lines(raw_text)
    medication_rows = parse_medication_table(raw_text)

    site_local = parse_to_datetime(labelled.get("Transaction Date/Time (site local)"))
    system_local = parse_to_datetime(labelled.get("Transaction Date/Time (system local)"))

    # Labels whose values are additionally stored in parsed form; a label is
    # recorded only when it is present and parse_to_date returns a truthy value.
    parsed_dates = {}
    for date_label in (
        "Informed Consent Date",
        "Informed Consent Date at Screening",
        "Informed Consent Date at Subject Creation",
        "Date of Subject Creation in IRT",
        "Date of Screening in IRT",
        "Screenfail Date",
        "Discontinuation date",
        "Dispensation date",
        "Returned Date",
    ):
        if date_label not in labelled:
            continue
        parsed = parse_to_date(labelled[date_label])
        if parsed:
            parsed_dates[date_label] = parsed

    pk = meta.get("pk")
    actual_date = to_date(meta.get("actual_date"))

    # Labels left out of the generic "fields" bucket. All but "Subject" are
    # stored under dedicated keys below; the document's "subject" comes from meta.
    excluded_labels = {
        "Site", "Investigator", "Location", "Cohort", "IRT Subject Status",
        "Subject",
        "Transaction Date/Time (site local)",
        "Transaction Date/Time (system local)",
        "Transaction performed by",
    }

    # Keys are inserted in the same order as the original dict literal so the
    # stored document keeps its field order.
    doc = {"_id": pk, "pk": pk}
    for meta_key in ("study", "subject", "title", "label", "event"):
        doc[meta_key] = meta.get(meta_key)
    doc["actual_date"] = actual_date
    for doc_key, text_label in (
        ("site", "Site"),
        ("investigator", "Investigator"),
        ("location", "Location"),
        ("cohort", "Cohort"),
        ("irt_subject_status", "IRT Subject Status"),
    ):
        doc[doc_key] = labelled.get(text_label)
    doc["transaction_site_local"] = site_local
    doc["transaction_system_local"] = system_local
    doc["transaction_by"] = labelled.get("Transaction performed by")
    doc["medications"] = medication_rows
    doc["fields"] = {
        key: value for key, value in labelled.items() if key not in excluded_labels
    }
    doc["parsed_dates"] = parsed_dates
    doc["raw_text"] = raw_text

    if pdf_bytes is not None:
        doc["pdf"] = Binary(pdf_bytes)
    return doc
|
||||
|
||||
|
||||
def import_study(study):
    """Import every pending notification of one study into Mongo.

    Each (JSON, PDF) pair returned by ``find_pairs(study)`` is turned into a
    document by ``build_document`` and upserted into the ``iwrs_notifications``
    collection keyed by ``_id``. After a successful upsert the files are moved
    into the study's ``Zpracovano`` sub-directory. A failure on one pair is
    printed and counted, and processing continues with the next pair.

    Args:
        study: Study code; also the sub-directory name under ``DETAILS_DIR``.

    Returns:
        The number of notifications upserted.
    """
    pending = find_pairs(study)
    if not pending:
        print(f" [{study}] zadne nove notifikace")
        return 0

    collection = get_db().iwrs_notifications
    done_dir = os.path.join(DETAILS_DIR, study, "Zpracovano")
    os.makedirs(done_dir, exist_ok=True)

    def _archive(path):
        # Move a processed file into the "done" directory, keeping its name.
        shutil.move(path, os.path.join(done_dir, os.path.basename(path)))

    imported = 0
    failed = 0
    for json_path, pdf_path in pending:
        file_name = os.path.basename(json_path)
        try:
            with open(json_path, "r", encoding="utf-8") as json_file:
                meta = json.load(json_file)

            pdf_bytes = None
            if pdf_path:
                with open(pdf_path, "rb") as pdf_file:
                    pdf_bytes = pdf_file.read()

            # pk becomes the document _id, so a notification without it
            # cannot be stored; the files are left in place.
            if not meta.get("pk"):
                print(f" CHYBI pk: {file_name}")
                failed += 1
                continue

            doc = build_document(meta, pdf_bytes)
            doc["last_imported_at"] = datetime.datetime.now()
            # Upsert by _id so re-importing the same notification replaces it.
            collection.replace_one({"_id": doc["_id"]}, doc, upsert=True)
            imported += 1

            # Archive the pair only after the write succeeded.
            _archive(json_path)
            if pdf_path:
                _archive(pdf_path)
        except Exception as exc:
            print(f" CHYBA {file_name}: {exc}")
            failed += 1

    print(f" [{study}] importovano={imported} selhalo={failed}")
    return imported
|
||||
|
||||
|
||||
def import_from_dir(incoming_dir, done_dir, studies=None):
    """Import notifications from a flat directory of ``*.json`` / ``*.pdf`` files.

    JSON files in ``incoming_dir`` are processed in order of modification time.
    The PDF belonging to a JSON file is the file with the same name and a
    ``.pdf`` extension. The study is read from the JSON metadata. After a
    successful upsert into ``iwrs_notifications`` the pair is moved into
    ``done_dir``. A failure on one file is printed and counted, and processing
    continues with the next file.

    Args:
        incoming_dir: Directory holding the files to import.
        done_dir: Directory that receives processed files; created if missing.
        studies: Optional iterable of study codes. When given and non-empty,
            notifications of any other study are skipped and left in place.

    Returns:
        The number of notifications upserted (0 when ``incoming_dir`` does not
        exist or contains no JSON files).
    """
    wanted = set(studies) if studies else None

    if not os.path.isdir(incoming_dir):
        print(f" Incoming neexistuje: {incoming_dir}")
        return 0
    os.makedirs(done_dir, exist_ok=True)

    import glob  # function-local import, as in the original block

    json_paths = glob.glob(os.path.join(incoming_dir, "*.json"))
    json_paths.sort(key=os.path.getmtime)
    if not json_paths:
        print(" [notifikace] zadne nove (Incoming prazdny)")
        return 0

    collection = get_db().iwrs_notifications

    def _archive(path):
        # Move a processed file into done_dir, keeping its name.
        shutil.move(path, os.path.join(done_dir, os.path.basename(path)))

    imported = 0
    skipped = 0
    failed = 0
    for json_path in json_paths:
        file_name = os.path.basename(json_path)
        try:
            with open(json_path, "r", encoding="utf-8") as json_file:
                meta = json.load(json_file)

            if wanted and meta.get("study") not in wanted:
                skipped += 1
                continue

            # The glob pattern guarantees a 5-character ".json" suffix.
            pdf_path = json_path[:-5] + ".pdf"
            pdf_bytes = None
            if os.path.exists(pdf_path):
                with open(pdf_path, "rb") as pdf_file:
                    pdf_bytes = pdf_file.read()

            # pk becomes the document _id, so a notification without it
            # cannot be stored; the files are left in place.
            if not meta.get("pk"):
                print(f" CHYBI pk: {file_name}")
                failed += 1
                continue

            doc = build_document(meta, pdf_bytes)
            doc["last_imported_at"] = datetime.datetime.now()
            # Upsert by _id so re-importing the same notification replaces it.
            collection.replace_one({"_id": doc["_id"]}, doc, upsert=True)
            imported += 1

            # Archive the pair only after the write succeeded.
            _archive(json_path)
            if os.path.exists(pdf_path):
                _archive(pdf_path)
        except Exception as exc:
            print(f" CHYBA {file_name}: {exc}")
            failed += 1

    print(f" [notifikace] importovano={imported} preskoceno={skipped} selhalo={failed}")
    return imported
|
||||
|
||||
|
||||
def main(studies=None):
    """Run ``import_study`` for each study code.

    Args:
        studies: Iterable of study codes; when ``None`` or empty, the
            module-level ``STUDIES`` list is used instead.
    """
    for study_code in (studies or STUDIES):
        import_study(study_code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Study codes may be given on the command line; with none given, pass None
    # so that main() falls back to its default list.
    cli_studies = sys.argv[1:]
    main(cli_studies if cli_studies else None)
|
||||
Reference in New Issue
Block a user