ea9d611719
- Add IWRS/common/mongo_writer.py with shared connection, indexes, upsert+snapshot helpers - Add IWRS/Patients/import_to_mongo.py (subject_summary + visits) - Add IWRS/Patients/import_notifications_to_mongo.py: parse PDF/JSON directly to Mongo (incl. PDF as BinData), replaces 2-step MySQL flow - Add IWRS/Drugs/import_to_mongo.py (shipments, items, inventory, destruction) - Add IWRS/backfill_mysql_to_mongo.py: one-shot history backfill - Switch IWRS/Patients/run_all.py and IWRS/Drugs/run_all.py to Mongo - Rewrite IWRS/Drugs/create_report.py data loaders to read from Mongo - 8 main collections (upsert = latest state) + 5 snapshot collections (append-only with import_id) under studie database; notifications and destruction are immutable and need no snapshots Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
247 lines
7.8 KiB
Python
247 lines
7.8 KiB
Python
"""
|
|
Parsuje texty IWRS notifikací z MySQL (iwrs_notifications) a ukládá strukturovaná
|
|
data do MongoDB (databáze 'studie', kolekce 'iwrs_notifications').
|
|
|
|
Idempotentní: upsert podle pk (unikátní identifikátor notifikace v IWRS).
|
|
"""
|
|
|
|
import datetime
import os
import re
import sys
from contextlib import closing

import mysql.connector
from pymongo import ASCENDING, MongoClient, ReplaceOne

import db_config
|
|
|
|
# MongoDB target. The connection URI can be overridden with the MONGO_URI
# environment variable; without it the previous hard-coded LAN address is used,
# so existing runs behave exactly as before.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = "studie"
MONGO_COLL = "iwrs_notifications"
|
|
|
|
|
|
# ── parsery ──────────────────────────────────────────────────────────────────
|
|
|
|
def parse_kv_lines(text):
    """Collect every ``Key: Value`` line of a notification text into a dict.

    Rules (the first occurrence of a key wins; later duplicates are ignored):

    * blank lines, lines without a colon and lines starting with ``http`` are skipped;
    * keys containing spaces and made of more than 8 words are treated as prose
      and ignored;
    * when nothing follows the colon, the next non-empty line becomes the value,
      unless that line is itself a ``Key: Value`` line (then the empty key is
      dropped and the new line is parsed normally). A line whose first colon sits
      between two digits (e.g. ``12-Jan-2024 10:15:30``) is a value that contains
      a time, not a new key, so it is accepted as the pending key's value.

    Args:
        text: raw notification text; ``None`` or ``""`` yields an empty dict.

    Returns:
        dict mapping stripped keys to stripped values.
    """
    out = {}
    if not text:
        # NULL / empty notification body -> nothing to parse
        return out

    pending_key = None
    for raw_line in text.splitlines():
        line = raw_line.strip()

        if pending_key is not None:
            # waiting for the value of a key whose colon had nothing after it
            if not line:
                continue
            head, sep, tail = line.partition(":")
            # a colon wedged between digits is a time inside a value, not a separator
            is_time_like = head[-1:].isdigit() and tail[:1].isdigit()
            if not sep or is_time_like:
                out.setdefault(pending_key, line)
                pending_key = None
                continue
            # the line is its own "Key: Value" pair -> drop the pending key, parse below
            pending_key = None

        if not line or ":" not in line:
            continue
        if line.lower().startswith("http"):
            continue

        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip()
        if not key or (" " in key and len(key.split()) > 8):
            continue
        if not val:
            pending_key = key
            continue
        out.setdefault(key, val)

    return out
|
|
|
|
|
|
# Accepted formats: a date such as "05-Mar-2024" and, for timestamps, the same
# date followed by whitespace and "HH:MM:SS".
DATE_RE = re.compile(r"^\d{2}-[A-Z][a-z]{2}-\d{4}$")
DATETIME_RE = re.compile(r"^(\d{2}-[A-Z][a-z]{2}-\d{4})\s+(\d{2}:\d{2}:\d{2})$")


def to_date(s):
    """Parse a ``DD-Mon-YYYY`` string into a ``datetime.datetime`` at midnight.

    Surrounding whitespace is ignored. Returns ``None`` for empty input, for text
    that does not have the expected shape, and for impossible calendar dates.
    """
    if not s:
        return None
    candidate = s.strip()
    if not DATE_RE.match(candidate):
        return None
    try:
        parsed = datetime.datetime.strptime(candidate, "%d-%b-%Y")
    except ValueError:
        parsed = None
    return parsed


def to_datetime(s):
    """Parse a ``DD-Mon-YYYY HH:MM:SS`` string into a ``datetime.datetime``.

    Any whitespace run between (or around) the date and time parts is accepted.
    Returns ``None`` for empty input, a non-matching shape, or invalid values.
    """
    if not s:
        return None
    normalized = re.sub(r"\s+", " ", s.strip())
    match = DATETIME_RE.match(normalized)
    if match is None:
        return None
    joined = " ".join(match.groups())
    try:
        return datetime.datetime.strptime(joined, "%d-%b-%Y %H:%M:%S")
    except ValueError:
        return None
|
|
|
|
|
|
# One medication row: 7-digit medication number, free-text type, lot number and
# expiry date, with optional whitespace between the parts.
MED_ROW_RE = re.compile(
    r"(?P<no>\d{7})\s*[\s\n]*"
    r"(?P<type>[A-Za-z][A-Za-z0-9 /+\-]+?)\s*[\s\n]*"
    r"(?P<lot>[A-Z0-9]{5,10})\s*[\s\n]*"
    r"(?P<exp>\d{2}-[A-Z][a-z]{2}-\d{4})"
)


def parse_medication_table(text):
    """Extract medication rows (number, type, lot, expiry) from notification text.

    Handles both layouts (UCO3001 multi-line and MDD3003 run-together): every
    whitespace run is collapsed to a single space before matching.

    Returns:
        list of dicts with keys ``medication_no``, ``medication_type``, ``lot_no``
        and ``expiration_date`` (result of ``to_date``), de-duplicated on
        ``(medication_no, lot_no)`` keeping the first occurrence, in text order.
    """
    flattened = re.sub(r"\s+", " ", text)
    first_seen = {}
    for hit in MED_ROW_RE.finditer(flattened):
        # trim trailing fragments (table-header words) from the matched type
        med_type = re.sub(
            r"\s+(Packaged|Lot|Expiration|No|Date|Medication).*$",
            "",
            hit.group("type").strip(),
        ).strip()
        row = {
            "medication_no": hit.group("no"),
            "medication_type": med_type,
            "lot_no": hit.group("lot"),
            "expiration_date": to_date(hit.group("exp")),
        }
        # dict preserves insertion order, so setdefault keeps the first duplicate
        first_seen.setdefault((row["medication_no"], row["lot_no"]), row)
    return list(first_seen.values())
|
|
|
|
|
|
# Keys that must never land in the free-form ``fields`` sub-document
# (boilerplate / not useful). parse_kv_lines already ignores keys with more than
# 8 words, so this acts as a safety net for anything slipping past that rule.
KV_BLACKLIST = {
    "If you have questions about this notification, please contact 4G Clinical Support at",
}

# Parsed keys stored as top-level document attributes (or, for "Subject", taken
# from the MySQL row instead); they are not repeated inside ``fields``.
_PROMOTED_KV_KEYS = frozenset({
    "Site", "Investigator", "Location", "Cohort", "IRT Subject Status",
    "Subject",
    "Transaction Date/Time (site local)",
    "Transaction Date/Time (system local)",
    "Transaction performed by",
})

# Date-only keys that are additionally parsed (via to_date) into ``parsed_dates``.
_DATE_FIELDS = (
    "Informed Consent Date",
    "Informed Consent Date at Screening",
    "Informed Consent Date at Subject Creation",
    "Date of Subject Creation in IRT",
    "Date of Screening in IRT",
    "Screenfail Date",
    "Discontinuation date",
    "Dispensation date",
    "Returned Date",
)


def _as_bson_datetime(value):
    """Return *value* as ``datetime.datetime``; a bare ``datetime.date`` becomes midnight.

    PyMongo encodes ``datetime.datetime`` but not plain ``datetime.date``; any other
    value (``datetime.datetime``, ``None``, ...) is returned unchanged.
    """
    if isinstance(value, datetime.date) and not isinstance(value, datetime.datetime):
        return datetime.datetime.combine(value, datetime.time())
    return value


def build_document(row):
    """Turn one ``iwrs_notifications`` MySQL row into a MongoDB document.

    Args:
        row: 8-tuple ``(pk, study, subject, title, label, event, actual_date, text)``
            in the column order selected by ``main``.

    Returns:
        dict whose ``_id`` is the IWRS ``pk`` (so re-importing replaces instead of
        duplicating), holding the row columns, well-known parsed key/value fields,
        both transaction timestamps, the medication table, the remaining
        key/value pairs under ``fields``, parsed date fields under
        ``parsed_dates`` and the untouched text under ``raw_text``.
    """
    pk, study, subject, title, label, event, actual_date, text = row

    # A NULL text column must not abort the whole import: parse it as empty text,
    # but still store the original value in raw_text.
    parse_text = text or ""
    kv = parse_kv_lines(parse_text)
    meds = parse_medication_table(parse_text)

    parsed_dates = {}
    for name in _DATE_FIELDS:
        if name in kv:
            parsed = to_date(kv[name])
            if parsed:
                parsed_dates[name] = parsed

    excluded = _PROMOTED_KV_KEYS | KV_BLACKLIST
    return {
        "_id": pk,  # IWRS pk as _id -> idempotent upserts
        "pk": pk,
        "study": study,
        "subject": subject,
        "title": title,
        "label": label,
        "event": event,
        "actual_date": _as_bson_datetime(actual_date),
        "site": kv.get("Site"),
        "investigator": kv.get("Investigator"),
        "location": kv.get("Location"),
        "cohort": kv.get("Cohort"),
        "irt_subject_status": kv.get("IRT Subject Status"),
        "transaction_site_local": to_datetime(kv.get("Transaction Date/Time (site local)")),
        "transaction_system_local": to_datetime(kv.get("Transaction Date/Time (system local)")),
        "transaction_by": kv.get("Transaction performed by"),
        "medications": meds,
        "fields": {k: v for k, v in kv.items() if k not in excluded},
        "parsed_dates": parsed_dates,
        "raw_text": text,
    }
|
|
|
|
|
|
# ── main ─────────────────────────────────────────────────────────────────────
|
|
|
|
def _fetch_rows(studies):
    """Read raw notification rows from the MySQL table ``iwrs_notifications``.

    When *studies* is non-empty only those studies are loaded. The codes are bound
    as query parameters (one ``%s`` placeholder each), never spliced into the SQL.
    The connection and cursor are closed even if the query fails.
    """
    sql = (
        "SELECT pk, study, subject, title, label, event, actual_date, text "
        "FROM iwrs_notifications"
    )
    connection = mysql.connector.connect(
        host=db_config.DB_HOST, port=db_config.DB_PORT,
        user=db_config.DB_USER, password=db_config.DB_PASSWORD,
        database=db_config.DB_NAME,
    )
    with closing(connection) as conn, closing(conn.cursor()) as cur:
        if studies:
            placeholders = ",".join(["%s"] * len(studies))
            cur.execute(f"{sql} WHERE study IN ({placeholders})", studies)
        else:
            cur.execute(sql)
        return cur.fetchall()


def _upsert_documents(coll, rows, batch_size=500):
    """Replace-or-insert one document per row (matched on ``_id``), in batches.

    Sending ``batch_size`` ReplaceOne operations per ``bulk_write`` avoids one
    network round trip per document. Returns the number of rows processed.
    """
    batch = []
    for row in rows:
        doc = build_document(row)
        batch.append(ReplaceOne({"_id": doc["_id"]}, doc, upsert=True))
        if len(batch) >= batch_size:
            coll.bulk_write(batch)
            batch = []
    if batch:  # bulk_write rejects an empty operation list
        coll.bulk_write(batch)
    return len(rows)


def main(studies=None):
    """Import IWRS notifications from MySQL into MongoDB and print a summary.

    Args:
        studies: optional sequence of study codes; when empty or ``None`` all
            notifications are imported.
    """
    rows = _fetch_rows(studies)
    print(f"  Nacteno {len(rows)} notifikaci z MySQL")

    mc = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        coll = mc[MONGO_DB][MONGO_COLL]

        # secondary indexes (re-creating an identical existing index is harmless)
        coll.create_index([("study", ASCENDING), ("subject", ASCENDING)])
        coll.create_index([("study", ASCENDING), ("title", ASCENDING)])
        coll.create_index([("actual_date", ASCENDING)])

        upserts = _upsert_documents(coll, rows)
        print(f"  Upsert {upserts} dokumentu do {MONGO_DB}.{MONGO_COLL}")

        print("\n  Statistika v Mongo:")
        pipeline = [
            {"$group": {"_id": {"study": "$study", "title": "$title"}, "count": {"$sum": 1}}},
            {"$sort": {"_id.study": 1, "_id.title": 1}},
        ]
        for r in coll.aggregate(pipeline):
            group = r["_id"]
            # title may be null (NULL column) or absent from _id when the field is
            # missing; str() keeps the :30s format spec from raising on None.
            title = str(group.get("title"))
            print(f"    {group.get('study')} | {title:30s} | {r['count']}")
    finally:
        mc.close()


if __name__ == "__main__":
    # study codes from the command line; none means "all studies"
    main(sys.argv[1:] or None)
|