Files
janssen/IWRS/Patients/parse_notifications_to_mongo.py
T
administrator ea9d611719 Migrate IWRS from MySQL to MongoDB
- Add IWRS/common/mongo_writer.py with shared connection, indexes,
  upsert+snapshot helpers
- Add IWRS/Patients/import_to_mongo.py (subject_summary + visits)
- Add IWRS/Patients/import_notifications_to_mongo.py: parse PDF/JSON
  directly to Mongo (incl. PDF as BinData), replaces 2-step MySQL flow
- Add IWRS/Drugs/import_to_mongo.py (shipments, items, inventory,
  destruction)
- Add IWRS/backfill_mysql_to_mongo.py: one-shot history backfill
- Switch IWRS/Patients/run_all.py and IWRS/Drugs/run_all.py to Mongo
- Rewrite IWRS/Drugs/create_report.py data loaders to read from Mongo
- 8 main collections (upsert = latest state) + 5 snapshot collections
  (append-only with import_id) under studie database; notifications and
  destruction are immutable and need no snapshots

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 07:24:36 +02:00

247 lines
7.8 KiB
Python

"""
Parses IWRS notification texts from MySQL (iwrs_notifications) and stores the structured
data in MongoDB (database 'studie', collection 'iwrs_notifications').
Idempotent: upsert by pk (the notification's unique identifier in IWRS).
"""
import re
import datetime
import sys
import mysql.connector
from pymongo import MongoClient, ASCENDING
import db_config
# MongoDB connection string used by main().
# NOTE(review): host and port are hard-coded — presumably an internal server; confirm
# before running this script in another environment.
MONGO_URI = "mongodb://192.168.1.76:27017"
# Name of the MongoDB database that holds the target collection.
MONGO_DB = "studie"
# Name of the collection main() upserts the parsed notification documents into.
MONGO_COLL = "iwrs_notifications"
# ── parsery ──────────────────────────────────────────────────────────────────
# Matches a line whose FIRST colon sits inside a clock time such as "07:24:36"
# (a digit, the colon, then exactly two digits). Such a line is a value,
# not a "Key: Value" pair.
_CLOCK_FIRST_COLON_RE = re.compile(r"^[^:]*\d:\d{2}(?!\d)")


def parse_kv_lines(text):
    """Extract ``Key: Value`` lines from *text* into a dict.

    Every line is stripped before it is inspected. When the value after the
    colon is empty, the first following non-empty line is used as the value,
    provided it is not itself a ``Key: Value`` line. A following line whose
    first colon belongs to a clock time (e.g. ``03-Jun-2026 07:24:36``) counts
    as a value, so date/time values printed on their own line are kept
    instead of being split into a bogus key.

    Skipped lines: lines without a colon, lines starting with ``http``
    (case-insensitive), lines with an empty key, and lines whose key contains
    a space and has more than 8 whitespace-separated words (treated as prose).

    Args:
        text: Notification text; may span multiple lines.

    Returns:
        dict mapping key to value (both stripped). If a key occurs more than
        once, the first value wins.
    """
    out = {}
    pending_key = None
    for raw_line in text.splitlines():
        line = raw_line.strip()

        # Waiting for the value of a key seen on an earlier line.
        if pending_key is not None:
            if not line:
                continue
            if ":" not in line or _CLOCK_FIRST_COLON_RE.match(line):
                out.setdefault(pending_key, line)
                pending_key = None
                continue
            # This line is itself "Key: Value": drop the pending key and
            # fall through to process the line normally.
            pending_key = None

        if not line or ":" not in line:
            continue
        if line.lower().startswith("http"):
            continue

        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip()
        if not key or (" " in key and len(key.split()) > 8):
            continue
        if not val:
            pending_key = key
            continue
        out.setdefault(key, val)
    return out
# Strict "DD-Mon-YYYY" (e.g. 03-Jun-2026) and "DD-Mon-YYYY HH:MM:SS" shapes.
DATE_RE = re.compile(r"^\d{2}-[A-Z][a-z]{2}-\d{4}$")
DATETIME_RE = re.compile(r"^(\d{2}-[A-Z][a-z]{2}-\d{4})\s+(\d{2}:\d{2}:\d{2})$")


def to_date(s):
    """Convert a ``DD-Mon-YYYY`` string to a ``datetime.datetime`` at midnight.

    Surrounding whitespace is ignored. Returns ``None`` for an empty/None
    input, for text that does not match ``DATE_RE``, or when ``strptime``
    rejects the value (e.g. an unknown month or an impossible day).
    """
    candidate = s.strip() if s else ""
    if not DATE_RE.match(candidate):
        return None
    try:
        return datetime.datetime.strptime(candidate, "%d-%b-%Y")
    except ValueError:
        return None


def to_datetime(s):
    """Convert a ``DD-Mon-YYYY HH:MM:SS`` string to a ``datetime.datetime``.

    Runs of whitespace (including line breaks) between the date and time
    parts are collapsed to one space first. Returns ``None`` for an
    empty/None input, for text that does not match ``DATETIME_RE``, or when
    ``strptime`` rejects the value.
    """
    if not s:
        return None
    match = DATETIME_RE.match(re.sub(r"\s+", " ", s.strip()))
    if match is None:
        return None
    date_part, time_part = match.group(1), match.group(2)
    try:
        return datetime.datetime.strptime(f"{date_part} {time_part}", "%d-%b-%Y %H:%M:%S")
    except ValueError:
        return None
# One medication row: 7-digit medication number, medication type, lot number
# (5-10 upper-case letters/digits) and expiration date (DD-Mon-YYYY),
# separated by optional whitespace.
MED_ROW_RE = re.compile(
    r"(?P<no>\d{7})\s*[\s\n]*"
    r"(?P<type>[A-Za-z][A-Za-z0-9 /+\-]+?)\s*[\s\n]*"
    r"(?P<lot>[A-Z0-9]{5,10})\s*[\s\n]*"
    r"(?P<exp>\d{2}-[A-Z][a-z]{2}-\d{4})"
)

# Trailing fragment to cut off a captured medication type. The \b makes the
# listed words match only as whole words, so a type such as "Normal Saline"
# is not truncated at " No".
MED_TYPE_TAIL_RE = re.compile(
    r"\s+(?:Packaged|Lot|Expiration|No|Date|Medication)\b.*$"
)


def parse_medication_table(text):
    """Find medication records (number, type, lot, expiration) in *text*.

    All whitespace runs are collapsed to single spaces before matching, so
    rows spread over several lines and rows on one line are both recognised.
    (The original note says this covers the UCO3001 multiline and the
    MDD3003 concatenated formats.)

    Args:
        text: Notification text.

    Returns:
        list of dicts with keys ``medication_no``, ``medication_type``,
        ``lot_no`` and ``expiration_date`` (the result of ``to_date``), in
        order of first appearance. Rows repeating an already seen
        (medication_no, lot_no) pair are dropped.
    """
    # Collapse whitespace so the row regex can ignore the line layout.
    compact = re.sub(r"\s+", " ", text)

    unique = []
    seen = set()
    for m in MED_ROW_RE.finditer(compact):
        dedupe_key = (m.group("no"), m.group("lot"))
        if dedupe_key in seen:
            continue
        seen.add(dedupe_key)
        # Cut off superfluous trailing fragments from the type.
        med_type = MED_TYPE_TAIL_RE.sub("", m.group("type").strip()).strip()
        unique.append({
            "medication_no": m.group("no"),
            "medication_type": med_type,
            "lot_no": m.group("lot"),
            "expiration_date": to_date(m.group("exp")),
        })
    return unique
# Keys we want to keep out of the stored key/value fields (ugly / not needed).
KV_BLACKLIST = {
    "If you have questions about this notification, please contact 4G Clinical Support at",
}

# Keys left out of "fields" because build_document stores them (or, for
# "Subject", the value from the MySQL row) as dedicated top-level attributes.
_KV_KEYS_OMITTED_FROM_FIELDS = frozenset({
    "Site", "Investigator", "Location", "Cohort", "IRT Subject Status",
    "Subject",
    "Transaction Date/Time (site local)",
    "Transaction Date/Time (system local)",
    "Transaction performed by",
})

# Keys whose values are additionally parsed with to_date() into "parsed_dates".
_DATE_FIELDS = (
    "Informed Consent Date",
    "Informed Consent Date at Screening",
    "Informed Consent Date at Subject Creation",
    "Date of Subject Creation in IRT",
    "Date of Screening in IRT",
    "Screenfail Date",
    "Discontinuation date",
    "Dispensation date",
    "Returned Date",
)


def build_document(row):
    """Build the MongoDB document for one ``iwrs_notifications`` row.

    Args:
        row: 8-item sequence ``(pk, study, subject, title, label, event,
            actual_date, text)``.

    Returns:
        dict whose ``_id`` (and ``pk``) is the row's pk. It carries the row
        columns, selected key/value pairs promoted to top-level attributes,
        the parsed medication rows (``medications``), the remaining
        key/value pairs (``fields``, without promoted and blacklisted keys),
        successfully parsed date fields (``parsed_dates``) and the unchanged
        text (``raw_text``).
    """
    pk, study, subject, title, label, event, actual_date, text = row

    # A NULL text column arrives as None; parse it as empty text instead of
    # crashing, but keep the original value in raw_text.
    parse_text = text or ""
    kv = parse_kv_lines(parse_text)
    meds = parse_medication_table(parse_text)

    # A plain date is widened to a midnight datetime; any other value
    # (datetime, None, ...) is stored unchanged.
    if isinstance(actual_date, datetime.date) and not isinstance(actual_date, datetime.datetime):
        actual_date = datetime.datetime.combine(actual_date, datetime.time())

    # Keep only the date fields that are present and parse successfully.
    parsed_dates = {}
    for name in _DATE_FIELDS:
        parsed = to_date(kv.get(name))
        if parsed:
            parsed_dates[name] = parsed

    return {
        "_id": pk,  # the IWRS pk doubles as _id, which makes re-imports idempotent
        "pk": pk,
        "study": study,
        "subject": subject,
        "title": title,
        "label": label,
        "event": event,
        "actual_date": actual_date,
        "site": kv.get("Site"),
        "investigator": kv.get("Investigator"),
        "location": kv.get("Location"),
        "cohort": kv.get("Cohort"),
        "irt_subject_status": kv.get("IRT Subject Status"),
        "transaction_site_local": to_datetime(kv.get("Transaction Date/Time (site local)")),
        "transaction_system_local": to_datetime(kv.get("Transaction Date/Time (system local)")),
        "transaction_by": kv.get("Transaction performed by"),
        "medications": meds,
        "fields": {
            k: v
            for k, v in kv.items()
            if k not in _KV_KEYS_OMITTED_FROM_FIELDS and k not in KV_BLACKLIST
        },
        "parsed_dates": parsed_dates,
        "raw_text": text,
    }
# ── main ─────────────────────────────────────────────────────────────────────
def main(studies=None):
    """Load IWRS notifications from MySQL and upsert them into MongoDB.

    Reads rows from the MySQL table ``iwrs_notifications``, converts each
    with ``build_document`` and replaces/inserts it in
    ``MONGO_DB.MONGO_COLL`` keyed by ``_id``. Then prints a per-study,
    per-title document count for the collection.

    The MySQL connection and cursor and the MongoDB client are closed even
    when an error occurs.

    Args:
        studies: Optional sequence of study identifiers. When non-empty, only
            rows whose ``study`` column is in it are loaded; otherwise all
            rows are loaded.
    """
    query = (
        "SELECT pk, study, subject, title, label, event, actual_date, text "
        "FROM iwrs_notifications"
    )
    if studies:
        # One %s placeholder per study; the values are bound by the driver.
        placeholders = ",".join(["%s"] * len(studies))
        query += f" WHERE study IN ({placeholders})"

    conn = mysql.connector.connect(
        host=db_config.DB_HOST, port=db_config.DB_PORT,
        user=db_config.DB_USER, password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
    try:
        cur = conn.cursor()
        try:
            if studies:
                cur.execute(query, studies)
            else:
                cur.execute(query)
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        conn.close()
    print(f" Nacteno {len(rows)} notifikaci z MySQL")

    mc = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        coll = mc[MONGO_DB][MONGO_COLL]

        # Indexes
        coll.create_index([("study", ASCENDING), ("subject", ASCENDING)])
        coll.create_index([("study", ASCENDING), ("title", ASCENDING)])
        coll.create_index([("actual_date", ASCENDING)])

        upserts = 0
        for row in rows:
            doc = build_document(row)
            coll.replace_one({"_id": doc["_id"]}, doc, upsert=True)
            upserts += 1
        print(f" Upsert {upserts} dokumentu do {MONGO_DB}.{MONGO_COLL}")

        # Statistics: number of documents per (study, title).
        print("\n Statistika v Mongo:")
        pipeline = [
            {"$group": {"_id": {"study": "$study", "title": "$title"}, "count": {"$sum": 1}}},
            {"$sort": {"_id.study": 1, "_id.title": 1}},
        ]
        for r in coll.aggregate(pipeline):
            group = r["_id"]
            study = group.get("study")
            title = group.get("title")
            # !s converts first: a None title cannot be formatted with ":30s".
            print(f" {study} | {title!s:30s} | {r['count']}")
    finally:
        mc.close()
if __name__ == "__main__":
    # Each command-line argument is a study identifier; with no arguments
    # main() gets None.
    studies = sys.argv[1:] or None
    main(studies)