diff --git a/DASTA/nahraj_do_postgres.py b/DASTA/nahraj_do_postgres.py new file mode 100644 index 0000000..d3b812c --- /dev/null +++ b/DASTA/nahraj_do_postgres.py @@ -0,0 +1,408 @@ +# -*- coding: utf-8 -*- +""" +Načte DASTA XML soubory a uloží je do PostgreSQL databáze `ordinace` +do tabulek s prefixem `dasta_`. + +Vše čistě v Pythonu přes psycopg (v3). Skript je IDEMPOTENTNÍ: + - databázi `ordinace` založí, jen pokud neexistuje + - tabulky vytvoří přes CREATE TABLE IF NOT EXISTS + - každý soubor se nahrává podle klíče = název souboru (bez přípony); + při opakovaném běhu se zpráva UPSERTne a její výsledky/diagnózy + se smažou a vloží znovu → výsledek je vždy stejný, žádné duplicity. + +Připojení se bere z Medevio/.env (PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB). + +Použití: + python nahraj_do_postgres.py # zdroj = U:\\DASTA (výchozí) + python nahraj_do_postgres.py D:\\jine\\dasta # jiný zdrojový adresář + python nahraj_do_postgres.py U:\\DASTA --limit 50 # jen prvních 50 (test) + python nahraj_do_postgres.py --recreate # zahodí dasta_ tabulky a založí znovu + +Tabulky: + dasta_pacient (rodne_cislo PK) + dasta_zprava (soubor PK) → pacient + dasta_vysledek (id PK) → zprava [jednotlivé analyty] + dasta_diagnoza (id PK) → zprava +""" +from __future__ import annotations + +import os +import re +import sys +from datetime import date, datetime +from pathlib import Path +from xml.etree import ElementTree as ET + +import psycopg + +ZDROJ_VYCHOZI = Path(r"U:\DASTA") + + +# --------------------------------------------------------------------------- +# .env +# --------------------------------------------------------------------------- +def _load_env() -> None: + env_path = Path(__file__).resolve().parent.parent / "Medevio" / ".env" + if env_path.exists(): + for line in env_path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if "=" in line and not line.startswith("#"): + k, v = line.split("=", 1) + os.environ[k.strip()] = v.strip() + + +_load_env() + +PG = dict( + host=os.environ.get("PG_HOST", "localhost"), + port=os.environ.get("PG_PORT", "5432"), + user=os.environ.get("PG_USER"), + password=os.environ.get("PG_PASSWORD"), +) +PG_DB = os.environ.get("PG_DB", "ordinace") + + +# --------------------------------------------------------------------------- +# Konverze hodnot +# --------------------------------------------------------------------------- +def num(s: str | None) -> float | None: + """Český zápis čísla ('7,5') → float. Nečíselné vrací None.""" + if s is None: + return None + t = s.strip().replace("\xa0", "").replace(" ", "").replace(",", ".") + try: + return float(t) + except ValueError: + return None + + +def ts(s: str | None) -> datetime | None: + """'2016-06-20T11:15:18' nebo '2017-05-18T07:30' → datetime.""" + if not s: + return None + try: + return datetime.fromisoformat(s.strip()) + except ValueError: + return None + + +def dat(s: str | None) -> date | None: + if not s: + return None + try: + return date.fromisoformat(s.strip()[:10]) + except ValueError: + return None + + +def _text(el, tag): + if el is None: + return None + c = el.find(tag) + return c.text.strip() if (c is not None and c.text) else None + + +# --------------------------------------------------------------------------- +# Schéma +# --------------------------------------------------------------------------- +DDL = """ +CREATE TABLE IF NOT EXISTS dasta_pacient ( + rodne_cislo text PRIMARY KEY, + jmeno text, + prijmeni text, + dat_narozeni date, + sex text +); + +CREATE TABLE IF NOT EXISTS dasta_zprava ( + soubor text PRIMARY KEY, + id_soubor text, + ozn_soub text, + dat_vytvoreni timestamp, + verze_ds text, + typ_odesm text, + zdroj_prog text, + zdroj_verze text, + odesilatel_icp text, + odesilatel_ico text, + odesilatel_nazev text, + prijemce_icp text, + prijemce_nazev text, + rodne_cislo text REFERENCES dasta_pacient(rodne_cislo) +); + +CREATE TABLE IF NOT EXISTS dasta_vysledek ( + id bigserial PRIMARY KEY, + soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE, + klic_nclp text, + nazev text, + jednotka text, + hodnota_raw text, + hodnota_num double precision, + dat_odber timestamp, + dat_odber_typ text, + dat_vydani timestamp, + autor text, + stav text, + typ_kvant text, + ref_low double precision, + ref_high double precision, + mimo_normu smallint +); + +CREATE TABLE IF NOT EXISTS dasta_diagnoza ( + id bigserial PRIMARY KEY, + soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE, + poradi int, + kod text +); + +CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_soubor ON dasta_vysledek(soubor); +CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_nclp ON dasta_vysledek(klic_nclp); +CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_odber ON dasta_vysledek(dat_odber); +CREATE INDEX IF NOT EXISTS ix_dasta_zprava_rc ON dasta_zprava(rodne_cislo); +""" + +DROP = """ +DROP TABLE IF EXISTS dasta_vysledek CASCADE; +DROP TABLE IF EXISTS dasta_diagnoza CASCADE; +DROP TABLE IF EXISTS dasta_zprava CASCADE; +DROP TABLE IF EXISTS dasta_pacient CASCADE; +""" + + +def ensure_database() -> None: + """Založí DB `ordinace`, pokud neexistuje (mimo transakci).""" + with psycopg.connect(dbname="postgres", autocommit=True, connect_timeout=10, **PG) as c: + exists = c.execute( + "SELECT 1 FROM pg_database WHERE datname = %s", (PG_DB,) + ).fetchone() + if not exists: + # TEMPLATE template0 obchází collation version mismatch u template1 + c.execute(f'CREATE DATABASE "{PG_DB}" TEMPLATE template0') + print(f"Databáze {PG_DB} vytvořena.") + else: + print(f"Databáze {PG_DB} už existuje.") + + +# --------------------------------------------------------------------------- +# Parsování jednoho souboru → (pacient, zprava, vysledky, diagnozy) +# --------------------------------------------------------------------------- +_RE_ENC = re.compile(r"encoding=['\"][^'\"]+['\"]", re.I) + + +def _nacti_root(raw: bytes): + """Naparsuje XML; když selže (špatně deklarované kódování), zkusí UTF-8.""" + try: + return ET.fromstring(raw) + except ET.ParseError: + # Některé soubory deklarují Windows-1250, ale jsou v UTF-8. + text = raw.decode("utf-8", errors="replace") + text = _RE_ENC.sub("", text, count=1) # odstraň chybnou deklaraci + return ET.fromstring(text) + + +def parse_file(cesta: Path): + root = _nacti_root(cesta.read_bytes()) + soubor = cesta.stem + + zdroj = root.find("zdroj_is") + pm = root.find("pm") + is_el = root.find("is") + pm_a = pm.find("a") if pm is not None else None + is_a = is_el.find("a") if is_el is not None else None + ip = is_el.find("ip") if is_el is not None else None + + # pacient + rodne_cislo = _text(ip, "rodcis") if ip is not None else None + pacient = None + if rodne_cislo: + pacient = ( + rodne_cislo, + _text(ip, "jmeno"), + _text(ip, "prijmeni"), + dat(_text(ip, "dat_dn")), + _text(ip, "sex"), + ) + + # pojišťovna se sem nedává (lze doplnit), držíme se zadaného rozsahu + + zprava = ( + soubor, + root.get("id_soubor"), + root.get("ozn_soub"), + ts(root.get("dat_vb")), + root.get("verze_ds"), + root.get("typ_odesm"), + zdroj.get("kod_prog") if zdroj is not None else None, + zdroj.get("verze_prog") if zdroj is not None else None, + is_el.get("icp") if is_el is not None else None, + is_el.get("ico") if is_el is not None else None, + _text(is_a, "jmeno") if is_a is not None else None, + pm.get("icp") if pm is not None else None, + _text(pm_a, "jmeno") if pm_a is not None else None, + rodne_cislo, + ) + + vysledky = [] + diagnozy = [] + if ip is not None: + dg = ip.find("dg") + if dg is not None: + for i, diag in enumerate(dg.iter("diag"), 1): + if diag.text: + diagnozy.append((soubor, i, diag.text.strip())) + + v = ip.find("v") + if v is not None: + for vr in v.findall("vr"): + vrn = vr.find("vrn") + nazvy = vrn.find("nazvy") if vrn is not None else None + skala = vrn.find("skala") if vrn is not None else None + dat_du = vr.find("dat_du") + + ref_low = ref_high = None + if skala is not None: + ref_low = num(_text(skala, "s4")) + ref_high = num(_text(skala, "s5")) + + hodnota_raw = _text(vrn, "hodnota") if vrn is not None else None + hodnota_num = num(hodnota_raw) + + mimo = None + if hodnota_num is not None and (ref_low is not None or ref_high is not None): + mimo = 0 + if ref_low is not None and hodnota_num < ref_low: + mimo = 1 + if ref_high is not None and hodnota_num > ref_high: + mimo = 1 + + vysledky.append(( + soubor, + vr.get("klic_nclp"), + _text(vr, "nazev_lclp"), + nazvy.get("jednotka") if nazvy is not None else None, + hodnota_raw, + hodnota_num, + ts(dat_du.text if dat_du is not None else None), + dat_du.get("typ") if dat_du is not None else None, + ts(_text(vr, "dat_vv")), + _text(vr, "autor"), + vr.get("stav_vys"), + vrn.get("priznak_kvant") if vrn is not None else None, + ref_low, + ref_high, + mimo, + )) + + return pacient, zprava, vysledky, diagnozy + + +# --------------------------------------------------------------------------- +# Zápis (idempotentní) +# --------------------------------------------------------------------------- +UPSERT_PACIENT = """ +INSERT INTO dasta_pacient (rodne_cislo, jmeno, prijmeni, dat_narozeni, sex) +VALUES (%s,%s,%s,%s,%s) +ON CONFLICT (rodne_cislo) DO UPDATE SET + jmeno=EXCLUDED.jmeno, prijmeni=EXCLUDED.prijmeni, + dat_narozeni=EXCLUDED.dat_narozeni, sex=EXCLUDED.sex; +""" + +UPSERT_ZPRAVA = """ +INSERT INTO dasta_zprava (soubor,id_soubor,ozn_soub,dat_vytvoreni,verze_ds,typ_odesm, + zdroj_prog,zdroj_verze,odesilatel_icp,odesilatel_ico,odesilatel_nazev, + prijemce_icp,prijemce_nazev,rodne_cislo) +VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) +ON CONFLICT (soubor) DO UPDATE SET + id_soubor=EXCLUDED.id_soubor, ozn_soub=EXCLUDED.ozn_soub, + dat_vytvoreni=EXCLUDED.dat_vytvoreni, verze_ds=EXCLUDED.verze_ds, + typ_odesm=EXCLUDED.typ_odesm, zdroj_prog=EXCLUDED.zdroj_prog, + zdroj_verze=EXCLUDED.zdroj_verze, odesilatel_icp=EXCLUDED.odesilatel_icp, + odesilatel_ico=EXCLUDED.odesilatel_ico, odesilatel_nazev=EXCLUDED.odesilatel_nazev, + prijemce_icp=EXCLUDED.prijemce_icp, prijemce_nazev=EXCLUDED.prijemce_nazev, + rodne_cislo=EXCLUDED.rodne_cislo; +""" + +INS_VYSLEDEK = """ +INSERT INTO dasta_vysledek (soubor,klic_nclp,nazev,jednotka,hodnota_raw,hodnota_num, + dat_odber,dat_odber_typ,dat_vydani,autor,stav,typ_kvant,ref_low,ref_high,mimo_normu) +VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s); +""" + +INS_DIAGNOZA = "INSERT INTO dasta_diagnoza (soubor,poradi,kod) VALUES (%s,%s,%s);" + + +def main() -> None: + args = sys.argv[1:] + recreate = "--recreate" in args + limit = None + if "--limit" in args: + limit = int(args[args.index("--limit") + 1]) + pozicni = [a for a in args if not a.startswith("--")] + # odfiltruj hodnotu za --limit + if limit is not None and pozicni and pozicni[0] == str(limit): + pozicni = pozicni[1:] + zdroj = Path(pozicni[0]) if pozicni else ZDROJ_VYCHOZI + + print(f"Zdroj: {zdroj}") + print(f"Cíl: postgresql://{PG['host']}:{PG['port']}/{PG_DB} (tabulky dasta_*)") + + ensure_database() + + soubory = sorted(zdroj.glob("*.xml")) + if limit: + soubory = soubory[:limit] + print(f"Souborů ke zpracování: {len(soubory)}") + print("-" * 60) + + ok = chyb = 0 + chyby = [] + + # autocommit=True → každý soubor je samostatná transakce (conn.transaction), + # takže chyba u jednoho souboru nikdy neovlivní ostatní. + with psycopg.connect(dbname=PG_DB, autocommit=True, connect_timeout=10, **PG) as conn: + with conn.cursor() as cur: + if recreate: + cur.execute(DROP) + print("Tabulky dasta_* zahozeny.") + cur.execute(DDL) + + cur = conn.cursor() + for i, src in enumerate(soubory, 1): + try: + pacient, zprava, vysledky, diagnozy = parse_file(src) + with conn.transaction(): # savepoint pro tento soubor + if pacient: + cur.execute(UPSERT_PACIENT, pacient) + cur.execute(UPSERT_ZPRAVA, zprava) + cur.execute("DELETE FROM dasta_vysledek WHERE soubor=%s", (src.stem,)) + cur.execute("DELETE FROM dasta_diagnoza WHERE soubor=%s", (src.stem,)) + if vysledky: + cur.executemany(INS_VYSLEDEK, vysledky) + if diagnozy: + cur.executemany(INS_DIAGNOZA, diagnozy) + ok += 1 + except Exception as e: + chyb += 1 + chyby.append(f"{src.name}: {type(e).__name__} {e}") + continue + if i % 500 == 0: + print(f" ... {i}/{len(soubory)}") + + print("-" * 60) + print(f"Hotovo. Zpráv OK: {ok} Chyb: {chyb}") + if chyby: + print("Chyby:") + for c in chyby[:20]: + print(" " + c) + + # Souhrn + with psycopg.connect(dbname=PG_DB, **PG) as conn: + for t in ("dasta_pacient", "dasta_zprava", "dasta_vysledek", "dasta_diagnoza"): + n = conn.execute(f"SELECT count(*) FROM {t}").fetchone()[0] + print(f" {t:18}: {n}") + + +if __name__ == "__main__": + main() diff --git a/DASTA/parse_dasta.py b/DASTA/parse_dasta.py new file mode 100644 index 0000000..418eac6 --- /dev/null +++ b/DASTA/parse_dasta.py @@ -0,0 +1,260 @@ +# -*- coding: utf-8 -*- +""" +Parser DASTA XML (Datový standard MZ ČR, verze DS 03.01.01). + +Rozebírá laboratorní zprávy (typ odesílatele LB) do strukturovaných dat. +Soubory jsou v kódování windows-1250 (uvedeno v XML deklaraci) a obsahují +DOCTYPE odkaz na lokální DTD (ds030101.dtd), který při parsování ignorujeme. + +Struktura DASTA dávky (zjednodušeně): + dasta kořen, hlavička dávky (datum, verze, odesílatel) + zdroj_is informační systém, který dávku vytvořil + pm (icp) příjemce zprávy (ordinace) + adresa (a typ="P") + is (icp, ico) odesílatel = laboratoř + adresa (a typ="O") + ip (id_pac) pacient: rodné číslo, jméno, dat. narození, sex + pv / p pojišťovna (kodpoj, typpoj) + dg / dgz / diag diagnózy + v blok výsledků + vr (klic_nclp...) jeden laboratorní výsledek (analyt) + nazev_lclp název položky (WBC, RBC, ...) + dat_du/dat_pl/dat_vv odběr / příjem / vydání výsledku + autor validující lékař + vrn číselný výsledek + hodnota naměřená hodnota + nazvy@jednotka měrná jednotka + skala s1..s8 referenční pásma (meze) + interpret_g_z grafická interpretace | | * | | +""" +from __future__ import annotations + +import sys +from dataclasses import dataclass, field, asdict +from pathlib import Path +from xml.etree import ElementTree as ET + + +# --------------------------------------------------------------------------- +# Datové třídy +# --------------------------------------------------------------------------- +@dataclass +class Vysledek: + klic_nclp: str # kód NČLP (Národní číselník laboratorních položek) + nazev: str # lokální název položky (WBC, RBC, ...) + hodnota: str | None + jednotka: str | None + dat_odber: str | None # dat_du – datum a čas odběru + dat_vydani: str | None # dat_vv – datum a čas vydání výsledku + autor: str | None + stav: str | None # stav_vys (A = definitivní) + typ_kvant: str | None # priznak_kvant (R = reálné číslo) + interpret: str | None # grafická interpretace mezí + meze: list[str] = field(default_factory=list) # s1..s8 + + @property + def referencni_mez(self) -> str | None: + """Klinicky relevantní referenční rozmezí = s4 (dolní) až s5 (horní).""" + if len(self.meze) >= 5: + return f"{self.meze[3]} – {self.meze[4]}" + return None + + +@dataclass +class Pacient: + id_pac: str | None + rodne_cislo: str | None + jmeno: str | None + prijmeni: str | None + datum_narozeni: str | None + sex: str | None + pojistovna: str | None + typ_pojisteni: str | None + diagnozy: list[str] = field(default_factory=list) + vysledky: list[Vysledek] = field(default_factory=list) + + +@dataclass +class Adresa: + jmeno: str | None = None + radky: list[str] = field(default_factory=list) + psc: str | None = None + mesto: str | None = None + + +@dataclass +class DastaZprava: + ozn_soub: str | None + id_soubor: str | None + datum_vytvoreni: str | None + verze_ds: str | None + typ_odesilatele: str | None + zdroj_program: str | None + zdroj_verze: str | None + prijemce_icp: str | None + prijemce: Adresa | None + odesilatel_icp: str | None + odesilatel_ico: str | None + odesilatel: Adresa | None + pacienti: list[Pacient] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Pomocné funkce +# --------------------------------------------------------------------------- +def _text(el, tag: str) -> str | None: + """Vrátí text potomka `tag` nebo None.""" + if el is None: + return None + child = el.find(tag) + return child.text.strip() if (child is not None and child.text) else None + + +def _parse_adresa(el) -> Adresa | None: + if el is None: + return None + a = el.find("a") + if a is None: + return None + radky = [v for v in (_text(a, "adr"), _text(a, "dop1"), _text(a, "dop2")) if v] + return Adresa( + jmeno=_text(a, "jmeno"), + radky=radky, + psc=_text(a, "psc"), + mesto=_text(a, "mesto"), + ) + + +# --------------------------------------------------------------------------- +# Hlavní parser +# --------------------------------------------------------------------------- +def parse_dasta(cesta: str | Path) -> DastaZprava: + cesta = Path(cesta) + # XML obsahuje DOCTYPE s odkazem na DTD; vypneme načítání externích entit + # tím, že parsujeme bez resolveru. ElementTree DTD ignoruje automaticky. + raw = cesta.read_bytes() + # ElementTree si kódování přečte z XML deklarace (). + root = ET.fromstring(raw) + + zdroj = root.find("zdroj_is") + pm = root.find("pm") + is_el = root.find("is") + + zprava = DastaZprava( + ozn_soub=root.get("ozn_soub"), + id_soubor=root.get("id_soubor"), + datum_vytvoreni=root.get("dat_vb"), + verze_ds=root.get("verze_ds"), + typ_odesilatele=root.get("typ_odesm"), + zdroj_program=zdroj.get("kod_prog") if zdroj is not None else None, + zdroj_verze=zdroj.get("verze_prog") if zdroj is not None else None, + prijemce_icp=pm.get("icp") if pm is not None else None, + prijemce=_parse_adresa(pm), + odesilatel_icp=is_el.get("icp") if is_el is not None else None, + odesilatel_ico=is_el.get("ico") if is_el is not None else None, + odesilatel=_parse_adresa(is_el), + ) + + if is_el is None: + return zprava + + for ip in is_el.findall("ip"): + # pojišťovna – bere se z elementu
(přímo nebo uvnitř