Files
ordinaceprojekt/DASTA/nahraj_do_postgres.py
2026-06-15 16:10:24 +02:00

409 lines
14 KiB
Python

# -*- coding: utf-8 -*-
"""
Načte DASTA XML soubory a uloží je do PostgreSQL databáze `ordinace`
do tabulek s prefixem `dasta_`.
Vše čistě v Pythonu přes psycopg (v3). Skript je IDEMPOTENTNÍ:
- databázi `ordinace` založí, jen pokud neexistuje
- tabulky vytvoří přes CREATE TABLE IF NOT EXISTS
- každý soubor se nahrává podle klíče = název souboru (bez přípony);
při opakovaném běhu se zpráva UPSERTne a její výsledky/diagnózy
se smažou a vloží znovu → výsledek je vždy stejný, žádné duplicity.
Připojení se bere z Medevio/.env (PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB).
Použití:
python nahraj_do_postgres.py # zdroj = U:\\DASTA (výchozí)
python nahraj_do_postgres.py D:\\jine\\dasta # jiný zdrojový adresář
python nahraj_do_postgres.py U:\\DASTA --limit 50 # jen prvních 50 (test)
python nahraj_do_postgres.py --recreate # zahodí dasta_ tabulky a založí znovu
Tabulky:
dasta_pacient (rodne_cislo PK)
dasta_zprava (soubor PK) → pacient
dasta_vysledek (id PK) → zprava [jednotlivé analyty]
dasta_diagnoza (id PK) → zprava
"""
from __future__ import annotations
import os
import re
import sys
from datetime import date, datetime
from pathlib import Path
from xml.etree import ElementTree as ET
import psycopg
ZDROJ_VYCHOZI = Path(r"U:\DASTA")
# ---------------------------------------------------------------------------
# .env
# ---------------------------------------------------------------------------
def _load_env() -> None:
env_path = Path(__file__).resolve().parent.parent / "Medevio" / ".env"
if env_path.exists():
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ[k.strip()] = v.strip()
_load_env()
PG = dict(
host=os.environ.get("PG_HOST", "localhost"),
port=os.environ.get("PG_PORT", "5432"),
user=os.environ.get("PG_USER"),
password=os.environ.get("PG_PASSWORD"),
)
PG_DB = os.environ.get("PG_DB", "ordinace")
# ---------------------------------------------------------------------------
# Konverze hodnot
# ---------------------------------------------------------------------------
def num(s: str | None) -> float | None:
"""Český zápis čísla ('7,5') → float. Nečíselné vrací None."""
if s is None:
return None
t = s.strip().replace("\xa0", "").replace(" ", "").replace(",", ".")
try:
return float(t)
except ValueError:
return None
def ts(s: str | None) -> datetime | None:
"""'2016-06-20T11:15:18' nebo '2017-05-18T07:30' → datetime."""
if not s:
return None
try:
return datetime.fromisoformat(s.strip())
except ValueError:
return None
def dat(s: str | None) -> date | None:
if not s:
return None
try:
return date.fromisoformat(s.strip()[:10])
except ValueError:
return None
def _text(el, tag):
if el is None:
return None
c = el.find(tag)
return c.text.strip() if (c is not None and c.text) else None
# ---------------------------------------------------------------------------
# Schéma
# ---------------------------------------------------------------------------
DDL = """
CREATE TABLE IF NOT EXISTS dasta_pacient (
rodne_cislo text PRIMARY KEY,
jmeno text,
prijmeni text,
dat_narozeni date,
sex text
);
CREATE TABLE IF NOT EXISTS dasta_zprava (
soubor text PRIMARY KEY,
id_soubor text,
ozn_soub text,
dat_vytvoreni timestamp,
verze_ds text,
typ_odesm text,
zdroj_prog text,
zdroj_verze text,
odesilatel_icp text,
odesilatel_ico text,
odesilatel_nazev text,
prijemce_icp text,
prijemce_nazev text,
rodne_cislo text REFERENCES dasta_pacient(rodne_cislo)
);
CREATE TABLE IF NOT EXISTS dasta_vysledek (
id bigserial PRIMARY KEY,
soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE,
klic_nclp text,
nazev text,
jednotka text,
hodnota_raw text,
hodnota_num double precision,
dat_odber timestamp,
dat_odber_typ text,
dat_vydani timestamp,
autor text,
stav text,
typ_kvant text,
ref_low double precision,
ref_high double precision,
mimo_normu smallint
);
CREATE TABLE IF NOT EXISTS dasta_diagnoza (
id bigserial PRIMARY KEY,
soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE,
poradi int,
kod text
);
CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_soubor ON dasta_vysledek(soubor);
CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_nclp ON dasta_vysledek(klic_nclp);
CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_odber ON dasta_vysledek(dat_odber);
CREATE INDEX IF NOT EXISTS ix_dasta_zprava_rc ON dasta_zprava(rodne_cislo);
"""
DROP = """
DROP TABLE IF EXISTS dasta_vysledek CASCADE;
DROP TABLE IF EXISTS dasta_diagnoza CASCADE;
DROP TABLE IF EXISTS dasta_zprava CASCADE;
DROP TABLE IF EXISTS dasta_pacient CASCADE;
"""
def ensure_database() -> None:
"""Založí DB `ordinace`, pokud neexistuje (mimo transakci)."""
with psycopg.connect(dbname="postgres", autocommit=True, connect_timeout=10, **PG) as c:
exists = c.execute(
"SELECT 1 FROM pg_database WHERE datname = %s", (PG_DB,)
).fetchone()
if not exists:
# TEMPLATE template0 obchází collation version mismatch u template1
c.execute(f'CREATE DATABASE "{PG_DB}" TEMPLATE template0')
print(f"Databáze {PG_DB} vytvořena.")
else:
print(f"Databáze {PG_DB} už existuje.")
# ---------------------------------------------------------------------------
# Parsování jednoho souboru → (pacient, zprava, vysledky, diagnozy)
# ---------------------------------------------------------------------------
_RE_ENC = re.compile(r"encoding=['\"][^'\"]+['\"]", re.I)
def _nacti_root(raw: bytes):
"""Naparsuje XML; když selže (špatně deklarované kódování), zkusí UTF-8."""
try:
return ET.fromstring(raw)
except ET.ParseError:
# Některé soubory deklarují Windows-1250, ale jsou v UTF-8.
text = raw.decode("utf-8", errors="replace")
text = _RE_ENC.sub("", text, count=1) # odstraň chybnou deklaraci
return ET.fromstring(text)
def parse_file(cesta: Path):
root = _nacti_root(cesta.read_bytes())
soubor = cesta.stem
zdroj = root.find("zdroj_is")
pm = root.find("pm")
is_el = root.find("is")
pm_a = pm.find("a") if pm is not None else None
is_a = is_el.find("a") if is_el is not None else None
ip = is_el.find("ip") if is_el is not None else None
# pacient
rodne_cislo = _text(ip, "rodcis") if ip is not None else None
pacient = None
if rodne_cislo:
pacient = (
rodne_cislo,
_text(ip, "jmeno"),
_text(ip, "prijmeni"),
dat(_text(ip, "dat_dn")),
_text(ip, "sex"),
)
# pojišťovna se sem nedává (lze doplnit), držíme se zadaného rozsahu
zprava = (
soubor,
root.get("id_soubor"),
root.get("ozn_soub"),
ts(root.get("dat_vb")),
root.get("verze_ds"),
root.get("typ_odesm"),
zdroj.get("kod_prog") if zdroj is not None else None,
zdroj.get("verze_prog") if zdroj is not None else None,
is_el.get("icp") if is_el is not None else None,
is_el.get("ico") if is_el is not None else None,
_text(is_a, "jmeno") if is_a is not None else None,
pm.get("icp") if pm is not None else None,
_text(pm_a, "jmeno") if pm_a is not None else None,
rodne_cislo,
)
vysledky = []
diagnozy = []
if ip is not None:
dg = ip.find("dg")
if dg is not None:
for i, diag in enumerate(dg.iter("diag"), 1):
if diag.text:
diagnozy.append((soubor, i, diag.text.strip()))
v = ip.find("v")
if v is not None:
for vr in v.findall("vr"):
vrn = vr.find("vrn")
nazvy = vrn.find("nazvy") if vrn is not None else None
skala = vrn.find("skala") if vrn is not None else None
dat_du = vr.find("dat_du")
ref_low = ref_high = None
if skala is not None:
ref_low = num(_text(skala, "s4"))
ref_high = num(_text(skala, "s5"))
hodnota_raw = _text(vrn, "hodnota") if vrn is not None else None
hodnota_num = num(hodnota_raw)
mimo = None
if hodnota_num is not None and (ref_low is not None or ref_high is not None):
mimo = 0
if ref_low is not None and hodnota_num < ref_low:
mimo = 1
if ref_high is not None and hodnota_num > ref_high:
mimo = 1
vysledky.append((
soubor,
vr.get("klic_nclp"),
_text(vr, "nazev_lclp"),
nazvy.get("jednotka") if nazvy is not None else None,
hodnota_raw,
hodnota_num,
ts(dat_du.text if dat_du is not None else None),
dat_du.get("typ") if dat_du is not None else None,
ts(_text(vr, "dat_vv")),
_text(vr, "autor"),
vr.get("stav_vys"),
vrn.get("priznak_kvant") if vrn is not None else None,
ref_low,
ref_high,
mimo,
))
return pacient, zprava, vysledky, diagnozy
# ---------------------------------------------------------------------------
# Zápis (idempotentní)
# ---------------------------------------------------------------------------
UPSERT_PACIENT = """
INSERT INTO dasta_pacient (rodne_cislo, jmeno, prijmeni, dat_narozeni, sex)
VALUES (%s,%s,%s,%s,%s)
ON CONFLICT (rodne_cislo) DO UPDATE SET
jmeno=EXCLUDED.jmeno, prijmeni=EXCLUDED.prijmeni,
dat_narozeni=EXCLUDED.dat_narozeni, sex=EXCLUDED.sex;
"""
UPSERT_ZPRAVA = """
INSERT INTO dasta_zprava (soubor,id_soubor,ozn_soub,dat_vytvoreni,verze_ds,typ_odesm,
zdroj_prog,zdroj_verze,odesilatel_icp,odesilatel_ico,odesilatel_nazev,
prijemce_icp,prijemce_nazev,rodne_cislo)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (soubor) DO UPDATE SET
id_soubor=EXCLUDED.id_soubor, ozn_soub=EXCLUDED.ozn_soub,
dat_vytvoreni=EXCLUDED.dat_vytvoreni, verze_ds=EXCLUDED.verze_ds,
typ_odesm=EXCLUDED.typ_odesm, zdroj_prog=EXCLUDED.zdroj_prog,
zdroj_verze=EXCLUDED.zdroj_verze, odesilatel_icp=EXCLUDED.odesilatel_icp,
odesilatel_ico=EXCLUDED.odesilatel_ico, odesilatel_nazev=EXCLUDED.odesilatel_nazev,
prijemce_icp=EXCLUDED.prijemce_icp, prijemce_nazev=EXCLUDED.prijemce_nazev,
rodne_cislo=EXCLUDED.rodne_cislo;
"""
INS_VYSLEDEK = """
INSERT INTO dasta_vysledek (soubor,klic_nclp,nazev,jednotka,hodnota_raw,hodnota_num,
dat_odber,dat_odber_typ,dat_vydani,autor,stav,typ_kvant,ref_low,ref_high,mimo_normu)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
"""
INS_DIAGNOZA = "INSERT INTO dasta_diagnoza (soubor,poradi,kod) VALUES (%s,%s,%s);"
def main() -> None:
args = sys.argv[1:]
recreate = "--recreate" in args
limit = None
if "--limit" in args:
limit = int(args[args.index("--limit") + 1])
pozicni = [a for a in args if not a.startswith("--")]
# odfiltruj hodnotu za --limit
if limit is not None and pozicni and pozicni[0] == str(limit):
pozicni = pozicni[1:]
zdroj = Path(pozicni[0]) if pozicni else ZDROJ_VYCHOZI
print(f"Zdroj: {zdroj}")
print(f"Cíl: postgresql://{PG['host']}:{PG['port']}/{PG_DB} (tabulky dasta_*)")
ensure_database()
soubory = sorted(zdroj.glob("*.xml"))
if limit:
soubory = soubory[:limit]
print(f"Souborů ke zpracování: {len(soubory)}")
print("-" * 60)
ok = chyb = 0
chyby = []
# autocommit=True → každý soubor je samostatná transakce (conn.transaction),
# takže chyba u jednoho souboru nikdy neovlivní ostatní.
with psycopg.connect(dbname=PG_DB, autocommit=True, connect_timeout=10, **PG) as conn:
with conn.cursor() as cur:
if recreate:
cur.execute(DROP)
print("Tabulky dasta_* zahozeny.")
cur.execute(DDL)
cur = conn.cursor()
for i, src in enumerate(soubory, 1):
try:
pacient, zprava, vysledky, diagnozy = parse_file(src)
with conn.transaction(): # savepoint pro tento soubor
if pacient:
cur.execute(UPSERT_PACIENT, pacient)
cur.execute(UPSERT_ZPRAVA, zprava)
cur.execute("DELETE FROM dasta_vysledek WHERE soubor=%s", (src.stem,))
cur.execute("DELETE FROM dasta_diagnoza WHERE soubor=%s", (src.stem,))
if vysledky:
cur.executemany(INS_VYSLEDEK, vysledky)
if diagnozy:
cur.executemany(INS_DIAGNOZA, diagnozy)
ok += 1
except Exception as e:
chyb += 1
chyby.append(f"{src.name}: {type(e).__name__} {e}")
continue
if i % 500 == 0:
print(f" ... {i}/{len(soubory)}")
print("-" * 60)
print(f"Hotovo. Zpráv OK: {ok} Chyb: {chyb}")
if chyby:
print("Chyby:")
for c in chyby[:20]:
print(" " + c)
# Souhrn
with psycopg.connect(dbname=PG_DB, **PG) as conn:
for t in ("dasta_pacient", "dasta_zprava", "dasta_vysledek", "dasta_diagnoza"):
n = conn.execute(f"SELECT count(*) FROM {t}").fetchone()[0]
print(f" {t:18}: {n}")
if __name__ == "__main__":
main()