This commit is contained in:
2026-06-15 16:10:24 +02:00
parent 2bdac59676
commit 8142de5216
4 changed files with 790 additions and 1 deletions
+408
View File
@@ -0,0 +1,408 @@
# -*- coding: utf-8 -*-
"""
Načte DASTA XML soubory a uloží je do PostgreSQL databáze `ordinace`
do tabulek s prefixem `dasta_`.
Vše čistě v Pythonu přes psycopg (v3). Skript je IDEMPOTENTNÍ:
- databázi `ordinace` založí, jen pokud neexistuje
- tabulky vytvoří přes CREATE TABLE IF NOT EXISTS
- každý soubor se nahrává podle klíče = název souboru (bez přípony);
při opakovaném běhu se zpráva UPSERTne a její výsledky/diagnózy
se smažou a vloží znovu → výsledek je vždy stejný, žádné duplicity.
Připojení se bere z Medevio/.env (PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB).
Použití:
python nahraj_do_postgres.py # zdroj = U:\\DASTA (výchozí)
python nahraj_do_postgres.py D:\\jine\\dasta # jiný zdrojový adresář
python nahraj_do_postgres.py U:\\DASTA --limit 50 # jen prvních 50 (test)
python nahraj_do_postgres.py --recreate # zahodí dasta_ tabulky a založí znovu
Tabulky:
dasta_pacient (rodne_cislo PK)
dasta_zprava (soubor PK) → pacient
dasta_vysledek (id PK) → zprava [jednotlivé analyty]
dasta_diagnoza (id PK) → zprava
"""
from __future__ import annotations
import os
import re
import sys
from datetime import date, datetime
from pathlib import Path
from xml.etree import ElementTree as ET
import psycopg
ZDROJ_VYCHOZI = Path(r"U:\DASTA")
# ---------------------------------------------------------------------------
# .env
# ---------------------------------------------------------------------------
def _load_env() -> None:
env_path = Path(__file__).resolve().parent.parent / "Medevio" / ".env"
if env_path.exists():
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ[k.strip()] = v.strip()
_load_env()
PG = dict(
host=os.environ.get("PG_HOST", "localhost"),
port=os.environ.get("PG_PORT", "5432"),
user=os.environ.get("PG_USER"),
password=os.environ.get("PG_PASSWORD"),
)
PG_DB = os.environ.get("PG_DB", "ordinace")
# ---------------------------------------------------------------------------
# Konverze hodnot
# ---------------------------------------------------------------------------
def num(s: str | None) -> float | None:
"""Český zápis čísla ('7,5') → float. Nečíselné vrací None."""
if s is None:
return None
t = s.strip().replace("\xa0", "").replace(" ", "").replace(",", ".")
try:
return float(t)
except ValueError:
return None
def ts(s: str | None) -> datetime | None:
"""'2016-06-20T11:15:18' nebo '2017-05-18T07:30' → datetime."""
if not s:
return None
try:
return datetime.fromisoformat(s.strip())
except ValueError:
return None
def dat(s: str | None) -> date | None:
if not s:
return None
try:
return date.fromisoformat(s.strip()[:10])
except ValueError:
return None
def _text(el, tag):
if el is None:
return None
c = el.find(tag)
return c.text.strip() if (c is not None and c.text) else None
# ---------------------------------------------------------------------------
# Schéma
# ---------------------------------------------------------------------------
DDL = """
CREATE TABLE IF NOT EXISTS dasta_pacient (
rodne_cislo text PRIMARY KEY,
jmeno text,
prijmeni text,
dat_narozeni date,
sex text
);
CREATE TABLE IF NOT EXISTS dasta_zprava (
soubor text PRIMARY KEY,
id_soubor text,
ozn_soub text,
dat_vytvoreni timestamp,
verze_ds text,
typ_odesm text,
zdroj_prog text,
zdroj_verze text,
odesilatel_icp text,
odesilatel_ico text,
odesilatel_nazev text,
prijemce_icp text,
prijemce_nazev text,
rodne_cislo text REFERENCES dasta_pacient(rodne_cislo)
);
CREATE TABLE IF NOT EXISTS dasta_vysledek (
id bigserial PRIMARY KEY,
soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE,
klic_nclp text,
nazev text,
jednotka text,
hodnota_raw text,
hodnota_num double precision,
dat_odber timestamp,
dat_odber_typ text,
dat_vydani timestamp,
autor text,
stav text,
typ_kvant text,
ref_low double precision,
ref_high double precision,
mimo_normu smallint
);
CREATE TABLE IF NOT EXISTS dasta_diagnoza (
id bigserial PRIMARY KEY,
soubor text NOT NULL REFERENCES dasta_zprava(soubor) ON DELETE CASCADE,
poradi int,
kod text
);
CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_soubor ON dasta_vysledek(soubor);
CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_nclp ON dasta_vysledek(klic_nclp);
CREATE INDEX IF NOT EXISTS ix_dasta_vysledek_odber ON dasta_vysledek(dat_odber);
CREATE INDEX IF NOT EXISTS ix_dasta_zprava_rc ON dasta_zprava(rodne_cislo);
"""
DROP = """
DROP TABLE IF EXISTS dasta_vysledek CASCADE;
DROP TABLE IF EXISTS dasta_diagnoza CASCADE;
DROP TABLE IF EXISTS dasta_zprava CASCADE;
DROP TABLE IF EXISTS dasta_pacient CASCADE;
"""
def ensure_database() -> None:
"""Založí DB `ordinace`, pokud neexistuje (mimo transakci)."""
with psycopg.connect(dbname="postgres", autocommit=True, connect_timeout=10, **PG) as c:
exists = c.execute(
"SELECT 1 FROM pg_database WHERE datname = %s", (PG_DB,)
).fetchone()
if not exists:
# TEMPLATE template0 obchází collation version mismatch u template1
c.execute(f'CREATE DATABASE "{PG_DB}" TEMPLATE template0')
print(f"Databáze {PG_DB} vytvořena.")
else:
print(f"Databáze {PG_DB} už existuje.")
# ---------------------------------------------------------------------------
# Parsování jednoho souboru → (pacient, zprava, vysledky, diagnozy)
# ---------------------------------------------------------------------------
_RE_ENC = re.compile(r"encoding=['\"][^'\"]+['\"]", re.I)
def _nacti_root(raw: bytes):
"""Naparsuje XML; když selže (špatně deklarované kódování), zkusí UTF-8."""
try:
return ET.fromstring(raw)
except ET.ParseError:
# Některé soubory deklarují Windows-1250, ale jsou v UTF-8.
text = raw.decode("utf-8", errors="replace")
text = _RE_ENC.sub("", text, count=1) # odstraň chybnou deklaraci
return ET.fromstring(text)
def parse_file(cesta: Path):
root = _nacti_root(cesta.read_bytes())
soubor = cesta.stem
zdroj = root.find("zdroj_is")
pm = root.find("pm")
is_el = root.find("is")
pm_a = pm.find("a") if pm is not None else None
is_a = is_el.find("a") if is_el is not None else None
ip = is_el.find("ip") if is_el is not None else None
# pacient
rodne_cislo = _text(ip, "rodcis") if ip is not None else None
pacient = None
if rodne_cislo:
pacient = (
rodne_cislo,
_text(ip, "jmeno"),
_text(ip, "prijmeni"),
dat(_text(ip, "dat_dn")),
_text(ip, "sex"),
)
# pojišťovna se sem nedává (lze doplnit), držíme se zadaného rozsahu
zprava = (
soubor,
root.get("id_soubor"),
root.get("ozn_soub"),
ts(root.get("dat_vb")),
root.get("verze_ds"),
root.get("typ_odesm"),
zdroj.get("kod_prog") if zdroj is not None else None,
zdroj.get("verze_prog") if zdroj is not None else None,
is_el.get("icp") if is_el is not None else None,
is_el.get("ico") if is_el is not None else None,
_text(is_a, "jmeno") if is_a is not None else None,
pm.get("icp") if pm is not None else None,
_text(pm_a, "jmeno") if pm_a is not None else None,
rodne_cislo,
)
vysledky = []
diagnozy = []
if ip is not None:
dg = ip.find("dg")
if dg is not None:
for i, diag in enumerate(dg.iter("diag"), 1):
if diag.text:
diagnozy.append((soubor, i, diag.text.strip()))
v = ip.find("v")
if v is not None:
for vr in v.findall("vr"):
vrn = vr.find("vrn")
nazvy = vrn.find("nazvy") if vrn is not None else None
skala = vrn.find("skala") if vrn is not None else None
dat_du = vr.find("dat_du")
ref_low = ref_high = None
if skala is not None:
ref_low = num(_text(skala, "s4"))
ref_high = num(_text(skala, "s5"))
hodnota_raw = _text(vrn, "hodnota") if vrn is not None else None
hodnota_num = num(hodnota_raw)
mimo = None
if hodnota_num is not None and (ref_low is not None or ref_high is not None):
mimo = 0
if ref_low is not None and hodnota_num < ref_low:
mimo = 1
if ref_high is not None and hodnota_num > ref_high:
mimo = 1
vysledky.append((
soubor,
vr.get("klic_nclp"),
_text(vr, "nazev_lclp"),
nazvy.get("jednotka") if nazvy is not None else None,
hodnota_raw,
hodnota_num,
ts(dat_du.text if dat_du is not None else None),
dat_du.get("typ") if dat_du is not None else None,
ts(_text(vr, "dat_vv")),
_text(vr, "autor"),
vr.get("stav_vys"),
vrn.get("priznak_kvant") if vrn is not None else None,
ref_low,
ref_high,
mimo,
))
return pacient, zprava, vysledky, diagnozy
# ---------------------------------------------------------------------------
# Zápis (idempotentní)
# ---------------------------------------------------------------------------
UPSERT_PACIENT = """
INSERT INTO dasta_pacient (rodne_cislo, jmeno, prijmeni, dat_narozeni, sex)
VALUES (%s,%s,%s,%s,%s)
ON CONFLICT (rodne_cislo) DO UPDATE SET
jmeno=EXCLUDED.jmeno, prijmeni=EXCLUDED.prijmeni,
dat_narozeni=EXCLUDED.dat_narozeni, sex=EXCLUDED.sex;
"""
UPSERT_ZPRAVA = """
INSERT INTO dasta_zprava (soubor,id_soubor,ozn_soub,dat_vytvoreni,verze_ds,typ_odesm,
zdroj_prog,zdroj_verze,odesilatel_icp,odesilatel_ico,odesilatel_nazev,
prijemce_icp,prijemce_nazev,rodne_cislo)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (soubor) DO UPDATE SET
id_soubor=EXCLUDED.id_soubor, ozn_soub=EXCLUDED.ozn_soub,
dat_vytvoreni=EXCLUDED.dat_vytvoreni, verze_ds=EXCLUDED.verze_ds,
typ_odesm=EXCLUDED.typ_odesm, zdroj_prog=EXCLUDED.zdroj_prog,
zdroj_verze=EXCLUDED.zdroj_verze, odesilatel_icp=EXCLUDED.odesilatel_icp,
odesilatel_ico=EXCLUDED.odesilatel_ico, odesilatel_nazev=EXCLUDED.odesilatel_nazev,
prijemce_icp=EXCLUDED.prijemce_icp, prijemce_nazev=EXCLUDED.prijemce_nazev,
rodne_cislo=EXCLUDED.rodne_cislo;
"""
INS_VYSLEDEK = """
INSERT INTO dasta_vysledek (soubor,klic_nclp,nazev,jednotka,hodnota_raw,hodnota_num,
dat_odber,dat_odber_typ,dat_vydani,autor,stav,typ_kvant,ref_low,ref_high,mimo_normu)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
"""
INS_DIAGNOZA = "INSERT INTO dasta_diagnoza (soubor,poradi,kod) VALUES (%s,%s,%s);"
def main() -> None:
args = sys.argv[1:]
recreate = "--recreate" in args
limit = None
if "--limit" in args:
limit = int(args[args.index("--limit") + 1])
pozicni = [a for a in args if not a.startswith("--")]
# odfiltruj hodnotu za --limit
if limit is not None and pozicni and pozicni[0] == str(limit):
pozicni = pozicni[1:]
zdroj = Path(pozicni[0]) if pozicni else ZDROJ_VYCHOZI
print(f"Zdroj: {zdroj}")
print(f"Cíl: postgresql://{PG['host']}:{PG['port']}/{PG_DB} (tabulky dasta_*)")
ensure_database()
soubory = sorted(zdroj.glob("*.xml"))
if limit:
soubory = soubory[:limit]
print(f"Souborů ke zpracování: {len(soubory)}")
print("-" * 60)
ok = chyb = 0
chyby = []
# autocommit=True → každý soubor je samostatná transakce (conn.transaction),
# takže chyba u jednoho souboru nikdy neovlivní ostatní.
with psycopg.connect(dbname=PG_DB, autocommit=True, connect_timeout=10, **PG) as conn:
with conn.cursor() as cur:
if recreate:
cur.execute(DROP)
print("Tabulky dasta_* zahozeny.")
cur.execute(DDL)
cur = conn.cursor()
for i, src in enumerate(soubory, 1):
try:
pacient, zprava, vysledky, diagnozy = parse_file(src)
with conn.transaction(): # savepoint pro tento soubor
if pacient:
cur.execute(UPSERT_PACIENT, pacient)
cur.execute(UPSERT_ZPRAVA, zprava)
cur.execute("DELETE FROM dasta_vysledek WHERE soubor=%s", (src.stem,))
cur.execute("DELETE FROM dasta_diagnoza WHERE soubor=%s", (src.stem,))
if vysledky:
cur.executemany(INS_VYSLEDEK, vysledky)
if diagnozy:
cur.executemany(INS_DIAGNOZA, diagnozy)
ok += 1
except Exception as e:
chyb += 1
chyby.append(f"{src.name}: {type(e).__name__} {e}")
continue
if i % 500 == 0:
print(f" ... {i}/{len(soubory)}")
print("-" * 60)
print(f"Hotovo. Zpráv OK: {ok} Chyb: {chyb}")
if chyby:
print("Chyby:")
for c in chyby[:20]:
print(" " + c)
# Souhrn
with psycopg.connect(dbname=PG_DB, **PG) as conn:
for t in ("dasta_pacient", "dasta_zprava", "dasta_vysledek", "dasta_diagnoza"):
n = conn.execute(f"SELECT count(*) FROM {t}").fetchone()[0]
print(f" {t:18}: {n}")
if __name__ == "__main__":
main()
+260
View File
@@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
"""
Parser DASTA XML (Datový standard MZ ČR, verze DS 03.01.01).
Rozebírá laboratorní zprávy (typ odesílatele LB) do strukturovaných dat.
Soubory jsou v kódování windows-1250 (uvedeno v XML deklaraci) a obsahují
DOCTYPE odkaz na lokální DTD (ds030101.dtd), který při parsování ignorujeme.
Struktura DASTA dávky (zjednodušeně):
dasta kořen, hlavička dávky (datum, verze, odesílatel)
zdroj_is informační systém, který dávku vytvořil
pm (icp) příjemce zprávy (ordinace) + adresa (a typ="P")
is (icp, ico) odesílatel = laboratoř + adresa (a typ="O")
ip (id_pac) pacient: rodné číslo, jméno, dat. narození, sex
pv / p pojišťovna (kodpoj, typpoj)
dg / dgz / diag diagnózy
v blok výsledků
vr (klic_nclp...) jeden laboratorní výsledek (analyt)
nazev_lclp název položky (WBC, RBC, ...)
dat_du/dat_pl/dat_vv odběr / příjem / vydání výsledku
autor validující lékař
vrn číselný výsledek
hodnota naměřená hodnota
nazvy@jednotka měrná jednotka
skala s1..s8 referenční pásma (meze)
interpret_g_z grafická interpretace | | * | |
"""
from __future__ import annotations
import sys
from dataclasses import dataclass, field, asdict
from pathlib import Path
from xml.etree import ElementTree as ET
# ---------------------------------------------------------------------------
# Datové třídy
# ---------------------------------------------------------------------------
@dataclass
class Vysledek:
klic_nclp: str # kód NČLP (Národní číselník laboratorních položek)
nazev: str # lokální název položky (WBC, RBC, ...)
hodnota: str | None
jednotka: str | None
dat_odber: str | None # dat_du datum a čas odběru
dat_vydani: str | None # dat_vv datum a čas vydání výsledku
autor: str | None
stav: str | None # stav_vys (A = definitivní)
typ_kvant: str | None # priznak_kvant (R = reálné číslo)
interpret: str | None # grafická interpretace mezí
meze: list[str] = field(default_factory=list) # s1..s8
@property
def referencni_mez(self) -> str | None:
"""Klinicky relevantní referenční rozmezí = s4 (dolní) až s5 (horní)."""
if len(self.meze) >= 5:
return f"{self.meze[3]} {self.meze[4]}"
return None
@dataclass
class Pacient:
id_pac: str | None
rodne_cislo: str | None
jmeno: str | None
prijmeni: str | None
datum_narozeni: str | None
sex: str | None
pojistovna: str | None
typ_pojisteni: str | None
diagnozy: list[str] = field(default_factory=list)
vysledky: list[Vysledek] = field(default_factory=list)
@dataclass
class Adresa:
jmeno: str | None = None
radky: list[str] = field(default_factory=list)
psc: str | None = None
mesto: str | None = None
@dataclass
class DastaZprava:
ozn_soub: str | None
id_soubor: str | None
datum_vytvoreni: str | None
verze_ds: str | None
typ_odesilatele: str | None
zdroj_program: str | None
zdroj_verze: str | None
prijemce_icp: str | None
prijemce: Adresa | None
odesilatel_icp: str | None
odesilatel_ico: str | None
odesilatel: Adresa | None
pacienti: list[Pacient] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Pomocné funkce
# ---------------------------------------------------------------------------
def _text(el, tag: str) -> str | None:
"""Vrátí text potomka `tag` nebo None."""
if el is None:
return None
child = el.find(tag)
return child.text.strip() if (child is not None and child.text) else None
def _parse_adresa(el) -> Adresa | None:
if el is None:
return None
a = el.find("a")
if a is None:
return None
radky = [v for v in (_text(a, "adr"), _text(a, "dop1"), _text(a, "dop2")) if v]
return Adresa(
jmeno=_text(a, "jmeno"),
radky=radky,
psc=_text(a, "psc"),
mesto=_text(a, "mesto"),
)
# ---------------------------------------------------------------------------
# Hlavní parser
# ---------------------------------------------------------------------------
def parse_dasta(cesta: str | Path) -> DastaZprava:
cesta = Path(cesta)
# XML obsahuje DOCTYPE s odkazem na DTD; vypneme načítání externích entit
# tím, že parsujeme bez resolveru. ElementTree DTD ignoruje automaticky.
raw = cesta.read_bytes()
# ElementTree si kódování přečte z XML deklarace (<?xml ... encoding=...?>).
root = ET.fromstring(raw)
zdroj = root.find("zdroj_is")
pm = root.find("pm")
is_el = root.find("is")
zprava = DastaZprava(
ozn_soub=root.get("ozn_soub"),
id_soubor=root.get("id_soubor"),
datum_vytvoreni=root.get("dat_vb"),
verze_ds=root.get("verze_ds"),
typ_odesilatele=root.get("typ_odesm"),
zdroj_program=zdroj.get("kod_prog") if zdroj is not None else None,
zdroj_verze=zdroj.get("verze_prog") if zdroj is not None else None,
prijemce_icp=pm.get("icp") if pm is not None else None,
prijemce=_parse_adresa(pm),
odesilatel_icp=is_el.get("icp") if is_el is not None else None,
odesilatel_ico=is_el.get("ico") if is_el is not None else None,
odesilatel=_parse_adresa(is_el),
)
if is_el is None:
return zprava
for ip in is_el.findall("ip"):
# pojišťovna bere se z elementu <p> (přímo nebo uvnitř <pv>)
p = ip.find("p")
if p is None:
pv = ip.find("pv")
p = pv.find("p") if pv is not None else None
pacient = Pacient(
id_pac=ip.get("id_pac"),
rodne_cislo=_text(ip, "rodcis"),
jmeno=_text(ip, "jmeno"),
prijmeni=_text(ip, "prijmeni"),
datum_narozeni=_text(ip, "dat_dn"),
sex=_text(ip, "sex"),
pojistovna=_text(p, "kodpoj") if p is not None else None,
typ_pojisteni=_text(p, "typpoj") if p is not None else None,
)
# diagnózy
dg = ip.find("dg")
if dg is not None:
for diag in dg.iter("diag"):
if diag.text:
pacient.diagnozy.append(diag.text.strip())
# výsledky
v = ip.find("v")
if v is not None:
for vr in v.findall("vr"):
vrn = vr.find("vrn")
nazvy = vrn.find("nazvy") if vrn is not None else None
skala = vrn.find("skala") if vrn is not None else None
meze = []
interpret = None
if skala is not None:
for i in range(1, 9):
meze.append(_text(skala, f"s{i}") or "")
interpret = _text(skala, "interpret_g_z")
pacient.vysledky.append(Vysledek(
klic_nclp=vr.get("klic_nclp"),
nazev=_text(vr, "nazev_lclp"),
hodnota=_text(vrn, "hodnota") if vrn is not None else None,
jednotka=nazvy.get("jednotka") if nazvy is not None else None,
dat_odber=_text(vr, "dat_du"),
dat_vydani=_text(vr, "dat_vv"),
autor=_text(vr, "autor"),
stav=vr.get("stav_vys"),
typ_kvant=vrn.get("priznak_kvant") if vrn is not None else None,
interpret=interpret,
meze=meze,
))
zprava.pacienti.append(pacient)
return zprava
# ---------------------------------------------------------------------------
# Výpis přehledu
# ---------------------------------------------------------------------------
def vypis_prehled(z: DastaZprava) -> None:
print("=" * 70)
print(f"DASTA dávka {z.ozn_soub} (DS {z.verze_ds}, typ {z.typ_odesilatele})")
print(f"Vytvořeno: {z.datum_vytvoreni}")
print(f"Program: {z.zdroj_program} {z.zdroj_verze}")
print(f"ID souboru: {z.id_soubor}")
print("-" * 70)
if z.odesilatel:
print(f"Odesílatel (laboratoř) IČP {z.odesilatel_icp} IČO {z.odesilatel_ico}")
print(f" {z.odesilatel.jmeno} | {', '.join(z.odesilatel.radky)}")
print(f" {z.odesilatel.psc} {z.odesilatel.mesto}")
if z.prijemce:
print(f"Příjemce (ordinace) IČP {z.prijemce_icp}")
print(f" {z.prijemce.jmeno} | {', '.join(z.prijemce.radky)}")
print("=" * 70)
for pac in z.pacienti:
print(f"\nPacient: {pac.prijmeni} {pac.jmeno} r.č. {pac.rodne_cislo}"
f" nar. {pac.datum_narozeni} {pac.sex}")
print(f" Pojišťovna {pac.pojistovna} (typ {pac.typ_pojisteni})"
f" Diagnózy: {', '.join(pac.diagnozy) or ''}")
print(f" Výsledků: {len(pac.vysledky)}")
print()
print(f" {'Položka':<14}{'Hodnota':>10} {'Jedn.':<9}"
f"{'Ref. mez':<18}{'Interpr.':<12}NČLP")
print(" " + "-" * 75)
for vys in pac.vysledky:
ref = vys.referencni_mez or ""
interp = (vys.interpret or "").strip()
print(f" {vys.nazev or '':<14}{vys.hodnota or '':>10} "
f"{vys.jednotka or '':<9}{ref:<18}{interp:<12}{vys.klic_nclp}")
if __name__ == "__main__":
if len(sys.argv) > 1:
cesta = sys.argv[1]
else:
cesta = r"u:\Dropbox\Ordinace\pomoc\DASTA\RLB05E6T.xml"
zprava = parse_dasta(cesta)
vypis_prehled(zprava)
+114
View File
@@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
"""
Roztřídí DASTA XML soubory do adresářové struktury podle DATA ODBĚRU.
Zdroj: u:\\Dropbox\\Ordinace\\pomoc\\DASTA\\*.xml
Cíl: U:\\DASTA_SUBGROUPS\\RRRR\\MM\\DD\\<soubor>.xml (kopie, originál zůstává)
Datum odběru = první element <dat_du> v souboru (první potomek prvního <vr>).
Hodnota má formát DTS (2016-06-20T08:00:00) nebo DT (2017-05-18T07:30)
v obou případech začíná YYYY-MM-DD, takže rok/měsíc/den čteme z prvních znaků.
Speciální případy:
_BEZ_DATUMU soubor neobsahuje žádný <dat_du>
_CHYBY soubor se nepodařilo naparsovat
Použití:
python roztrid_dle_odberu.py # zdroj = výchozí (Dropbox)
python roztrid_dle_odberu.py U:\\DASTA # jiný zdrojový adresář
python roztrid_dle_odberu.py U:\\DASTA --dry-run # jen vypíše, co by udělal
"""
from __future__ import annotations
import re
import shutil
import sys
from collections import Counter
from pathlib import Path
from xml.etree import ElementTree as ET
ZDROJ_VYCHOZI = Path(r"u:\dasta")
CIL = Path(r"U:\DASTA_SUBGROUPS")
# Záchytný regex pro případ, že ElementTree selže (poškozená hlavička apod.)
_RE_DAT_DU = re.compile(rb"<dat_du[^>]*>\s*(\d{4})-(\d{2})-(\d{2})")
def datum_odberu(cesta: Path) -> tuple[str, str, str] | None:
"""Vrátí (rok, měsíc, den) z prvního <dat_du>, nebo None když chybí."""
raw = cesta.read_bytes()
# Rychlá a robustní cesta: najdi první <dat_du> v bytech.
m = _RE_DAT_DU.search(raw)
if m:
return m.group(1).decode(), m.group(2).decode(), m.group(3).decode()
# Záloha přes ElementTree (kdyby byl dat_du formátovaný jinak)
try:
root = ET.fromstring(raw)
el = root.find(".//dat_du")
if el is not None and el.text:
d = el.text.strip()
return d[0:4], d[5:7], d[8:10]
except ET.ParseError:
raise
return None
def main() -> None:
dry = "--dry-run" in sys.argv
pozicni = [a for a in sys.argv[1:] if not a.startswith("--")]
zdroj = Path(pozicni[0]) if pozicni else ZDROJ_VYCHOZI
soubory = sorted(zdroj.glob("*.xml"))
print(f"Zdroj: {zdroj}")
print(f"Cíl: {CIL}")
print(f"Nalezeno souborů: {len(soubory)}"
f"{' [DRY-RUN]' if dry else ''}")
print("-" * 60)
if not dry:
CIL.mkdir(parents=True, exist_ok=True)
stat = Counter()
roky = Counter()
chyby: list[str] = []
for src in soubory:
try:
d = datum_odberu(src)
except ET.ParseError as e:
d = None
cilovy_dir = CIL / "_CHYBY"
chyby.append(f"{src.name}: parse error {e}")
stat["chyba"] += 1
else:
if d is None:
cilovy_dir = CIL / "_BEZ_DATUMU"
stat["bez_datumu"] += 1
else:
rok, mes, den = d
cilovy_dir = CIL / rok / mes / den
stat["ok"] += 1
roky[rok] += 1
dst = cilovy_dir / src.name
if dry:
continue
cilovy_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
print("Hotovo:")
print(f" zařazeno dle data odběru : {stat['ok']}")
print(f" bez data (_BEZ_DATUMU) : {stat['bez_datumu']}")
print(f" chyby parsování (_CHYBY) : {stat['chyba']}")
if roky:
print("\nRozložení podle roku:")
for rok in sorted(roky):
print(f" {rok}: {roky[rok]}")
if chyby:
print("\nDetail chyb:")
for c in chyby:
print(f" {c}")
if __name__ == "__main__":
main()
+8 -1
View File
@@ -11,4 +11,11 @@ TELEGRAM_CHAT_ID=6639316354
# api_id/api_hash z https://my.telegram.org (přihlas se číslem nového účtu)
TELEGRAM_API_ID=39599696
TELEGRAM_API_HASH=f93ed362cdbfb4f5df85072a0350a8fc
TELEGRAM_PHONE=+420705920457
TELEGRAM_PHONE=+420705920457
# PostgreSQL (ordinace) — pro DASTA loader
PG_HOST=192.168.1.76
PG_PORT=5432
PG_USER=vladimir.buzalka
PG_PASSWORD=Vlado7309208104++
PG_DB=ordinace