""" Parsuje texty IWRS notifikací z MySQL (iwrs_notifications) a ukládá strukturovaná data do MongoDB (databáze 'studie', kolekce 'iwrs'). Idempotentní: upsert podle pk (unikátní identifikátor notifikace v IWRS). """ import re import datetime import sys import mysql.connector from pymongo import MongoClient, ASCENDING import db_config MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "studie" MONGO_COLL = "iwrs_notifications" # ── parsery ────────────────────────────────────────────────────────────────── def parse_kv_lines(text): """Vytáhne všechny řádky typu 'Klíč: Hodnota' do dictu. Když je hodnota za dvojtečkou prázdná, vezme se první neprázdný následující řádek.""" out = {} lines = [l.strip() for l in text.splitlines()] pending_key = None for line in lines: # čekáme na hodnotu pro klíč z předchozího řádku if pending_key is not None: if not line: continue if ":" not in line: out.setdefault(pending_key, line) pending_key = None continue # další řádek je sám "Klíč: Hodnota" → zahodíme pending a zpracujeme normálně pending_key = None if not line or ":" not in line: continue if line.lower().startswith("http"): continue key, _, val = line.partition(":") key = key.strip() val = val.strip() if not key or (" " in key and len(key.split()) > 8): continue if not val: pending_key = key continue out.setdefault(key, val) return out DATE_RE = re.compile(r"^\d{2}-[A-Z][a-z]{2}-\d{4}$") DATETIME_RE = re.compile(r"^(\d{2}-[A-Z][a-z]{2}-\d{4})\s+(\d{2}:\d{2}:\d{2})$") def to_date(s): if not s: return None s = s.strip() if DATE_RE.match(s): try: return datetime.datetime.strptime(s, "%d-%b-%Y") except ValueError: return None return None def to_datetime(s): if not s: return None s = re.sub(r"\s+", " ", s.strip()) m = DATETIME_RE.match(s) if m: try: return datetime.datetime.strptime(f"{m.group(1)} {m.group(2)}", "%d-%b-%Y %H:%M:%S") except ValueError: return None return None MED_ROW_RE = re.compile( r"(?P\d{7})\s*[\s\n]*" r"(?P[A-Za-z][A-Za-z0-9 /+\-]+?)\s*[\s\n]*" r"(?P[A-Z0-9]{5,10})\s*[\s\n]*" r"(?P\d{2}-[A-Z][a-z]{2}-\d{4})" ) def parse_medication_table(text): """Najde záznamy medikace (med_no, med_type, lot, expirace) v textu. Pracuje s oběma formáty (UCO3001 multiline i MDD3003 concatenated).""" rows = [] # zkomprimuj whitespace pro snadnější regex compact = re.sub(r"\s+", " ", text) for m in MED_ROW_RE.finditer(compact): med_type = m.group("type").strip() # uřízni nadbytečné koncové fragmenty med_type = re.sub(r"\s+(Packaged|Lot|Expiration|No|Date|Medication).*$", "", med_type).strip() rows.append({ "medication_no": m.group("no"), "medication_type": med_type, "lot_no": m.group("lot"), "expiration_date": to_date(m.group("exp")), }) # dedupe seen = set() unique = [] for r in rows: key = (r["medication_no"], r["lot_no"]) if key in seen: continue seen.add(key) unique.append(r) return unique # fields, které chceme v dokumentu vyloučit z kv (ošklivé / nepotřebné) KV_BLACKLIST = { "If you have questions about this notification, please contact 4G Clinical Support at", } def build_document(row): pk, study, subject, title, label, event, actual_date, text = row kv = parse_kv_lines(text) meds = parse_medication_table(text) # převod známých datumových/datetime polí dt_site = to_datetime(kv.get("Transaction Date/Time (site local)")) dt_sys = to_datetime(kv.get("Transaction Date/Time (system local)")) date_fields = [ "Informed Consent Date", "Informed Consent Date at Screening", "Informed Consent Date at Subject Creation", "Date of Subject Creation in IRT", "Date of Screening in IRT", "Screenfail Date", "Discontinuation date", "Dispensation date", "Returned Date", ] parsed_dates = {} for f in date_fields: if f in kv: d = to_date(kv[f]) if d: parsed_dates[f] = d doc = { "_id": pk, # použij IWRS pk jako _id (idempotence) "pk": pk, "study": study, "subject": subject, "title": title, "label": label, "event": event, "actual_date": ( datetime.datetime.combine(actual_date, datetime.time()) if isinstance(actual_date, datetime.date) and not isinstance(actual_date, datetime.datetime) else actual_date ), "site": kv.get("Site"), "investigator": kv.get("Investigator"), "location": kv.get("Location"), "cohort": kv.get("Cohort"), "irt_subject_status": kv.get("IRT Subject Status"), "transaction_site_local": dt_site, "transaction_system_local": dt_sys, "transaction_by": kv.get("Transaction performed by"), "medications": meds, "fields": {k: v for k, v in kv.items() if k not in { "Site", "Investigator", "Location", "Cohort", "IRT Subject Status", "Subject", "Transaction Date/Time (site local)", "Transaction Date/Time (system local)", "Transaction performed by", }}, "parsed_dates": parsed_dates, "raw_text": text, } return doc # ── main ───────────────────────────────────────────────────────────────────── def main(studies=None): conn = mysql.connector.connect( host=db_config.DB_HOST, port=db_config.DB_PORT, user=db_config.DB_USER, password=db_config.DB_PASSWORD, database=db_config.DB_NAME, ) cur = conn.cursor() if studies: placeholders = ",".join(["%s"] * len(studies)) cur.execute( f"SELECT pk, study, subject, title, label, event, actual_date, text " f"FROM iwrs_notifications WHERE study IN ({placeholders})", studies, ) else: cur.execute( "SELECT pk, study, subject, title, label, event, actual_date, text " "FROM iwrs_notifications" ) rows = cur.fetchall() cur.close() conn.close() print(f" Nacteno {len(rows)} notifikaci z MySQL") mc = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) coll = mc[MONGO_DB][MONGO_COLL] # indexy coll.create_index([("study", ASCENDING), ("subject", ASCENDING)]) coll.create_index([("study", ASCENDING), ("title", ASCENDING)]) coll.create_index([("actual_date", ASCENDING)]) upserts = 0 for row in rows: doc = build_document(row) coll.replace_one({"_id": doc["_id"]}, doc, upsert=True) upserts += 1 print(f" Upsert {upserts} dokumentu do {MONGO_DB}.{MONGO_COLL}") # stats print("\n Statistika v Mongo:") for r in coll.aggregate([ {"$group": {"_id": {"study": "$study", "title": "$title"}, "count": {"$sum": 1}}}, {"$sort": {"_id.study": 1, "_id.title": 1}}, ]): print(f" {r['_id']['study']} | {r['_id']['title']:30s} | {r['count']}") if __name__ == "__main__": studies = sys.argv[1:] if len(sys.argv) > 1 else None main(studies)