Files
ordinaceprojekt/Medevio/80 Pacienti/pacienti_gui.py
T
Vladimir Buzalka d30404ffd0 Medevio/80 Pacienti: GUI pro správu pacientů mimo Medicus
- pacienti_gui.py: Tkinter aplikace se scrollovatelným seznamem pacientů
  ACTIVE v Medevio, kteří nejsou registrovaní v Medicusu; tlačítka
  VZP dotaz, Bod zlomu (binary search pojištění) a Označ neaktivní
- _test_breakpoint.py: dočasný testovací skript pro binary search
- NOTES.md: technická dokumentace adresáře

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-05-20 07:05:13 +02:00

671 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
GUI: Pacienti ACTIVE v Medevio, kteří nejsou registrovaní v Medicusu.
Sloupce: Jméno, RC, Pojišťovna, Narozen, Poslední dekurz
Tlačítka: [VZP dotaz] [Označ neaktivní]
Spodní panel: výsledek VZP dotazu (pojišťovna + registrující lékař)
"""
import sys
import threading
import time
import tkinter as tk
from tkinter import ttk, messagebox
from pathlib import Path
from datetime import date, timedelta
sys.path.insert(0, "U:/OrdinaceProjekt")
import pymysql
import requests
from Knihovny.medicus_db import get_medicus_db
from Knihovny.vzpb2b_client import VZPB2BClient
# ── CONFIG ──────────────────────────────────────────────────────────────────
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CLINIC_SLUG = "mudr-buzalkova"
TOKEN_PATH = Path("U:/OrdinaceProjekt/Medevio/token.txt")
CERT_PATH = Path("U:/OrdinaceProjekt/Insurance/Certificates/picka.pfx")
CERT_PASSWORD = "Vlado7309208104+"
ICZ = "09305000"
DELAY_VZP = 1.5 # s mezi VZP voláními — max 2 dotazy / 3 s
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
MUTATION_REMOVE = """
mutation ClinicPatientEditStatusModal_UpdateClinicPatientStatus(
$clinicSlug: String!, $patientId: String!, $status: ClinicPatientStatus!
) {
updated: updateClinicPatientStatus(
clinicSlug: $clinicSlug
patientId: $patientId
status: $status
)
}
"""
# ── HELPERS ──────────────────────────────────────────────────────────────────
def load_token() -> str:
t = TOKEN_PATH.read_text(encoding="utf-8").strip()
return t.removeprefix("Bearer ").strip()
def medevio_headers(token: str) -> dict:
return {
"content-type": "application/json",
"authorization": f"Bearer {token}",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
# ── DATA LOADING ─────────────────────────────────────────────────────────────
def load_patients() -> list[dict]:
"""
Vrátí pacienty ACTIVE v Medevio (MySQL), kteří nejsou registrovaní
v Medicusu (Firebird). Přidá datum posledního záznamu v dekurzu.
"""
# 1) Medicus — registrovaná rodná čísla
db = get_medicus_db()
rows = db.get_active_registered_patients()
medicus_rc: set[str] = {r[0].strip() for r in rows if r[0]}
db.close()
# 2) MySQL — všichni ACTIVE s RC
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute("""
SELECT patient_id, name, surname, identification_number,
insurance_name, insurance_code, user_id, dob
FROM medevio_pacient
WHERE status = 'ACTIVE'
AND identification_number IS NOT NULL
AND identification_number <> ''
AND surname NOT LIKE '%%\\%%%'
ORDER BY surname, name
""")
all_active = cur.fetchall()
conn.close()
# 3) Filtruj mimo Medicus
extras = [p for p in all_active if p["identification_number"] not in medicus_rc]
if not extras:
return extras
# 4) Datum posledního záznamu v dekurzu (LOG tabulka)
rc_list = [p["identification_number"] for p in extras]
# Bezpečné inline — RC jsou vždy jen číslice
rc_sql_in = ",".join(f"'{rc}'" for rc in rc_list)
db2 = get_medicus_db()
try:
# Nejdřív idpac → rodcis mapping
idpac_rows = db2.query(
f"SELECT idpac, rodcis FROM kar WHERE rodcis IN ({rc_sql_in})"
)
rc_to_idpac: dict[str, int] = {}
idpac_set: list[str] = []
for r in idpac_rows:
rc_to_idpac[r[1].strip()] = r[0]
idpac_set.append(str(r[0]))
# Datum posledního záznamu v LOG pro tyto idpac
dekurz_map: dict[int, str] = {}
if idpac_set:
idpac_sql_in = ",".join(idpac_set)
log_rows = db2.query(
f"SELECT idpac, MAX(datum) FROM log "
f"WHERE idpac IN ({idpac_sql_in}) GROUP BY idpac"
)
dekurz_map = {r[0]: str(r[1])[:10] if r[1] else None for r in log_rows}
except Exception:
rc_to_idpac = {}
dekurz_map = {}
finally:
db2.close()
for p in extras:
idpac = rc_to_idpac.get(p["identification_number"])
p["posledni_dekurz"] = dekurz_map.get(idpac) if idpac else None
return extras
def api_set_removed(token: str, patient_id: str) -> tuple[bool, str | None]:
"""Volá Medevio GraphQL API — nastaví status REMOVED."""
resp = requests.post(
GRAPHQL_URL,
headers=medevio_headers(token),
json={
"operationName": "ClinicPatientEditStatusModal_UpdateClinicPatientStatus",
"query": MUTATION_REMOVE,
"variables": {
"clinicSlug": CLINIC_SLUG,
"patientId": patient_id,
"status": "REMOVED",
},
},
timeout=20,
)
resp.raise_for_status()
data = resp.json()
if "errors" in data:
return False, str(data["errors"])
return bool(data.get("data", {}).get("updated")), None
def mysql_set_removed(patient_id: str) -> None:
"""Aktualizuje status v MySQL tabulce."""
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute(
"UPDATE medevio_pacient SET status='REMOVED' WHERE patient_id = %s",
(patient_id,),
)
conn.commit()
conn.close()
# ── BINARY SEARCH — BOD ZLOMU POJIŠTĚNÍ ─────────────────────────────────────
def _get_insurance_status(vzp_client: VZPB2BClient, rc: str, k_datu: str) -> dict:
"""Vrátí parsed stav pojištění k danému datu — s retry na 503."""
for attempt in range(3):
try:
xml = vzp_client.stav_pojisteni(rc, k_datu=k_datu)
return vzp_client.parse_stav_pojisteni(xml)
except Exception:
if attempt < 2:
time.sleep(3)
raise RuntimeError(f"API selhalo 3× pro {k_datu}")
def _is_active(status: dict) -> bool:
"""Pojistěnec je 'aktivní' pokud API vrátilo stav '1' (pojištěn)."""
return status.get("stavVyrizeni") == 1 and status.get("stav") == "1"
def find_insurance_breakpoint(
vzp_client: VZPB2BClient,
rc: str,
years_back: int = 2,
progress_cb=None,
) -> dict:
"""
Binary search: hledá datum, kdy pacient přešel ze stavu 'pojištěn'
na jiný stav (jiná pojišťovna / nenalezen).
Vrátí dict s klíči:
found bool — zda byl zlom nalezen
date_last str — poslední datum kdy byl stav 'aktivní'
date_first_x str — první datum kdy byl stav jiný
status_start dict — stav na začátku (dnes years_back)
status_end dict — stav dnes
iterations int — počet API volání
message str — popis výsledku
"""
today = date.today()
start_date = today - timedelta(days=years_back * 365)
iterations = 0
def check(d: date) -> tuple[dict, bool]:
nonlocal iterations
iterations += 1
if progress_cb:
progress_cb(f"Testuju {d.isoformat()} (volání č. {iterations})…")
status = _get_insurance_status(vzp_client, rc, d.isoformat())
time.sleep(1.5)
return status, _is_active(status)
status_start, active_start = check(start_date)
status_end, active_end = check(today)
# Pokud ani na začátku nebyl aktivní — nemáme levý okraj
if not active_start:
return {
"found": False,
"date_last": None,
"date_first_x": None,
"status_start": status_start,
"status_end": status_end,
"iterations": iterations,
"message": (
f"Před {years_back} lety ({start_date.isoformat()}) již nebyl pojištěn "
f"nebo byl u jiné pojišťovny.\n"
f" Stav tehdy: {status_start.get('stav')} / "
f"pojišťovna: {status_start.get('nazevPojistovny')}"
),
}
# Pokud je dnes stále aktivní — zlom nenalezen v daném okně
if active_end:
return {
"found": False,
"date_last": today.isoformat(),
"date_first_x": None,
"status_start": status_start,
"status_end": status_end,
"iterations": iterations,
"message": (
f"Pacient je stále pojištěn (i dnes).\n"
f" Pojišťovna dnes: {status_end.get('nazevPojistovny')} / "
f"stav: {status_end.get('stav')}"
),
}
# Binary search mezi start_date (aktivní) a today (neaktivní)
left = start_date
right = today
left_status = status_start
right_status = status_end
while (right - left).days > 1:
mid = left + timedelta(days=(right - left).days // 2)
mid_status, mid_active = check(mid)
if mid_active:
left = mid
left_status = mid_status
else:
right = mid
right_status = mid_status
return {
"found": True,
"date_last": left.isoformat(),
"date_first_x": right.isoformat(),
"status_start": status_start,
"status_end": status_end,
"iterations": iterations,
"message": (
f"Bod zlomu nalezen:\n"
f" Poslední den jako pojištěnec : {left.isoformat()}\n"
f" Pojišťovna: {left_status.get('nazevPojistovny')} "
f"(kód {left_status.get('kodPojistovny')}) / "
f"stav: {left_status.get('stav')}\n"
f" První den se změnou : {right.isoformat()}\n"
f" Pojišťovna: {right_status.get('nazevPojistovny')} "
f"(kód {right_status.get('kodPojistovny')}) / "
f"stav: {right_status.get('stav')}\n"
f" Celkem API volání: {iterations}"
),
}
# ── GUI ───────────────────────────────────────────────────────────────────────
COL_WIDTHS = {
"name": 28, # chars
"rc": 13,
"poj": 32,
"dob": 10,
"dek": 12,
}
BG_EVEN = "#ffffff"
BG_ODD = "#f4f6f8"
BG_REMOVED = "#d5f5e3"
BG_HEADER = "#2c3e50"
FG_HEADER = "#ffffff"
BG_BTN_VZP = "#2980b9"
BG_BTN_REM = "#c0392b"
class PacientApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("Pacienti Medevio · mimo Medicus")
self.geometry("1320x780")
self.minsize(900, 600)
self.configure(bg="#f0f0f0")
self.token: str = ""
self.vzp_client: VZPB2BClient | None = None
self.patients: list[dict] = []
self._build_ui()
self._start_load()
# ── UI CONSTRUCTION ──────────────────────────────────────────────────────
def _build_ui(self):
self._build_topbar()
self._build_colheaders()
self._build_list_area()
self._build_bottom_panel()
def _build_topbar(self):
bar = tk.Frame(self, bg=BG_HEADER, pady=8)
bar.pack(fill="x")
tk.Label(bar, text="Pacienti ACTIVE v Medevio · mimo Medicus",
bg=BG_HEADER, fg=FG_HEADER,
font=("Segoe UI", 13, "bold")).pack(side="left", padx=14)
self.lbl_status = tk.Label(bar, text="Načítám…",
bg=BG_HEADER, fg="#bdc3c7",
font=("Segoe UI", 10))
self.lbl_status.pack(side="left", padx=8)
tk.Button(bar, text="⟳ Obnovit", command=self._start_load,
bg="#3498db", fg="white", activebackground="#2980b9",
relief="flat", padx=12, pady=2,
cursor="hand2").pack(side="right", padx=14)
def _build_colheaders(self):
hdr = tk.Frame(self, bg="#dfe6e9", pady=5, padx=10)
hdr.pack(fill="x")
cols = [
("Příjmení, Jméno", COL_WIDTHS["name"]),
("Rodné číslo", COL_WIDTHS["rc"]),
("Pojišťovna", COL_WIDTHS["poj"]),
("Narozen", COL_WIDTHS["dob"]),
("Posl. dekurz", COL_WIDTHS["dek"]),
("", 12),
("", 14),
("", 18),
]
for text, w in cols:
tk.Label(hdr, text=text, bg="#dfe6e9",
font=("Segoe UI", 9, "bold"),
width=w, anchor="w").pack(side="left")
def _build_list_area(self):
wrapper = tk.Frame(self, bg="#e0e0e0")
wrapper.pack(fill="both", expand=True, padx=4, pady=(0, 4))
self.canvas = tk.Canvas(wrapper, bg="#ffffff", highlightthickness=0)
vsb = ttk.Scrollbar(wrapper, orient="vertical", command=self.canvas.yview)
self.canvas.configure(yscrollcommand=vsb.set)
vsb.pack(side="right", fill="y")
self.canvas.pack(side="left", fill="both", expand=True)
self.list_frame = tk.Frame(self.canvas, bg="#ffffff")
self._cwin = self.canvas.create_window((0, 0), window=self.list_frame, anchor="nw")
self.list_frame.bind("<Configure>",
lambda e: self.canvas.configure(
scrollregion=self.canvas.bbox("all")))
self.canvas.bind("<Configure>",
lambda e: self.canvas.itemconfig(self._cwin, width=e.width))
self.canvas.bind("<MouseWheel>",
lambda e: self.canvas.yview_scroll(
int(-1 * (e.delta / 120)), "units"))
def _build_bottom_panel(self):
pnl = tk.LabelFrame(self, text=" Výsledek VZP dotazu ",
font=("Segoe UI", 9, "bold"),
bg="#f0f0f0", pady=6, padx=8)
pnl.pack(fill="x", padx=6, pady=(0, 6))
self.vzp_text = tk.Text(pnl, height=13,
font=("Consolas", 9),
bg="#1e1e1e", fg="#d4d4d4",
insertbackground="#d4d4d4",
relief="flat", wrap="word",
state="disabled")
self.vzp_text.pack(fill="x")
self._vzp_write("Vyberte pacienta a stiskněte [VZP dotaz].")
# ── DATA FLOW ────────────────────────────────────────────────────────────
def _start_load(self):
self.lbl_status.config(text="Načítám…")
for w in self.list_frame.winfo_children():
w.destroy()
threading.Thread(target=self._bg_load, daemon=True).start()
def _bg_load(self):
try:
self.token = load_token()
patients = load_patients()
self.after(0, lambda: self._populate(patients))
except Exception as exc:
self.after(0, lambda: (
self.lbl_status.config(text="Chyba při načítání"),
messagebox.showerror("Chyba načítání", str(exc)),
))
def _populate(self, patients: list[dict]):
self.patients = patients
n = len(patients)
self.lbl_status.config(text=f"Celkem: {n} pacientů")
for i, p in enumerate(patients):
bg = BG_EVEN if i % 2 == 0 else BG_ODD
self._add_row(i, p, bg)
def _add_row(self, idx: int, p: dict, bg: str):
row = tk.Frame(self.list_frame, bg=bg, pady=4)
row.pack(fill="x", padx=6)
name = f"{p.get('surname', '')} {p.get('name', '')}"
rc = p.get("identification_number", "")
poj = p.get("insurance_name") or ""
dob = str(p.get("dob") or "")[:10] or ""
dek = p.get("posledni_dekurz") or ""
dek_fg = "#e74c3c" if dek == "" else "#2c3e50"
def lbl(text, width, fg="#2c3e50", font=("Segoe UI", 9)):
tk.Label(row, text=text, bg=bg, fg=fg,
font=font, width=width, anchor="w").pack(side="left", padx=2)
lbl(name, COL_WIDTHS["name"])
lbl(rc, COL_WIDTHS["rc"], font=("Consolas", 9))
lbl(poj, COL_WIDTHS["poj"])
lbl(dob, COL_WIDTHS["dob"])
lbl(dek, COL_WIDTHS["dek"], fg=dek_fg)
btn_vzp = tk.Button(row, text="VZP dotaz",
command=lambda pat=p: self._query_vzp(pat),
bg=BG_BTN_VZP, fg="white",
activebackground="#1a6fa8",
relief="flat", padx=8, cursor="hand2",
font=("Segoe UI", 9))
btn_vzp.pack(side="left", padx=6)
btn_zlom = tk.Button(row, text="Bod zlomu",
command=lambda pat=p: self._find_breakpoint(pat),
bg="#7d3c98", fg="white",
activebackground="#6c3483",
relief="flat", padx=8, cursor="hand2",
font=("Segoe UI", 9))
btn_zlom.pack(side="left", padx=2)
btn_rem = tk.Button(row, text="Označ neaktivní",
command=lambda pat=p, r=row: self._mark_removed(pat, r),
bg=BG_BTN_REM, fg="white",
activebackground="#922b21",
relief="flat", padx=8, cursor="hand2",
font=("Segoe UI", 9))
btn_rem.pack(side="left", padx=4)
# uložíme reference na tlačítka kvůli pozdějšímu deaktivování
row._btn_vzp = btn_vzp
row._btn_zlom = btn_zlom
row._btn_rem = btn_rem
# ── VZP DOTAZ ────────────────────────────────────────────────────────────
def _query_vzp(self, p: dict):
name = f"{p.get('surname')} {p.get('name')}"
rc = p.get("identification_number", "")
self._vzp_write(f"Načítám VZP data pro {name} (RC: {rc})…")
def worker():
try:
if not self.vzp_client:
self.vzp_client = VZPB2BClient(
"prod", str(CERT_PATH), CERT_PASSWORD, icz=ICZ
)
# Stav pojištění
xml_poj = self.vzp_client.stav_pojisteni(rc)
poj_data = self.vzp_client.parse_stav_pojisteni(xml_poj)
time.sleep(DELAY_VZP)
# Registrující lékař v odbornosti 001
xml_lek = self.vzp_client.registrace_lekare(rc, odbornosti=["001"])
lek_list = self.vzp_client.parse_registrace_lekare(xml_lek)
lines = [
f"Pacient : {name}",
f"RC : {rc}",
"" * 60,
]
# pojisteni
if poj_data and poj_data.get("nazevPojistovny"):
lines += [
f"Pojišťovna : {poj_data['nazevPojistovny']} "
f"(kód {poj_data.get('kodPojistovny', '?')})",
f"Stav pojist: {poj_data.get('stav', '?')}",
]
else:
lines.append("Pojišťovna : (nepodařilo se načíst)")
lines.append("")
# lékař
if lek_list:
for lek in lek_list:
do = lek.get("datum_ukonceni") or "dosud"
lines += [
f"Lékař (001): {lek.get('nazev_lekare', '?')}",
f" ICP : {lek.get('ICP', '?')}",
f" ICZ : {lek.get('ICZ', '?')}",
f" Zařízení : {lek.get('nazev_zzz', '?')}",
f" Registrace: {lek.get('datum_registrace', '?')}{do}",
]
else:
lines.append("Lékař (001): NEMÁ REGISTRUJÍCÍHO PRAKTIKA")
self.after(0, lambda: self._vzp_write("\n".join(lines)))
except Exception as exc:
self.after(0, lambda: self._vzp_write(f"CHYBA: {exc}"))
threading.Thread(target=worker, daemon=True).start()
# ── BOD ZLOMU POJIŠTĚNÍ ──────────────────────────────────────────────────
def _find_breakpoint(self, p: dict):
name = f"{p.get('surname')} {p.get('name')}"
rc = p.get("identification_number", "")
self._vzp_write(
f"Hledám bod zlomu pojištění pro {name} (RC: {rc})…\n"
f" Začínám od dnes 2 roky, binary search (~10 API volání)."
)
def worker():
try:
if not self.vzp_client:
self.vzp_client = VZPB2BClient(
"prod", str(CERT_PATH), CERT_PASSWORD, icz=ICZ
)
result = find_insurance_breakpoint(
self.vzp_client, rc,
years_back=2,
progress_cb=lambda msg: self.after(
0, lambda m=msg: self._vzp_write(
f"Hledám bod zlomu pro {name}\n {m}"
)
),
)
lines = [
f"Pacient : {name}",
f"RC : {rc}",
"" * 60,
result["message"],
"",
f"Stav na začátku ({date.today() - timedelta(days=730)}):",
f" pojišťovna: {result['status_start'].get('nazevPojistovny')} "
f"/ stav: {result['status_start'].get('stav')}",
f"Stav dnes ({date.today()}):",
f" pojišťovna: {result['status_end'].get('nazevPojistovny')} "
f"/ stav: {result['status_end'].get('stav')}",
]
self.after(0, lambda: self._vzp_write("\n".join(lines)))
except Exception as exc:
self.after(0, lambda: self._vzp_write(f"CHYBA: {exc}"))
threading.Thread(target=worker, daemon=True).start()
# ── OZNAČENÍ NEAKTIVNÍ ───────────────────────────────────────────────────
def _mark_removed(self, p: dict, row: tk.Frame):
name = f"{p.get('surname')} {p.get('name')}"
rc = p.get("identification_number", "")
if not messagebox.askyesno(
"Potvrdit akci",
f"Označit pacienta jako NEAKTIVNÍ?\n\n{name}\nRC: {rc}",
):
return
row._btn_rem.config(state="disabled", text="")
def worker():
try:
ok, err = api_set_removed(self.token, p["patient_id"])
if not ok:
self.after(0, lambda: (
row._btn_rem.config(state="normal", text="Označ neaktivní"),
messagebox.showerror("Chyba Medevio API", str(err)),
))
return
mysql_set_removed(p["patient_id"])
self.after(0, lambda: self._row_done(row, name))
except Exception as exc:
self.after(0, lambda: (
row._btn_rem.config(state="normal", text="Označ neaktivní"),
messagebox.showerror("Chyba", str(exc)),
))
threading.Thread(target=worker, daemon=True).start()
def _row_done(self, row: tk.Frame, name: str):
row.configure(bg=BG_REMOVED)
for w in row.winfo_children():
if isinstance(w, tk.Label):
w.configure(bg=BG_REMOVED)
elif isinstance(w, tk.Button):
w.configure(state="disabled", relief="flat")
self._vzp_write(f"✓ Pacient označen jako NEAKTIVNÍ: {name}")
# ── HELPERS ──────────────────────────────────────────────────────────────
def _vzp_write(self, text: str):
self.vzp_text.config(state="normal")
self.vzp_text.delete("1.0", "end")
self.vzp_text.insert("end", text)
self.vzp_text.config(state="disabled")
# ── ENTRY POINT ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
app = PacientApp()
app.mainloop()