From 0bfa9c48e45e18d3575c47a161da1fe48a21ec9c Mon Sep 17 00:00:00 2001 From: "michaela.buzalkova" Date: Sat, 25 Apr 2026 12:55:21 +0200 Subject: [PATCH] lenovo --- Knihovny/mysql_db.py | 4 +- .../12_DenníStaženíAZpracování.py | 542 ++++++++++++++++++ 2 files changed, 545 insertions(+), 1 deletion(-) create mode 100644 Recepty/NačteníPředpisuWithClaude/12_DenníStaženíAZpracování.py diff --git a/Knihovny/mysql_db.py b/Knihovny/mysql_db.py index d89e6f3..481e5e2 100644 --- a/Knihovny/mysql_db.py +++ b/Knihovny/mysql_db.py @@ -11,7 +11,7 @@ _LOCAL_HOSTS = {"lekar", "sestra", "lenovo"} def connect_mysql(user="root", password="Vlado9674+", database="medevio", - port=3306, charset="utf8mb4", autocommit=True): + port=3306, charset="utf8mb4", autocommit=True, cursorclass=None): """ Připojí se k MySQL. Na lokálních stanicích (lekar/sestra/lenovo) použije 127.0.0.1 přímo, jinak zkusí 192.168.1.76 a pak 127.0.0.1 jako fallback. @@ -22,6 +22,8 @@ def connect_mysql(user="root", password="Vlado9674+", database="medevio", params = dict(port=port, user=user, password=password, database=database, charset=charset, autocommit=autocommit) + if cursorclass is not None: + params["cursorclass"] = cursorclass last_error = None for host in candidates: diff --git a/Recepty/NačteníPředpisuWithClaude/12_DenníStaženíAZpracování.py b/Recepty/NačteníPředpisuWithClaude/12_DenníStaženíAZpracování.py new file mode 100644 index 0000000..9bf5fe7 --- /dev/null +++ b/Recepty/NačteníPředpisuWithClaude/12_DenníStaženíAZpracování.py @@ -0,0 +1,542 @@ +""" +Denní kombinovaný skript: stažení XML z eRecept + parsování do MySQL + emailový souhrn. + +Pořadí: + 1. Načte terminální sadu z MySQL + 2. Načte ERP kódy z Firebirdu + 3. Stáhne XML pro nezpracované recepty (API eRecept) + 4. Naparsuje dnešní XML do MySQL + 5. Odešle emailový souhrn (i v případě výjimky) + +Spuštění: + python 12_DenníStaženíAZpracování.py + python 12_DenníStaženíAZpracování.py --od 2025-01-01 + python 12_DenníStaženíAZpracování.py --limit 50 +""" + +import argparse +import random +import sys +import time +import traceback +import uuid +import xml.etree.ElementTree as ET +from dataclasses import dataclass, field +from datetime import date, datetime, timezone +from pathlib import Path + +import pymysql +import pymysql.cursors +from requests import Session +from requests_pkcs12 import Pkcs12Adapter + +from Knihovny.EmailMessagingGraph import send_mail +from Knihovny.medicus_db import get_medicus_connection +from Knihovny.mysql_db import connect_mysql +from Knihovny.najdi_dropbox import get_dropbox_root + +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(errors="replace") + +# ── Konfigurace eRecept ─────────────────────────────────────────────────────── +PFX_FILE = Path(__file__).parent.parent / "AMBSUKL214235369G_31DEC2024.pfx" +PFX_PASS = "Vlado7309208104++" +API_USER = "e08c89c6-2b1a-4eba-8ed9-4e3e63618379" +API_PASS = "Buzalka@Vladimir2025" +UZIVATEL = "E08C89C6-2B1A-4EBA-8ED9-4E3E63618379" +PRACOVISTE = "00214235367" +ENDPOINT = "https://lekar-soap.erecept.sukl.cz/cuer/Lekar" +NAMESPACE = "http://www.sukl.cz/erp/201704" + +PAUZA_MIN = 1 +PAUZA_MAX = 3 + +# ── Adresáře a email ────────────────────────────────────────────────────────── +XML_DIR = Path(get_dropbox_root()) / "Ordinace" / "Dokumentace_ke_zpracování" / "Zúčtovací zprávy" / "NačteníPředpisuWithClaude" / "xml_archive" +EMAIL_PRIJEMCE = "vladimir.buzalka@buzalka.cz" + +# ── Výchozí parametry ───────────────────────────────────────────────────────── +DATUM_OD_DEFAULT = "2025-01-01" + + +# ── Datové třídy pro souhrn ─────────────────────────────────────────────────── + +@dataclass +class StazeniRadek: + erp_kod: str + pacient: str + stav: str # "OK" | "CHYBA" | "PRESKOCENO" + detail: str = "" # stav receptu nebo chybová zpráva + + +@dataclass +class ParseRadek: + erp_kod: str + stav_receptu: str # PREDEPSANY / PLNE_VYDANY / … + terminal: int + plp: int + stav: str # "OK" | "CHYBA" + detail: str = "" + + +@dataclass +class Souhrn: + datum: str = "" + stazeni: list[StazeniRadek] = field(default_factory=list) + parse: list[ParseRadek] = field(default_factory=list) + kriticka_chyba: str = "" + + +# ───────────────────────────────────────────────────────────────────────────── +# FÁZE 1 — stahování +# ───────────────────────────────────────────────────────────────────────────── + +def nacti_terminal_set(mysql_conn): + with mysql_conn.cursor() as cur: + cur.execute("SELECT id_dokladu FROM recept_doklad WHERE stav_terminal = 1") + return {row["id_dokladu"] for row in cur.fetchall()} + + +def nacti_erp_kody(fb_conn, datum_od, limit=None): + if limit: + sql = f"SELECT FIRST {int(limit)}" + else: + sql = "SELECT" + sql += """ + r.datum, r.lek, r.dop, r.idpac, + TRIM(kar.prijmeni) AS prijmeni, TRIM(kar.jmeno) AS jmeno, + ep.erp + FROM recept r + JOIN recept_epodani ep ON r.id_epodani = ep.id + JOIN kar ON r.idpac = kar.idpac + WHERE r.datum >= ? AND ep.erp IS NOT NULL AND r.STORNO = 'F' + ORDER BY r.datum DESC + """ + cur = fb_conn.cursor() + cur.execute(sql, [datum_od]) + rows = cur.fetchall() + cur.close() + + seen, unique = set(), [] + for row in rows: + erp = row[6] + if erp not in seen: + seen.add(erp) + unique.append(row) + return unique + + +def volej_nacist_predpis(sess, erp_kod): + id_zpravy = str(uuid.uuid4()) + odeslano = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") + soap_body = ( + '' + '' + '' + f'' + f'' + f'{UZIVATEL}{PRACOVISTE}' + f'{erp_kod}' + f'' + f'' + f'{id_zpravy}' + f'202501A' + f'{odeslano}' + f'MEDICUS_____' + f'' + f'' + '' + '' + ) + headers = { + "Content-Type": 'text/xml; charset="UTF-8"', + "SOAPAction": '"NacistPredpis"', + "User-Agent": "Medicus", + } + resp = sess.post(ENDPOINT, data=soap_body.encode("utf-8"), headers=headers, timeout=15) + return resp.status_code, resp.text + + +def faze_stazeni(datum_od, limit, out_dir, souhrn: Souhrn): + print("Připojuji MySQL...") + mysql = connect_mysql(database="medicus", cursorclass=pymysql.cursors.DictCursor) + terminal = nacti_terminal_set(mysql) + mysql.close() + print(f" Terminálních receptů v DB: {len(terminal)}\n") + + print("Připojuji Firebird...") + fb = get_medicus_connection() + rows = nacti_erp_kody(fb, datum_od, limit) + fb.close() + print(f" Unikátních ERP kódů v Medicusu (od {datum_od}): {len(rows)}\n") + + ke_stazeni = [r for r in rows if r[6] not in terminal] + preskoceno = len(rows) - len(ke_stazeni) + print(f" Přeskočeno (terminální): {preskoceno}") + print(f" Ke stažení: {len(ke_stazeni)}\n") + + for erp in terminal: + souhrn.stazeni.append(StazeniRadek(erp_kod=erp, pacient="", stav="PRESKOCENO", detail="terminální")) + + if not ke_stazeni: + print("Vše terminální, nic ke stažení.") + return + + sess = Session() + sess.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_FILE, pkcs12_password=PFX_PASS)) + sess.auth = (API_USER, API_PASS) + + for i, row in enumerate(ke_stazeni, 1): + datum_rec, lek, dop, idpac, prijmeni, jmeno, erp_kod = row + lek_str = f"{lek} {dop}".strip() if dop else str(lek or "").strip() + label = f"{prijmeni} {jmeno}".strip() + + print(f"[{i:4d}/{len(ke_stazeni)}] {label:30s} {erp_kod} ", end="", flush=True) + + try: + status, text = volej_nacist_predpis(sess, erp_kod) + je_chyba = status != 200 or "" in text + + if not je_chyba: + xml_file = out_dir / f"{erp_kod}.xml" + xml_file.write_text(text, encoding="utf-8") + kb = len(text.encode()) / 1024 + print(f"OK {kb:5.1f} KB {lek_str[:40]}") + souhrn.stazeni.append(StazeniRadek(erp_kod=erp_kod, pacient=label, stav="OK", detail=lek_str[:60])) + else: + chyba_short = text[:120].replace("\n", " ") + print(f"CHYBA HTTP {status} {chyba_short}") + xml_file = out_dir / f"{erp_kod}_CHYBA.xml" + xml_file.write_text(text, encoding="utf-8") + souhrn.stazeni.append(StazeniRadek(erp_kod=erp_kod, pacient=label, stav="CHYBA", detail=chyba_short[:120])) + + except Exception as e: + print(f"EXCEPTION {e}") + souhrn.stazeni.append(StazeniRadek(erp_kod=erp_kod, pacient=label, stav="CHYBA", detail=str(e)[:120])) + + if i < len(ke_stazeni): + time.sleep(random.uniform(PAUZA_MIN, PAUZA_MAX)) + + +# ───────────────────────────────────────────────────────────────────────────── +# FÁZE 2 — parsování +# ───────────────────────────────────────────────────────────────────────────── + +NS_PARSE = f"{{{NAMESPACE}}}" + + +def _t(el, tag): + found = el.find(f"{NS_PARSE}{tag}") + return found.text.strip() if found is not None and found.text else None + + +def _ts(s): + return s[:19].replace("T", " ") if s else None + + +def _bool_el(el, tag): + return 1 if _t(el, tag) == "true" else 0 + + +def parsuj_xml(xml_text, xml_soubor): + try: + root = ET.fromstring(xml_text) + doklad = root.find(f".//{NS_PARSE}Doklad") + if doklad is None: + return None + except ET.ParseError: + return None + + id_dokladu = _t(doklad, "ID_Dokladu") + stav = _t(doklad, "Stav") + datum_vystaveni = _t(doklad, "DatumVystaveni") + platnost_do_s = _t(doklad, "PlatnostDo") + vypis_do = _t(doklad, "VypisDo") + akutni = _bool_el(doklad, "Akutni") + rodina = _bool_el(doklad, "Rodina") + opakovani_s = _t(doklad, "Opakovani") + druh_pojisteni = _t(doklad, "DruhPojisteni") + modry_pruh = _bool_el(doklad, "ModryPruh") + pozn = _t(doklad, "Pozn") + zap_s = _t(doklad, "ZapocitatelnyDoplatekZbyvaDoLimitu") + zmena = _ts(_t(doklad, "Zmena")) + zalozeni = _ts(_t(doklad, "Zalozeni")) + + platnost_date = date.fromisoformat(platnost_do_s) if platnost_do_s else None + expiroval = platnost_date is not None and platnost_date < date.today() + stav_terminal = 1 if stav in ("PLNE_VYDANY", "ZRUSENY") or expiroval else 0 + + pac = doklad.find(f"{NS_PARSE}Pacient") + zp_el = pac.find(f"{NS_PARSE}ZP") if pac is not None else None + cp = _t(pac, "CP") if pac is not None else None + zp_kod = _t(zp_el, "Kod") if zp_el is not None else None + zp_nazev = _t(zp_el, "Nazev") if zp_el is not None else None + pac_telefon = _t(pac, "Telefon") if pac is not None else None + pac_notif = _t(pac, "Notifikace") if pac is not None else None + pac_pohlavi = _t(pac, "Pohlavi") if pac is not None else None + + pred = doklad.find(f"{NS_PARSE}Predepisujici") + lekar_el = pred.find(f"{NS_PARSE}Lekar") if pred is not None else None + odb_el = pred.find(f"{NS_PARSE}Odbornost") if pred is not None else None + lekar_kod_raw = _t(lekar_el, "Kod") if lekar_el is not None else None + lekar_kod = lekar_kod_raw if lekar_kod_raw and lekar_kod_raw.lower() != "skryto" else None + odb_kod = _t(odb_el, "Kod") if odb_el is not None else None + odb_nazev = _t(odb_el, "Nazev") if odb_el is not None else None + lekar_email = _t(pred, "Email") if pred is not None else None + + doklad_dict = dict( + id_dokladu=id_dokladu, stav=stav, stav_terminal=stav_terminal, + datum_vystaveni=datum_vystaveni, platnost_do=platnost_do_s, vypis_do=vypis_do, + akutni=akutni, rodina=rodina, opakovani=int(opakovani_s) if opakovani_s else None, + druh_pojisteni=druh_pojisteni, modry_pruh=modry_pruh, pozn=pozn, + zap_doplatek=float(zap_s) if zap_s else None, zmena=zmena, zalozeni=zalozeni, + lekar_kod=lekar_kod, odbornost_kod=odb_kod, odbornost_nazev=odb_nazev, + lekar_email=lekar_email, cp=cp, zp_kod=zp_kod, zp_nazev=zp_nazev, + pac_telefon=pac_telefon, pac_notifikace=pac_notif, pac_pohlavi=pac_pohlavi, + xml_soubor=str(xml_soubor), + ) + + plp_list = [] + for plp_el in doklad.findall(f"{NS_PARSE}PLP"): + id_lp = _t(plp_el, "ID_LP") + uhrada = _t(plp_el, "Uhrada") + prekroceni = 1 if _t(plp_el, "Prekroceni") == "true" else 0 + if id_lp: + plp_list.append(dict(id_lp=id_lp, id_dokladu=id_dokladu, uhrada=uhrada, prekroceni=prekroceni)) + + return doklad_dict, plp_list + + +def uloz(conn, doklad, plp_list): + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO recept_doklad + (id_dokladu, stav, stav_terminal, datum_vystaveni, platnost_do, + vypis_do, akutni, rodina, opakovani, druh_pojisteni, modry_pruh, + pozn, zap_doplatek, zmena, zalozeni, + lekar_kod, odbornost_kod, odbornost_nazev, lekar_email, + cp, zp_kod, zp_nazev, pac_telefon, pac_notifikace, pac_pohlavi, + xml_soubor, stazeno) + VALUES + (%(id_dokladu)s, %(stav)s, %(stav_terminal)s, %(datum_vystaveni)s, %(platnost_do)s, + %(vypis_do)s, %(akutni)s, %(rodina)s, %(opakovani)s, %(druh_pojisteni)s, %(modry_pruh)s, + %(pozn)s, %(zap_doplatek)s, %(zmena)s, %(zalozeni)s, + %(lekar_kod)s, %(odbornost_kod)s, %(odbornost_nazev)s, %(lekar_email)s, + %(cp)s, %(zp_kod)s, %(zp_nazev)s, %(pac_telefon)s, %(pac_notifikace)s, %(pac_pohlavi)s, + %(xml_soubor)s, NOW()) + ON DUPLICATE KEY UPDATE + stav=VALUES(stav), stav_terminal=VALUES(stav_terminal), + platnost_do=VALUES(platnost_do), druh_pojisteni=VALUES(druh_pojisteni), + pozn=VALUES(pozn), zap_doplatek=VALUES(zap_doplatek), + zmena=VALUES(zmena), zp_kod=VALUES(zp_kod), zp_nazev=VALUES(zp_nazev), + pac_telefon=VALUES(pac_telefon), pac_notifikace=VALUES(pac_notifikace), + xml_soubor=VALUES(xml_soubor), stazeno=NOW() + """, doklad) + + for plp in plp_list: + cur.execute(""" + INSERT IGNORE INTO recept_plp (id_lp, id_dokladu, uhrada, prekroceni) + VALUES (%(id_lp)s, %(id_dokladu)s, %(uhrada)s, %(prekroceni)s) + """, plp) + + conn.commit() + + +def faze_parsovani(dnes_str, out_dir, souhrn: Souhrn): + xml_soubory = { + xml_file.stem: xml_file + for xml_file in out_dir.glob("*.xml") + if not xml_file.stem.endswith("_CHYBA") + } + print(f"\nParsování: {len(xml_soubory)} XML souborů z {dnes_str}\n") + + if not xml_soubory: + print(" Žádné XML ke zpracování.") + return + + conn = connect_mysql(database="medicus", cursorclass=pymysql.cursors.DictCursor) + + with conn.cursor() as cur: + cur.execute("ALTER TABLE recept_plp MODIFY uhrada VARCHAR(20)") + cur.execute("ALTER TABLE recept_doklad MODIFY pac_pohlavi VARCHAR(5)") + try: + cur.execute("ALTER TABLE recept_doklad DROP FOREIGN KEY recept_doklad_ibfk_1") + except Exception: + pass + conn.commit() + + with conn.cursor() as cur: + cur.execute("SELECT id_dokladu, xml_soubor FROM recept_doklad WHERE xml_soubor IS NOT NULL") + zpracovane = {row["id_dokladu"]: row["xml_soubor"] for row in cur.fetchall()} + + for i, (erp_kod, xml_file) in enumerate(xml_soubory.items(), 1): + rel_path = str(xml_file.relative_to(XML_DIR)) + print(f"[{i:4d}/{len(xml_soubory)}] {erp_kod} ", end="", flush=True) + + if zpracovane.get(erp_kod) == rel_path: + print("přeskočeno (beze změny)") + continue + + xml_text = xml_file.read_text(encoding="utf-8") + vysledek = parsuj_xml(xml_text, xml_file.relative_to(XML_DIR)) + + if vysledek is None: + print("CHYBA parsování") + souhrn.parse.append(ParseRadek(erp_kod=erp_kod, stav_receptu="", terminal=0, plp=0, stav="CHYBA", detail="chyba parsování XML")) + continue + + doklad, plp_list = vysledek + try: + uloz(conn, doklad, plp_list) + stav_r = doklad["stav"] or "" + term = doklad["stav_terminal"] + print(f"{stav_r:20s} terminal={term} PLP={len(plp_list)}") + souhrn.parse.append(ParseRadek(erp_kod=erp_kod, stav_receptu=stav_r, terminal=term, plp=len(plp_list), stav="OK")) + except Exception as e: + print(f"CHYBA DB {e}") + conn.rollback() + souhrn.parse.append(ParseRadek(erp_kod=erp_kod, stav_receptu="", terminal=0, plp=0, stav="CHYBA", detail=str(e)[:200])) + + conn.close() + + +# ───────────────────────────────────────────────────────────────────────────── +# EMAIL +# ───────────────────────────────────────────────────────────────────────────── + +def _barva(stav): + return {"OK": "#1a7a2a", "CHYBA": "#c0392b", "PRESKOCENO": "#888"}.get(stav, "#333") + + +def _badge(stav): + return f'{stav}' + + +def sestav_email(souhrn: Souhrn) -> tuple[str, str]: + stazeno_ok = [r for r in souhrn.stazeni if r.stav == "OK"] + stazeno_chyby = [r for r in souhrn.stazeni if r.stav == "CHYBA"] + parse_ok = [r for r in souhrn.parse if r.stav == "OK"] + parse_chyby = [r for r in souhrn.parse if r.stav == "CHYBA"] + + ma_chybu = bool(souhrn.kriticka_chyba or stazeno_chyby or parse_chyby) + predmet = f"eRecept {souhrn.datum} — {'⚠ CHYBA' if ma_chybu else 'OK'} ({len(stazeno_ok)} staženo, {len(parse_ok)} zpracováno)" + + css = "font-family:Arial,sans-serif;font-size:14px;color:#222" + h2 = "margin:20px 0 4px;font-size:15px;border-bottom:1px solid #ddd;padding-bottom:3px" + td = "padding:3px 10px;border-bottom:1px solid #eee" + th = "padding:4px 10px;background:#f0f0f0;text-align:left;border-bottom:1px solid #ccc" + + def tabulka_stazeni(radky, nadpis): + if not radky: + return "" + rows = "".join( + f"{r.erp_kod}{r.pacient}" + f"{_badge(r.stav)}{r.detail}" + for r in radky + ) + return ( + f"

{nadpis}

" + f"" + f"" + f"" + f"{rows}
ERP kódPacientStavDetail
" + ) + + def tabulka_parse(radky, nadpis): + if not radky: + return "" + rows = "".join( + f"{r.erp_kod}{r.stav_receptu}" + f"{'ano' if r.terminal else 'ne'}" + f"{r.plp}" + f"{_badge(r.stav)}{r.detail}" + for r in radky + ) + return ( + f"

{nadpis}

" + f"" + f"" + f"" + f"" + f"{rows}
ERP kódStav receptuTerminálníPLPStavDetail
" + ) + + krit = "" + if souhrn.kriticka_chyba: + krit = ( + f"
" + f"Kritická chyba:
{souhrn.kriticka_chyba}
" + ) + + body = ( + f"
" + f"

eRecept — denní souhrn {souhrn.datum}

" + f"{krit}" + f"

Stažení: {len(stazeno_ok)} OK  |  {len(stazeno_chyby)} chyb

" + f"

Parsování: {len(parse_ok)} zpracováno  |  {len(parse_chyby)} chyb

" + + tabulka_stazeni(stazeno_ok, "Stažené recepty") + + tabulka_stazeni(stazeno_chyby, "Chyby při stahování") + + tabulka_parse(parse_ok, "Zpracované recepty") + + tabulka_parse(parse_chyby, "Chyby při parsování") + + "
" + ) + + return predmet, body + + +# ───────────────────────────────────────────────────────────────────────────── +# MAIN +# ───────────────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--od", default=DATUM_OD_DEFAULT, help="Recepty od data (YYYY-MM-DD)") + parser.add_argument("--limit", type=int, default=None, help="Max počet receptů ke stažení") + args = parser.parse_args() + + dnes = date.today().isoformat() + out_dir = XML_DIR / dnes + out_dir.mkdir(parents=True, exist_ok=True) + + souhrn = Souhrn(datum=dnes) + + try: + print("=" * 60) + print(f" FÁZE 1: Stahování XML ({dnes})") + print("=" * 60) + faze_stazeni(args.od, args.limit, out_dir, souhrn) + + print("\n" + "=" * 60) + print(f" FÁZE 2: Parsování XML ({dnes})") + print("=" * 60) + faze_parsovani(dnes, out_dir, souhrn) + + except Exception: + souhrn.kriticka_chyba = traceback.format_exc() + print(f"\nKRITICKÁ CHYBA:\n{souhrn.kriticka_chyba}", file=sys.stderr) + + # ── Souhrn do terminálu ─────────────────────────────────────────────────── + stazeno_ok = sum(1 for r in souhrn.stazeni if r.stav == "OK") + stazeno_chyby = sum(1 for r in souhrn.stazeni if r.stav == "CHYBA") + parse_ok = sum(1 for r in souhrn.parse if r.stav == "OK") + parse_chyby = sum(1 for r in souhrn.parse if r.stav == "CHYBA") + + print(f"\n{'='*60}") + print(f" SOUHRN {dnes}") + print(f" Stažení: {stazeno_ok} OK, {stazeno_chyby} chyb") + print(f" Parsování: {parse_ok} zpracováno, {parse_chyby} chyb") + if souhrn.kriticka_chyba: + print(" !! Kritická chyba — viz email") + print(f"{'='*60}\n") + + # ── Email ───────────────────────────────────────────────────────────────── + try: + predmet, body = sestav_email(souhrn) + send_mail(to=EMAIL_PRIJEMCE, subject=predmet, body=body, html=True) + print(f"Email odeslán: {EMAIL_PRIJEMCE}") + except Exception as e: + print(f"CHYBA odeslání emailu: {e}", file=sys.stderr) + + +if __name__ == "__main__": + main()