This commit is contained in:
michaela.buzalkova
2026-04-25 12:55:21 +02:00
parent 718d27aad5
commit 0bfa9c48e4
2 changed files with 545 additions and 1 deletions
@@ -0,0 +1,542 @@
"""
Denní kombinovaný skript: stažení XML z eRecept + parsování do MySQL + emailový souhrn.
Pořadí:
1. Načte terminální sadu z MySQL
2. Načte ERP kódy z Firebirdu
3. Stáhne XML pro nezpracované recepty (API eRecept)
4. Naparsuje dnešní XML do MySQL
5. Odešle emailový souhrn (i v případě výjimky)
Spuštění:
python 12_DenníStaženíAZpracování.py
python 12_DenníStaženíAZpracování.py --od 2025-01-01
python 12_DenníStaženíAZpracování.py --limit 50
"""
import argparse
import random
import sys
import time
import traceback
import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import date, datetime, timezone
from pathlib import Path
import pymysql
import pymysql.cursors
from requests import Session
from requests_pkcs12 import Pkcs12Adapter
from Knihovny.EmailMessagingGraph import send_mail
from Knihovny.medicus_db import get_medicus_connection
from Knihovny.mysql_db import connect_mysql
from Knihovny.najdi_dropbox import get_dropbox_root
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(errors="replace")
# ── Konfigurace eRecept ───────────────────────────────────────────────────────
PFX_FILE = Path(__file__).parent.parent / "AMBSUKL214235369G_31DEC2024.pfx"
PFX_PASS = "Vlado7309208104++"
API_USER = "e08c89c6-2b1a-4eba-8ed9-4e3e63618379"
API_PASS = "Buzalka@Vladimir2025"
UZIVATEL = "E08C89C6-2B1A-4EBA-8ED9-4E3E63618379"
PRACOVISTE = "00214235367"
ENDPOINT = "https://lekar-soap.erecept.sukl.cz/cuer/Lekar"
NAMESPACE = "http://www.sukl.cz/erp/201704"
PAUZA_MIN = 1
PAUZA_MAX = 3
# ── Adresáře a email ──────────────────────────────────────────────────────────
XML_DIR = Path(get_dropbox_root()) / "Ordinace" / "Dokumentace_ke_zpracování" / "Zúčtovací zprávy" / "NačteníPředpisuWithClaude" / "xml_archive"
EMAIL_PRIJEMCE = "vladimir.buzalka@buzalka.cz"
# ── Výchozí parametry ─────────────────────────────────────────────────────────
DATUM_OD_DEFAULT = "2025-01-01"
# ── Datové třídy pro souhrn ───────────────────────────────────────────────────
@dataclass
class StazeniRadek:
erp_kod: str
pacient: str
stav: str # "OK" | "CHYBA" | "PRESKOCENO"
detail: str = "" # stav receptu nebo chybová zpráva
@dataclass
class ParseRadek:
erp_kod: str
stav_receptu: str # PREDEPSANY / PLNE_VYDANY / …
terminal: int
plp: int
stav: str # "OK" | "CHYBA"
detail: str = ""
@dataclass
class Souhrn:
datum: str = ""
stazeni: list[StazeniRadek] = field(default_factory=list)
parse: list[ParseRadek] = field(default_factory=list)
kriticka_chyba: str = ""
# ─────────────────────────────────────────────────────────────────────────────
# FÁZE 1 — stahování
# ─────────────────────────────────────────────────────────────────────────────
def nacti_terminal_set(mysql_conn):
with mysql_conn.cursor() as cur:
cur.execute("SELECT id_dokladu FROM recept_doklad WHERE stav_terminal = 1")
return {row["id_dokladu"] for row in cur.fetchall()}
def nacti_erp_kody(fb_conn, datum_od, limit=None):
if limit:
sql = f"SELECT FIRST {int(limit)}"
else:
sql = "SELECT"
sql += """
r.datum, r.lek, r.dop, r.idpac,
TRIM(kar.prijmeni) AS prijmeni, TRIM(kar.jmeno) AS jmeno,
ep.erp
FROM recept r
JOIN recept_epodani ep ON r.id_epodani = ep.id
JOIN kar ON r.idpac = kar.idpac
WHERE r.datum >= ? AND ep.erp IS NOT NULL AND r.STORNO = 'F'
ORDER BY r.datum DESC
"""
cur = fb_conn.cursor()
cur.execute(sql, [datum_od])
rows = cur.fetchall()
cur.close()
seen, unique = set(), []
for row in rows:
erp = row[6]
if erp not in seen:
seen.add(erp)
unique.append(row)
return unique
def volej_nacist_predpis(sess, erp_kod):
id_zpravy = str(uuid.uuid4())
odeslano = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
soap_body = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">'
'<soapenv:Body>'
f'<NacteniPredpisuDotaz xmlns="{NAMESPACE}">'
f'<Doklad>'
f'<Pristupujici><Uzivatel>{UZIVATEL}</Uzivatel><Pracoviste>{PRACOVISTE}</Pracoviste></Pristupujici>'
f'<Identifikator><ID_Dokladu>{erp_kod}</ID_Dokladu></Identifikator>'
f'</Doklad>'
f'<Zprava>'
f'<ID_Zpravy>{id_zpravy}</ID_Zpravy>'
f'<Verze>202501A</Verze>'
f'<Odeslano>{odeslano}</Odeslano>'
f'<SW_Klienta>MEDICUS_____</SW_Klienta>'
f'</Zprava>'
f'</NacteniPredpisuDotaz>'
'</soapenv:Body>'
'</soapenv:Envelope>'
)
headers = {
"Content-Type": 'text/xml; charset="UTF-8"',
"SOAPAction": '"NacistPredpis"',
"User-Agent": "Medicus",
}
resp = sess.post(ENDPOINT, data=soap_body.encode("utf-8"), headers=headers, timeout=15)
return resp.status_code, resp.text
def faze_stazeni(datum_od, limit, out_dir, souhrn: Souhrn):
print("Připojuji MySQL...")
mysql = connect_mysql(database="medicus", cursorclass=pymysql.cursors.DictCursor)
terminal = nacti_terminal_set(mysql)
mysql.close()
print(f" Terminálních receptů v DB: {len(terminal)}\n")
print("Připojuji Firebird...")
fb = get_medicus_connection()
rows = nacti_erp_kody(fb, datum_od, limit)
fb.close()
print(f" Unikátních ERP kódů v Medicusu (od {datum_od}): {len(rows)}\n")
ke_stazeni = [r for r in rows if r[6] not in terminal]
preskoceno = len(rows) - len(ke_stazeni)
print(f" Přeskočeno (terminální): {preskoceno}")
print(f" Ke stažení: {len(ke_stazeni)}\n")
for erp in terminal:
souhrn.stazeni.append(StazeniRadek(erp_kod=erp, pacient="", stav="PRESKOCENO", detail="terminální"))
if not ke_stazeni:
print("Vše terminální, nic ke stažení.")
return
sess = Session()
sess.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_FILE, pkcs12_password=PFX_PASS))
sess.auth = (API_USER, API_PASS)
for i, row in enumerate(ke_stazeni, 1):
datum_rec, lek, dop, idpac, prijmeni, jmeno, erp_kod = row
lek_str = f"{lek} {dop}".strip() if dop else str(lek or "").strip()
label = f"{prijmeni} {jmeno}".strip()
print(f"[{i:4d}/{len(ke_stazeni)}] {label:30s} {erp_kod} ", end="", flush=True)
try:
status, text = volej_nacist_predpis(sess, erp_kod)
je_chyba = status != 200 or "<soap:Fault" in text or "Fault>" in text
if not je_chyba:
xml_file = out_dir / f"{erp_kod}.xml"
xml_file.write_text(text, encoding="utf-8")
kb = len(text.encode()) / 1024
print(f"OK {kb:5.1f} KB {lek_str[:40]}")
souhrn.stazeni.append(StazeniRadek(erp_kod=erp_kod, pacient=label, stav="OK", detail=lek_str[:60]))
else:
chyba_short = text[:120].replace("\n", " ")
print(f"CHYBA HTTP {status} {chyba_short}")
xml_file = out_dir / f"{erp_kod}_CHYBA.xml"
xml_file.write_text(text, encoding="utf-8")
souhrn.stazeni.append(StazeniRadek(erp_kod=erp_kod, pacient=label, stav="CHYBA", detail=chyba_short[:120]))
except Exception as e:
print(f"EXCEPTION {e}")
souhrn.stazeni.append(StazeniRadek(erp_kod=erp_kod, pacient=label, stav="CHYBA", detail=str(e)[:120]))
if i < len(ke_stazeni):
time.sleep(random.uniform(PAUZA_MIN, PAUZA_MAX))
# ─────────────────────────────────────────────────────────────────────────────
# FÁZE 2 — parsování
# ─────────────────────────────────────────────────────────────────────────────
NS_PARSE = f"{{{NAMESPACE}}}"
def _t(el, tag):
found = el.find(f"{NS_PARSE}{tag}")
return found.text.strip() if found is not None and found.text else None
def _ts(s):
return s[:19].replace("T", " ") if s else None
def _bool_el(el, tag):
return 1 if _t(el, tag) == "true" else 0
def parsuj_xml(xml_text, xml_soubor):
try:
root = ET.fromstring(xml_text)
doklad = root.find(f".//{NS_PARSE}Doklad")
if doklad is None:
return None
except ET.ParseError:
return None
id_dokladu = _t(doklad, "ID_Dokladu")
stav = _t(doklad, "Stav")
datum_vystaveni = _t(doklad, "DatumVystaveni")
platnost_do_s = _t(doklad, "PlatnostDo")
vypis_do = _t(doklad, "VypisDo")
akutni = _bool_el(doklad, "Akutni")
rodina = _bool_el(doklad, "Rodina")
opakovani_s = _t(doklad, "Opakovani")
druh_pojisteni = _t(doklad, "DruhPojisteni")
modry_pruh = _bool_el(doklad, "ModryPruh")
pozn = _t(doklad, "Pozn")
zap_s = _t(doklad, "ZapocitatelnyDoplatekZbyvaDoLimitu")
zmena = _ts(_t(doklad, "Zmena"))
zalozeni = _ts(_t(doklad, "Zalozeni"))
platnost_date = date.fromisoformat(platnost_do_s) if platnost_do_s else None
expiroval = platnost_date is not None and platnost_date < date.today()
stav_terminal = 1 if stav in ("PLNE_VYDANY", "ZRUSENY") or expiroval else 0
pac = doklad.find(f"{NS_PARSE}Pacient")
zp_el = pac.find(f"{NS_PARSE}ZP") if pac is not None else None
cp = _t(pac, "CP") if pac is not None else None
zp_kod = _t(zp_el, "Kod") if zp_el is not None else None
zp_nazev = _t(zp_el, "Nazev") if zp_el is not None else None
pac_telefon = _t(pac, "Telefon") if pac is not None else None
pac_notif = _t(pac, "Notifikace") if pac is not None else None
pac_pohlavi = _t(pac, "Pohlavi") if pac is not None else None
pred = doklad.find(f"{NS_PARSE}Predepisujici")
lekar_el = pred.find(f"{NS_PARSE}Lekar") if pred is not None else None
odb_el = pred.find(f"{NS_PARSE}Odbornost") if pred is not None else None
lekar_kod_raw = _t(lekar_el, "Kod") if lekar_el is not None else None
lekar_kod = lekar_kod_raw if lekar_kod_raw and lekar_kod_raw.lower() != "skryto" else None
odb_kod = _t(odb_el, "Kod") if odb_el is not None else None
odb_nazev = _t(odb_el, "Nazev") if odb_el is not None else None
lekar_email = _t(pred, "Email") if pred is not None else None
doklad_dict = dict(
id_dokladu=id_dokladu, stav=stav, stav_terminal=stav_terminal,
datum_vystaveni=datum_vystaveni, platnost_do=platnost_do_s, vypis_do=vypis_do,
akutni=akutni, rodina=rodina, opakovani=int(opakovani_s) if opakovani_s else None,
druh_pojisteni=druh_pojisteni, modry_pruh=modry_pruh, pozn=pozn,
zap_doplatek=float(zap_s) if zap_s else None, zmena=zmena, zalozeni=zalozeni,
lekar_kod=lekar_kod, odbornost_kod=odb_kod, odbornost_nazev=odb_nazev,
lekar_email=lekar_email, cp=cp, zp_kod=zp_kod, zp_nazev=zp_nazev,
pac_telefon=pac_telefon, pac_notifikace=pac_notif, pac_pohlavi=pac_pohlavi,
xml_soubor=str(xml_soubor),
)
plp_list = []
for plp_el in doklad.findall(f"{NS_PARSE}PLP"):
id_lp = _t(plp_el, "ID_LP")
uhrada = _t(plp_el, "Uhrada")
prekroceni = 1 if _t(plp_el, "Prekroceni") == "true" else 0
if id_lp:
plp_list.append(dict(id_lp=id_lp, id_dokladu=id_dokladu, uhrada=uhrada, prekroceni=prekroceni))
return doklad_dict, plp_list
def uloz(conn, doklad, plp_list):
with conn.cursor() as cur:
cur.execute("""
INSERT INTO recept_doklad
(id_dokladu, stav, stav_terminal, datum_vystaveni, platnost_do,
vypis_do, akutni, rodina, opakovani, druh_pojisteni, modry_pruh,
pozn, zap_doplatek, zmena, zalozeni,
lekar_kod, odbornost_kod, odbornost_nazev, lekar_email,
cp, zp_kod, zp_nazev, pac_telefon, pac_notifikace, pac_pohlavi,
xml_soubor, stazeno)
VALUES
(%(id_dokladu)s, %(stav)s, %(stav_terminal)s, %(datum_vystaveni)s, %(platnost_do)s,
%(vypis_do)s, %(akutni)s, %(rodina)s, %(opakovani)s, %(druh_pojisteni)s, %(modry_pruh)s,
%(pozn)s, %(zap_doplatek)s, %(zmena)s, %(zalozeni)s,
%(lekar_kod)s, %(odbornost_kod)s, %(odbornost_nazev)s, %(lekar_email)s,
%(cp)s, %(zp_kod)s, %(zp_nazev)s, %(pac_telefon)s, %(pac_notifikace)s, %(pac_pohlavi)s,
%(xml_soubor)s, NOW())
ON DUPLICATE KEY UPDATE
stav=VALUES(stav), stav_terminal=VALUES(stav_terminal),
platnost_do=VALUES(platnost_do), druh_pojisteni=VALUES(druh_pojisteni),
pozn=VALUES(pozn), zap_doplatek=VALUES(zap_doplatek),
zmena=VALUES(zmena), zp_kod=VALUES(zp_kod), zp_nazev=VALUES(zp_nazev),
pac_telefon=VALUES(pac_telefon), pac_notifikace=VALUES(pac_notifikace),
xml_soubor=VALUES(xml_soubor), stazeno=NOW()
""", doklad)
for plp in plp_list:
cur.execute("""
INSERT IGNORE INTO recept_plp (id_lp, id_dokladu, uhrada, prekroceni)
VALUES (%(id_lp)s, %(id_dokladu)s, %(uhrada)s, %(prekroceni)s)
""", plp)
conn.commit()
def faze_parsovani(dnes_str, out_dir, souhrn: Souhrn):
xml_soubory = {
xml_file.stem: xml_file
for xml_file in out_dir.glob("*.xml")
if not xml_file.stem.endswith("_CHYBA")
}
print(f"\nParsování: {len(xml_soubory)} XML souborů z {dnes_str}\n")
if not xml_soubory:
print(" Žádné XML ke zpracování.")
return
conn = connect_mysql(database="medicus", cursorclass=pymysql.cursors.DictCursor)
with conn.cursor() as cur:
cur.execute("ALTER TABLE recept_plp MODIFY uhrada VARCHAR(20)")
cur.execute("ALTER TABLE recept_doklad MODIFY pac_pohlavi VARCHAR(5)")
try:
cur.execute("ALTER TABLE recept_doklad DROP FOREIGN KEY recept_doklad_ibfk_1")
except Exception:
pass
conn.commit()
with conn.cursor() as cur:
cur.execute("SELECT id_dokladu, xml_soubor FROM recept_doklad WHERE xml_soubor IS NOT NULL")
zpracovane = {row["id_dokladu"]: row["xml_soubor"] for row in cur.fetchall()}
for i, (erp_kod, xml_file) in enumerate(xml_soubory.items(), 1):
rel_path = str(xml_file.relative_to(XML_DIR))
print(f"[{i:4d}/{len(xml_soubory)}] {erp_kod} ", end="", flush=True)
if zpracovane.get(erp_kod) == rel_path:
print("přeskočeno (beze změny)")
continue
xml_text = xml_file.read_text(encoding="utf-8")
vysledek = parsuj_xml(xml_text, xml_file.relative_to(XML_DIR))
if vysledek is None:
print("CHYBA parsování")
souhrn.parse.append(ParseRadek(erp_kod=erp_kod, stav_receptu="", terminal=0, plp=0, stav="CHYBA", detail="chyba parsování XML"))
continue
doklad, plp_list = vysledek
try:
uloz(conn, doklad, plp_list)
stav_r = doklad["stav"] or ""
term = doklad["stav_terminal"]
print(f"{stav_r:20s} terminal={term} PLP={len(plp_list)}")
souhrn.parse.append(ParseRadek(erp_kod=erp_kod, stav_receptu=stav_r, terminal=term, plp=len(plp_list), stav="OK"))
except Exception as e:
print(f"CHYBA DB {e}")
conn.rollback()
souhrn.parse.append(ParseRadek(erp_kod=erp_kod, stav_receptu="", terminal=0, plp=0, stav="CHYBA", detail=str(e)[:200]))
conn.close()
# ─────────────────────────────────────────────────────────────────────────────
# EMAIL
# ─────────────────────────────────────────────────────────────────────────────
def _barva(stav):
return {"OK": "#1a7a2a", "CHYBA": "#c0392b", "PRESKOCENO": "#888"}.get(stav, "#333")
def _badge(stav):
return f'<span style="background:{_barva(stav)};color:#fff;padding:1px 6px;border-radius:3px;font-size:0.85em">{stav}</span>'
def sestav_email(souhrn: Souhrn) -> tuple[str, str]:
stazeno_ok = [r for r in souhrn.stazeni if r.stav == "OK"]
stazeno_chyby = [r for r in souhrn.stazeni if r.stav == "CHYBA"]
parse_ok = [r for r in souhrn.parse if r.stav == "OK"]
parse_chyby = [r for r in souhrn.parse if r.stav == "CHYBA"]
ma_chybu = bool(souhrn.kriticka_chyba or stazeno_chyby or parse_chyby)
predmet = f"eRecept {souhrn.datum}{'⚠ CHYBA' if ma_chybu else 'OK'} ({len(stazeno_ok)} staženo, {len(parse_ok)} zpracováno)"
css = "font-family:Arial,sans-serif;font-size:14px;color:#222"
h2 = "margin:20px 0 4px;font-size:15px;border-bottom:1px solid #ddd;padding-bottom:3px"
td = "padding:3px 10px;border-bottom:1px solid #eee"
th = "padding:4px 10px;background:#f0f0f0;text-align:left;border-bottom:1px solid #ccc"
def tabulka_stazeni(radky, nadpis):
if not radky:
return ""
rows = "".join(
f"<tr><td style='{td}'>{r.erp_kod}</td><td style='{td}'>{r.pacient}</td>"
f"<td style='{td}'>{_badge(r.stav)}</td><td style='{td}'>{r.detail}</td></tr>"
for r in radky
)
return (
f"<h2 style='{h2}'>{nadpis}</h2>"
f"<table style='border-collapse:collapse;width:100%'>"
f"<tr><th style='{th}'>ERP kód</th><th style='{th}'>Pacient</th>"
f"<th style='{th}'>Stav</th><th style='{th}'>Detail</th></tr>"
f"{rows}</table>"
)
def tabulka_parse(radky, nadpis):
if not radky:
return ""
rows = "".join(
f"<tr><td style='{td}'>{r.erp_kod}</td><td style='{td}'>{r.stav_receptu}</td>"
f"<td style='{td}'>{'ano' if r.terminal else 'ne'}</td>"
f"<td style='{td}'>{r.plp}</td>"
f"<td style='{td}'>{_badge(r.stav)}</td><td style='{td}'>{r.detail}</td></tr>"
for r in radky
)
return (
f"<h2 style='{h2}'>{nadpis}</h2>"
f"<table style='border-collapse:collapse;width:100%'>"
f"<tr><th style='{th}'>ERP kód</th><th style='{th}'>Stav receptu</th>"
f"<th style='{th}'>Terminální</th><th style='{th}'>PLP</th>"
f"<th style='{th}'>Stav</th><th style='{th}'>Detail</th></tr>"
f"{rows}</table>"
)
krit = ""
if souhrn.kriticka_chyba:
krit = (
f"<div style='background:#fdecea;border:1px solid #f5c6cb;padding:12px;margin:12px 0;border-radius:4px'>"
f"<strong>Kritická chyba:</strong><pre style='white-space:pre-wrap;margin:6px 0'>{souhrn.kriticka_chyba}</pre></div>"
)
body = (
f"<div style='{css}'>"
f"<h1 style='font-size:18px;margin-bottom:6px'>eRecept — denní souhrn {souhrn.datum}</h1>"
f"{krit}"
f"<p><strong>Stažení:</strong> {len(stazeno_ok)} OK &nbsp;|&nbsp; {len(stazeno_chyby)} chyb</p>"
f"<p><strong>Parsování:</strong> {len(parse_ok)} zpracováno &nbsp;|&nbsp; {len(parse_chyby)} chyb</p>"
+ tabulka_stazeni(stazeno_ok, "Stažené recepty")
+ tabulka_stazeni(stazeno_chyby, "Chyby při stahování")
+ tabulka_parse(parse_ok, "Zpracované recepty")
+ tabulka_parse(parse_chyby, "Chyby při parsování")
+ "</div>"
)
return predmet, body
# ─────────────────────────────────────────────────────────────────────────────
# MAIN
# ─────────────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--od", default=DATUM_OD_DEFAULT, help="Recepty od data (YYYY-MM-DD)")
parser.add_argument("--limit", type=int, default=None, help="Max počet receptů ke stažení")
args = parser.parse_args()
dnes = date.today().isoformat()
out_dir = XML_DIR / dnes
out_dir.mkdir(parents=True, exist_ok=True)
souhrn = Souhrn(datum=dnes)
try:
print("=" * 60)
print(f" FÁZE 1: Stahování XML ({dnes})")
print("=" * 60)
faze_stazeni(args.od, args.limit, out_dir, souhrn)
print("\n" + "=" * 60)
print(f" FÁZE 2: Parsování XML ({dnes})")
print("=" * 60)
faze_parsovani(dnes, out_dir, souhrn)
except Exception:
souhrn.kriticka_chyba = traceback.format_exc()
print(f"\nKRITICKÁ CHYBA:\n{souhrn.kriticka_chyba}", file=sys.stderr)
# ── Souhrn do terminálu ───────────────────────────────────────────────────
stazeno_ok = sum(1 for r in souhrn.stazeni if r.stav == "OK")
stazeno_chyby = sum(1 for r in souhrn.stazeni if r.stav == "CHYBA")
parse_ok = sum(1 for r in souhrn.parse if r.stav == "OK")
parse_chyby = sum(1 for r in souhrn.parse if r.stav == "CHYBA")
print(f"\n{'='*60}")
print(f" SOUHRN {dnes}")
print(f" Stažení: {stazeno_ok} OK, {stazeno_chyby} chyb")
print(f" Parsování: {parse_ok} zpracováno, {parse_chyby} chyb")
if souhrn.kriticka_chyba:
print(" !! Kritická chyba — viz email")
print(f"{'='*60}\n")
# ── Email ─────────────────────────────────────────────────────────────────
try:
predmet, body = sestav_email(souhrn)
send_mail(to=EMAIL_PRIJEMCE, subject=predmet, body=body, html=True)
print(f"Email odeslán: {EMAIL_PRIJEMCE}")
except Exception as e:
print(f"CHYBA odeslání emailu: {e}", file=sys.stderr)
if __name__ == "__main__":
main()