# -*- coding: utf-8 -*- """ Načte DASTA XML soubory a uloží je do PostgreSQL databáze `ordinace` do tabulek s prefixem `dasta_`. Vše čistě v Pythonu přes psycopg (v3). Skript je IDEMPOTENTNÍ: - databázi `ordinace` založí, jen pokud neexistuje - tabulky vytvoří přes CREATE TABLE IF NOT EXISTS - každý soubor se nahrává podle klíče = název souboru (bez přípony); při opakovaném běhu se zpráva UPSERTne a její výsledky/diagnózy se smažou a vloží znovu → výsledek je vždy stejný, žádné duplicity. Připojení se bere z Medevio/.env (PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB). Použití: python nahraj_do_postgres.py # zdroj = U:\\DASTA (výchozí) python nahraj_do_postgres.py D:\\jine\\dasta # jiný zdrojový adresář python nahraj_do_postgres.py U:\\DASTA --limit 50 # jen prvních 50 (test) python nahraj_do_postgres.py --recreate # zahodí dasta_ tabulky a založí znovu Tabulky: dasta_pacient (rodne_cislo PK) dasta_zprava (soubor PK) → pacient dasta_vysledek (id PK) → zprava [jednotlivé analyty] dasta_diagnoza (id PK) → zprava """ from __future__ import annotations import os import re import sys from datetime import date, datetime from pathlib import Path from xml.etree import ElementTree as ET import psycopg ZDROJ_VYCHOZI = Path(r"U:\DASTA") # --------------------------------------------------------------------------- # .env # --------------------------------------------------------------------------- def _load_env() -> None: env_path = Path(__file__).resolve().parent.parent / "Medevio" / ".env" if env_path.exists(): for line in env_path.read_text(encoding="utf-8").splitlines(): line = line.strip() if "=" in line and not line.startswith("#"): k, v = line.split("=", 1) os.environ[k.strip()] = v.strip() _load_env() PG = dict( host=os.environ.get("PG_HOST", "localhost"), port=os.environ.get("PG_PORT", "5432"), user=os.environ.get("PG_USER"), password=os.environ.get("PG_PASSWORD"), ) PG_DB = os.environ.get("PG_DB", "ordinace") # --------------------------------------------------------------------------- # Konverze hodnot # --------------------------------------------------------------------------- def num(s: str | None) -> float | None: """Český zápis čísla ('7,5') → float. Nečíselné vrací None.""" if s is None: return None t = s.strip().replace("\xa0", "").replace(" ", "").replace(",", ".") try: return float(t) except ValueError: return None def ts(s: str | None) -> datetime | None: """'2016-06-20T11:15:18' nebo '2017-05-18T07:30' → datetime.""" if not s: return None try: return datetime.fromisoformat(s.strip()) except ValueError: return None def dat(s: str | None) -> date | None: if not s: return None try: return date.fromisoformat(s.strip()[:10]) except ValueError: return None def _text(el, tag): if el is None: return None c = el.find(tag) return c.text.strip() if (c is not None and c.text) else None # --------------------------------------------------------------------------- # Schéma # --------------------------------------------------------------------------- DDL = """ CREATE TABLE IF NOT EXISTS dasta_pacient ( rodne_cislo text PRIMARY KEY, jmeno text, prijmeni text, dat_narozeni date, sex text ); CREATE TABLE IF NOT EXISTS dasta_zprava ( soubor text PRIMARY KEY, id_soubor text, ozn_soub text, dat_vytvoreni timestamp, verze_ds text, typ_odesm text, zdroj_prog text, zdroj_verze text, odesilatel_icp text, odesilatel_ico text, odesilatel_nazev text, prijemce_icp text, prijemce_nazev text, rodne_cislo text REFERENCES dasta_pacient(rodne_cislo) ); CREATE TABLE IF NOT EXISTS dasta_vysledek ( id bigserial PRIMARY KEY, soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE, klic_nclp text, nazev text, jednotka text, hodnota_raw text, hodnota_num double precision, dat_odber timestamp, dat_odber_typ text, dat_vydani timestamp, autor text, stav text, typ_kvant text, ref_low double precision, ref_high double precision, mimo_normu smallint ); CREATE TABLE IF NOT EXISTS dasta_diagnoza ( id bigserial PRIMARY KEY, soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE, poradi int, kod text ); CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_soubor ON dasta_vysledek(soubor); CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_nclp ON dasta_vysledek(klic_nclp); CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_odber ON dasta_vysledek(dat_odber); CREATE INDEX IF NOT EXISTS ix_dasta_zprava_rc ON dasta_zprava(rodne_cislo); """ DROP = """ DROP TABLE IF EXISTS dasta_vysledek CASCADE; DROP TABLE IF EXISTS dasta_diagnoza CASCADE; DROP TABLE IF EXISTS dasta_zprava CASCADE; DROP TABLE IF EXISTS dasta_pacient CASCADE; """ def ensure_database() -> None: """Založí DB `ordinace`, pokud neexistuje (mimo transakci).""" with psycopg.connect(dbname="postgres", autocommit=True, connect_timeout=10, **PG) as c: exists = c.execute( "SELECT 1 FROM pg_database WHERE datname = %s", (PG_DB,) ).fetchone() if not exists: # TEMPLATE template0 obchází collation version mismatch u template1 c.execute(f'CREATE DATABASE "{PG_DB}" TEMPLATE template0') print(f"Databáze {PG_DB} vytvořena.") else: print(f"Databáze {PG_DB} už existuje.") # --------------------------------------------------------------------------- # Parsování jednoho souboru → (pacient, zprava, vysledky, diagnozy) # --------------------------------------------------------------------------- _RE_ENC = re.compile(r"encoding=['\"][^'\"]+['\"]", re.I) def _nacti_root(raw: bytes): """Naparsuje XML; když selže (špatně deklarované kódování), zkusí UTF-8.""" try: return ET.fromstring(raw) except ET.ParseError: # Některé soubory deklarují Windows-1250, ale jsou v UTF-8. text = raw.decode("utf-8", errors="replace") text = _RE_ENC.sub("", text, count=1) # odstraň chybnou deklaraci return ET.fromstring(text) def parse_file(cesta: Path): root = _nacti_root(cesta.read_bytes()) soubor = cesta.stem zdroj = root.find("zdroj_is") pm = root.find("pm") is_el = root.find("is") pm_a = pm.find("a") if pm is not None else None is_a = is_el.find("a") if is_el is not None else None ip = is_el.find("ip") if is_el is not None else None # pacient rodne_cislo = _text(ip, "rodcis") if ip is not None else None pacient = None if rodne_cislo: pacient = ( rodne_cislo, _text(ip, "jmeno"), _text(ip, "prijmeni"), dat(_text(ip, "dat_dn")), _text(ip, "sex"), ) # pojišťovna se sem nedává (lze doplnit), držíme se zadaného rozsahu zprava = ( soubor, root.get("id_soubor"), root.get("ozn_soub"), ts(root.get("dat_vb")), root.get("verze_ds"), root.get("typ_odesm"), zdroj.get("kod_prog") if zdroj is not None else None, zdroj.get("verze_prog") if zdroj is not None else None, is_el.get("icp") if is_el is not None else None, is_el.get("ico") if is_el is not None else None, _text(is_a, "jmeno") if is_a is not None else None, pm.get("icp") if pm is not None else None, _text(pm_a, "jmeno") if pm_a is not None else None, rodne_cislo, ) vysledky = [] diagnozy = [] if ip is not None: dg = ip.find("dg") if dg is not None: for i, diag in enumerate(dg.iter("diag"), 1): if diag.text: diagnozy.append((soubor, i, diag.text.strip())) v = ip.find("v") if v is not None: for vr in v.findall("vr"): vrn = vr.find("vrn") nazvy = vrn.find("nazvy") if vrn is not None else None skala = vrn.find("skala") if vrn is not None else None dat_du = vr.find("dat_du") ref_low = ref_high = None if skala is not None: ref_low = num(_text(skala, "s4")) ref_high = num(_text(skala, "s5")) hodnota_raw = _text(vrn, "hodnota") if vrn is not None else None hodnota_num = num(hodnota_raw) mimo = None if hodnota_num is not None and (ref_low is not None or ref_high is not None): mimo = 0 if ref_low is not None and hodnota_num < ref_low: mimo = 1 if ref_high is not None and hodnota_num > ref_high: mimo = 1 vysledky.append(( soubor, vr.get("klic_nclp"), _text(vr, "nazev_lclp"), nazvy.get("jednotka") if nazvy is not None else None, hodnota_raw, hodnota_num, ts(dat_du.text if dat_du is not None else None), dat_du.get("typ") if dat_du is not None else None, ts(_text(vr, "dat_vv")), _text(vr, "autor"), vr.get("stav_vys"), vrn.get("priznak_kvant") if vrn is not None else None, ref_low, ref_high, mimo, )) return pacient, zprava, vysledky, diagnozy # --------------------------------------------------------------------------- # Zápis (idempotentní) # --------------------------------------------------------------------------- UPSERT_PACIENT = """ INSERT INTO dasta_pacient (rodne_cislo, jmeno, prijmeni, dat_narozeni, sex) VALUES (%s,%s,%s,%s,%s) ON CONFLICT (rodne_cislo) DO UPDATE SET jmeno=EXCLUDED.jmeno, prijmeni=EXCLUDED.prijmeni, dat_narozeni=EXCLUDED.dat_narozeni, sex=EXCLUDED.sex; """ UPSERT_ZPRAVA = """ INSERT INTO dasta_zprava (soubor,id_soubor,ozn_soub,dat_vytvoreni,verze_ds,typ_odesm, zdroj_prog,zdroj_verze,odesilatel_icp,odesilatel_ico,odesilatel_nazev, prijemce_icp,prijemce_nazev,rodne_cislo) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (soubor) DO UPDATE SET id_soubor=EXCLUDED.id_soubor, ozn_soub=EXCLUDED.ozn_soub, dat_vytvoreni=EXCLUDED.dat_vytvoreni, verze_ds=EXCLUDED.verze_ds, typ_odesm=EXCLUDED.typ_odesm, zdroj_prog=EXCLUDED.zdroj_prog, zdroj_verze=EXCLUDED.zdroj_verze, odesilatel_icp=EXCLUDED.odesilatel_icp, odesilatel_ico=EXCLUDED.odesilatel_ico, odesilatel_nazev=EXCLUDED.odesilatel_nazev, prijemce_icp=EXCLUDED.prijemce_icp, prijemce_nazev=EXCLUDED.prijemce_nazev, rodne_cislo=EXCLUDED.rodne_cislo; """ INS_VYSLEDEK = """ INSERT INTO dasta_vysledek (soubor,klic_nclp,nazev,jednotka,hodnota_raw,hodnota_num, dat_odber,dat_odber_typ,dat_vydani,autor,stav,typ_kvant,ref_low,ref_high,mimo_normu) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s); """ INS_DIAGNOZA = "INSERT INTO dasta_diagnoza (soubor,poradi,kod) VALUES (%s,%s,%s);" def main() -> None: args = sys.argv[1:] recreate = "--recreate" in args limit = None if "--limit" in args: limit = int(args[args.index("--limit") + 1]) pozicni = [a for a in args if not a.startswith("--")] # odfiltruj hodnotu za --limit if limit is not None and pozicni and pozicni[0] == str(limit): pozicni = pozicni[1:] zdroj = Path(pozicni[0]) if pozicni else ZDROJ_VYCHOZI print(f"Zdroj: {zdroj}") print(f"Cíl: postgresql://{PG['host']}:{PG['port']}/{PG_DB} (tabulky dasta_*)") ensure_database() soubory = sorted(zdroj.glob("*.xml")) if limit: soubory = soubory[:limit] print(f"Souborů ke zpracování: {len(soubory)}") print("-" * 60) ok = chyb = 0 chyby = [] # autocommit=True → každý soubor je samostatná transakce (conn.transaction), # takže chyba u jednoho souboru nikdy neovlivní ostatní. with psycopg.connect(dbname=PG_DB, autocommit=True, connect_timeout=10, **PG) as conn: with conn.cursor() as cur: if recreate: cur.execute(DROP) print("Tabulky dasta_* zahozeny.") cur.execute(DDL) cur = conn.cursor() for i, src in enumerate(soubory, 1): try: pacient, zprava, vysledky, diagnozy = parse_file(src) with conn.transaction(): # savepoint pro tento soubor if pacient: cur.execute(UPSERT_PACIENT, pacient) cur.execute(UPSERT_ZPRAVA, zprava) cur.execute("DELETE FROM dasta_vysledek WHERE soubor=%s", (src.stem,)) cur.execute("DELETE FROM dasta_diagnoza WHERE soubor=%s", (src.stem,)) if vysledky: cur.executemany(INS_VYSLEDEK, vysledky) if diagnozy: cur.executemany(INS_DIAGNOZA, diagnozy) ok += 1 except Exception as e: chyb += 1 chyby.append(f"{src.name}: {type(e).__name__} {e}") continue if i % 500 == 0: print(f" ... {i}/{len(soubory)}") print("-" * 60) print(f"Hotovo. Zpráv OK: {ok} Chyb: {chyb}") if chyby: print("Chyby:") for c in chyby[:20]: print(" " + c) # Souhrn with psycopg.connect(dbname=PG_DB, **PG) as conn: for t in ("dasta_pacient", "dasta_zprava", "dasta_vysledek", "dasta_diagnoza"): n = conn.execute(f"SELECT count(*) FROM {t}").fetchone()[0] print(f" {t:18}: {n}") if __name__ == "__main__": main()