111 lines
3.3 KiB
Python
111 lines
3.3 KiB
Python
"""
|
|
notification_parsers.py — parsers for IWRS notification texts.

Version: 1.0 | Date: 2026-06-10

Split out of the former Patients/parse_notifications_to_mongo.py (legacy
MySQL→Mongo script) — only the pure parsing functions used by
import_notifications_to_mongo.py remain. No dependency on MySQL.
|
|
"""
|
|
|
|
import re
|
|
import datetime
|
|
|
|
|
|
# A line that *starts* with a time or a "DD-Mon-YYYY HH:MM[:SS]" timestamp
# contains colons but is a value, not a "Key: Value" pair.
_LEADING_TIMESTAMP_RE = re.compile(
    r"^(?:\d{1,2}-[A-Za-z]{3}-\d{4}\s+)?\d{1,2}:\d{2}(?::\d{2})?(?!\d)"
)


def parse_kv_lines(text):
    """Collect every 'Key: Value' line of *text* into a dict.

    When the value after the colon is empty, the first following non-empty
    line is used as the value, unless that line is itself a 'Key: Value'
    pair, in which case the empty key is dropped.

    Lines that begin with a timestamp (e.g. ``10-Jun-2026 14:05:09``) are
    treated as plain values even though they contain colons; otherwise a
    timestamp following an empty key would be split into a bogus pair.

    Args:
        text: Raw notification text. ``None`` or an empty string yields ``{}``.

    Returns:
        dict mapping key to value (both stripped). The first occurrence of a
        key wins. Lines starting with "http" and keys longer than eight
        words are ignored.
    """
    if not text:
        return {}

    out = {}
    pending_key = None  # key seen with an empty value, still waiting for one
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            # Blank lines carry no data; a pending key keeps waiting.
            continue

        is_pair = ":" in line and not _LEADING_TIMESTAMP_RE.match(line)

        if pending_key is not None:
            waiting_key, pending_key = pending_key, None
            if not is_pair:
                out.setdefault(waiting_key, line)
                continue
            # The line is its own 'Key: Value' pair: the waiting key is
            # dropped and the line is processed normally below.

        if not is_pair:
            continue
        if line.lower().startswith("http"):
            # URLs contain ':' but are not key/value data.
            continue

        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip()
        # A very long "key" is almost certainly a sentence containing a colon.
        if not key or (" " in key and len(key.split()) > 8):
            continue
        if not val:
            pending_key = key
            continue
        out.setdefault(key, val)
    return out
|
|
|
|
|
|
# "DD-Mon-YYYY" with a title-cased three-letter month, e.g. "10-Jun-2026".
DATE_RE = re.compile(r"^\d{2}-[A-Z][a-z]{2}-\d{4}$")
# The same date followed by "HH:MM:SS"; group 1 is the date, group 2 the time.
DATETIME_RE = re.compile(r"^(\d{2}-[A-Z][a-z]{2}-\d{4})\s+(\d{2}:\d{2}:\d{2})$")

# English month abbreviations, resolved here rather than through strptime's
# "%b", which follows the process locale and would reject "Jun" under a
# non-English LC_TIME setting.
_MONTH_NUMBERS = {
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
    "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}


def _build_datetime(date_part, time_part="00:00:00"):
    """Build a datetime from 'DD-Mon-YYYY' and 'HH:MM:SS'; None if invalid."""
    day, month_abbr, year = date_part.split("-")
    month = _MONTH_NUMBERS.get(month_abbr)
    if month is None:
        return None
    hour, minute, second = (int(part) for part in time_part.split(":"))
    try:
        return datetime.datetime(int(year), month, int(day), hour, minute, second)
    except ValueError:
        # Well-formed but impossible value, e.g. 31-Feb or hour 24.
        return None


def to_date(s):
    """Parse a 'DD-Mon-YYYY' string into a ``datetime.datetime`` at midnight.

    Surrounding whitespace is ignored. Returns ``None`` for empty input, for
    text that does not match ``DATE_RE``, and for impossible dates.
    """
    if not s:
        return None
    s = s.strip()
    if not DATE_RE.match(s):
        return None
    return _build_datetime(s)


def to_datetime(s):
    """Parse a 'DD-Mon-YYYY HH:MM:SS' string into a ``datetime.datetime``.

    Any run of whitespace (including newlines) between the date and the time
    is accepted. Returns ``None`` for empty input, for text that does not
    match ``DATETIME_RE``, and for impossible dates or times.
    """
    if not s:
        return None
    s = re.sub(r"\s+", " ", s.strip())
    m = DATETIME_RE.match(s)
    if m is None:
        return None
    return _build_datetime(m.group(1), m.group(2))
|
|
|
|
|
|
# Pattern for one medication row, field by field:
#   no   - exactly seven digits
#   type - starts with a letter; lazy, so it ends as soon as the remaining
#          fields can match
#   lot  - 5 to 10 upper-case letters or digits
#   exp  - date written as DD-Mon-YYYY
# Fields may be separated by whitespace or by nothing at all.
_MED_ROW_PATTERN = "".join((
    r"(?P<no>\d{7})\s*[\s\n]*",
    r"(?P<type>[A-Za-z][A-Za-z0-9 /+\-]+?)\s*[\s\n]*",
    r"(?P<lot>[A-Z0-9]{5,10})\s*[\s\n]*",
    r"(?P<exp>\d{2}-[A-Z][a-z]{2}-\d{4})",
))
MED_ROW_RE = re.compile(_MED_ROW_PATTERN)
|
|
|
|
|
|
# Trailing header words that can leak into the captured medication type.
_MED_TYPE_TAIL_RE = re.compile(r"\s+(Packaged|Lot|Expiration|No|Date|Medication).*$")


def parse_medication_table(text):
    """Find medication records (number, type, lot, expiration) in *text*.

    Whitespace is collapsed before matching, so both layouts named by the
    original author are handled: UCO3001 (multi-line) and MDD3003
    (concatenated).

    Args:
        text: Raw notification text. ``None`` or an empty string yields ``[]``.

    Returns:
        List of dicts with keys ``medication_no``, ``medication_type``,
        ``lot_no`` and ``expiration_date`` (the result of ``to_date``), in
        order of first appearance. Rows repeating an already seen
        (medication_no, lot_no) pair are dropped.
    """
    if not text:
        return []

    # Collapse all whitespace so one regex covers every layout.
    compact = re.sub(r"\s+", " ", text)

    seen = set()
    unique = []
    for m in MED_ROW_RE.finditer(compact):
        identity = (m.group("no"), m.group("lot"))
        if identity in seen:
            continue
        seen.add(identity)
        # Cut off surplus trailing fragments caught by the lazy type group.
        med_type = _MED_TYPE_TAIL_RE.sub("", m.group("type").strip()).strip()
        unique.append({
            "medication_no": m.group("no"),
            "medication_type": med_type,
            "lot_no": m.group("lot"),
            "expiration_date": to_date(m.group("exp")),
        })
    return unique
|