#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ GUI: Pacienti ACTIVE v Medevio, kteří nejsou registrovaní v Medicusu. Sloupce: Jméno, RC, Pojišťovna, Narozen, Poslední dekurz Tlačítka: [VZP dotaz] [Označ neaktivní] Spodní panel: výsledek VZP dotazu (pojišťovna + registrující lékař) """ import sys import threading import time import tkinter as tk from tkinter import ttk, messagebox from pathlib import Path from datetime import date, timedelta sys.path.insert(0, "U:/OrdinaceProjekt") import pymysql import requests from Knihovny.medicus_db import get_medicus_db from Knihovny.vzpb2b_client import VZPB2BClient # ── CONFIG ────────────────────────────────────────────────────────────────── GRAPHQL_URL = "https://api.medevio.cz/graphql" CLINIC_SLUG = "mudr-buzalkova" TOKEN_PATH = Path("U:/OrdinaceProjekt/Medevio/token.txt") CERT_PATH = Path("U:/OrdinaceProjekt/Insurance/Certificates/picka.pfx") CERT_PASSWORD = "Vlado7309208104+" ICZ = "09305000" DELAY_VZP = 1.5 # s mezi VZP voláními — max 2 dotazy / 3 s DB_CONFIG = { "host": "192.168.1.76", "port": 3306, "user": "root", "password": "Vlado9674+", "database": "medevio", "charset": "utf8mb4", "cursorclass": pymysql.cursors.DictCursor, } MUTATION_REMOVE = """ mutation ClinicPatientEditStatusModal_UpdateClinicPatientStatus( $clinicSlug: String!, $patientId: String!, $status: ClinicPatientStatus! ) { updated: updateClinicPatientStatus( clinicSlug: $clinicSlug patientId: $patientId status: $status ) } """ # ── HELPERS ────────────────────────────────────────────────────────────────── def load_token() -> str: t = TOKEN_PATH.read_text(encoding="utf-8").strip() return t.removeprefix("Bearer ").strip() def medevio_headers(token: str) -> dict: return { "content-type": "application/json", "authorization": f"Bearer {token}", "origin": "https://my.medevio.cz", "referer": "https://my.medevio.cz/", } # ── DATA LOADING ───────────────────────────────────────────────────────────── def load_patients() -> list[dict]: """ Vrátí pacienty ACTIVE v Medevio (MySQL), kteří nejsou registrovaní v Medicusu (Firebird). Přidá datum posledního záznamu v dekurzu. """ # 1) Medicus — registrovaná rodná čísla db = get_medicus_db() rows = db.get_active_registered_patients() medicus_rc: set[str] = {r[0].strip() for r in rows if r[0]} db.close() # 2) MySQL — všichni ACTIVE s RC conn = pymysql.connect(**DB_CONFIG) with conn.cursor() as cur: cur.execute(""" SELECT patient_id, name, surname, identification_number, insurance_name, insurance_code, user_id, dob FROM medevio_pacient WHERE status = 'ACTIVE' AND identification_number IS NOT NULL AND identification_number <> '' AND surname NOT LIKE '%%\\%%%' ORDER BY surname, name """) all_active = cur.fetchall() conn.close() # 3) Filtruj mimo Medicus extras = [p for p in all_active if p["identification_number"] not in medicus_rc] if not extras: return extras # 4) Datum posledního záznamu v dekurzu (LOG tabulka) rc_list = [p["identification_number"] for p in extras] # Bezpečné inline — RC jsou vždy jen číslice rc_sql_in = ",".join(f"'{rc}'" for rc in rc_list) db2 = get_medicus_db() try: # Nejdřív idpac → rodcis mapping idpac_rows = db2.query( f"SELECT idpac, rodcis FROM kar WHERE rodcis IN ({rc_sql_in})" ) rc_to_idpac: dict[str, int] = {} idpac_set: list[str] = [] for r in idpac_rows: rc_to_idpac[r[1].strip()] = r[0] idpac_set.append(str(r[0])) # Datum posledního záznamu v LOG pro tyto idpac dekurz_map: dict[int, str] = {} if idpac_set: idpac_sql_in = ",".join(idpac_set) log_rows = db2.query( f"SELECT idpac, MAX(datum) FROM log " f"WHERE idpac IN ({idpac_sql_in}) GROUP BY idpac" ) dekurz_map = {r[0]: str(r[1])[:10] if r[1] else None for r in log_rows} except Exception: rc_to_idpac = {} dekurz_map = {} finally: db2.close() for p in extras: idpac = rc_to_idpac.get(p["identification_number"]) p["posledni_dekurz"] = dekurz_map.get(idpac) if idpac else None return extras def api_set_removed(token: str, patient_id: str) -> tuple[bool, str | None]: """Volá Medevio GraphQL API — nastaví status REMOVED.""" resp = requests.post( GRAPHQL_URL, headers=medevio_headers(token), json={ "operationName": "ClinicPatientEditStatusModal_UpdateClinicPatientStatus", "query": MUTATION_REMOVE, "variables": { "clinicSlug": CLINIC_SLUG, "patientId": patient_id, "status": "REMOVED", }, }, timeout=20, ) resp.raise_for_status() data = resp.json() if "errors" in data: return False, str(data["errors"]) return bool(data.get("data", {}).get("updated")), None def mysql_set_removed(patient_id: str) -> None: """Aktualizuje status v MySQL tabulce.""" conn = pymysql.connect(**DB_CONFIG) with conn.cursor() as cur: cur.execute( "UPDATE medevio_pacient SET status='REMOVED' WHERE patient_id = %s", (patient_id,), ) conn.commit() conn.close() # ── BINARY SEARCH — BOD ZLOMU POJIŠTĚNÍ ───────────────────────────────────── def _get_insurance_status(vzp_client: VZPB2BClient, rc: str, k_datu: str) -> dict: """Vrátí parsed stav pojištění k danému datu — s retry na 503.""" for attempt in range(3): try: xml = vzp_client.stav_pojisteni(rc, k_datu=k_datu) return vzp_client.parse_stav_pojisteni(xml) except Exception: if attempt < 2: time.sleep(3) raise RuntimeError(f"API selhalo 3× pro {k_datu}") def _is_active(status: dict) -> bool: """Pojistěnec je 'aktivní' pokud API vrátilo stav '1' (pojištěn).""" return status.get("stavVyrizeni") == 1 and status.get("stav") == "1" def find_insurance_breakpoint( vzp_client: VZPB2BClient, rc: str, years_back: int = 2, progress_cb=None, ) -> dict: """ Binary search: hledá datum, kdy pacient přešel ze stavu 'pojištěn' na jiný stav (jiná pojišťovna / nenalezen). Vrátí dict s klíči: found bool — zda byl zlom nalezen date_last str — poslední datum kdy byl stav 'aktivní' date_first_x str — první datum kdy byl stav jiný status_start dict — stav na začátku (dnes −years_back) status_end dict — stav dnes iterations int — počet API volání message str — popis výsledku """ today = date.today() start_date = today - timedelta(days=years_back * 365) iterations = 0 def check(d: date) -> tuple[dict, bool]: nonlocal iterations iterations += 1 if progress_cb: progress_cb(f"Testuju {d.isoformat()} (volání č. {iterations})…") status = _get_insurance_status(vzp_client, rc, d.isoformat()) time.sleep(1.5) return status, _is_active(status) status_start, active_start = check(start_date) status_end, active_end = check(today) # Pokud ani na začátku nebyl aktivní — nemáme levý okraj if not active_start: return { "found": False, "date_last": None, "date_first_x": None, "status_start": status_start, "status_end": status_end, "iterations": iterations, "message": ( f"Před {years_back} lety ({start_date.isoformat()}) již nebyl pojištěn " f"nebo byl u jiné pojišťovny.\n" f" Stav tehdy: {status_start.get('stav')} / " f"pojišťovna: {status_start.get('nazevPojistovny')}" ), } # Pokud je dnes stále aktivní — zlom nenalezen v daném okně if active_end: return { "found": False, "date_last": today.isoformat(), "date_first_x": None, "status_start": status_start, "status_end": status_end, "iterations": iterations, "message": ( f"Pacient je stále pojištěn (i dnes).\n" f" Pojišťovna dnes: {status_end.get('nazevPojistovny')} / " f"stav: {status_end.get('stav')}" ), } # Binary search mezi start_date (aktivní) a today (neaktivní) left = start_date right = today left_status = status_start right_status = status_end while (right - left).days > 1: mid = left + timedelta(days=(right - left).days // 2) mid_status, mid_active = check(mid) if mid_active: left = mid left_status = mid_status else: right = mid right_status = mid_status return { "found": True, "date_last": left.isoformat(), "date_first_x": right.isoformat(), "status_start": status_start, "status_end": status_end, "iterations": iterations, "message": ( f"Bod zlomu nalezen:\n" f" Poslední den jako pojištěnec : {left.isoformat()}\n" f" Pojišťovna: {left_status.get('nazevPojistovny')} " f"(kód {left_status.get('kodPojistovny')}) / " f"stav: {left_status.get('stav')}\n" f" První den se změnou : {right.isoformat()}\n" f" Pojišťovna: {right_status.get('nazevPojistovny')} " f"(kód {right_status.get('kodPojistovny')}) / " f"stav: {right_status.get('stav')}\n" f" Celkem API volání: {iterations}" ), } # ── GUI ─────────────────────────────────────────────────────────────────────── COL_WIDTHS = { "name": 28, # chars "rc": 13, "poj": 32, "dob": 10, "dek": 12, } BG_EVEN = "#ffffff" BG_ODD = "#f4f6f8" BG_REMOVED = "#d5f5e3" BG_HEADER = "#2c3e50" FG_HEADER = "#ffffff" BG_BTN_VZP = "#2980b9" BG_BTN_REM = "#c0392b" class PacientApp(tk.Tk): def __init__(self): super().__init__() self.title("Pacienti Medevio · mimo Medicus") self.geometry("1320x780") self.minsize(900, 600) self.configure(bg="#f0f0f0") self.token: str = "" self.vzp_client: VZPB2BClient | None = None self.patients: list[dict] = [] self._build_ui() self._start_load() # ── UI CONSTRUCTION ────────────────────────────────────────────────────── def _build_ui(self): self._build_topbar() self._build_colheaders() self._build_list_area() self._build_bottom_panel() def _build_topbar(self): bar = tk.Frame(self, bg=BG_HEADER, pady=8) bar.pack(fill="x") tk.Label(bar, text="Pacienti ACTIVE v Medevio · mimo Medicus", bg=BG_HEADER, fg=FG_HEADER, font=("Segoe UI", 13, "bold")).pack(side="left", padx=14) self.lbl_status = tk.Label(bar, text="Načítám…", bg=BG_HEADER, fg="#bdc3c7", font=("Segoe UI", 10)) self.lbl_status.pack(side="left", padx=8) tk.Button(bar, text="⟳ Obnovit", command=self._start_load, bg="#3498db", fg="white", activebackground="#2980b9", relief="flat", padx=12, pady=2, cursor="hand2").pack(side="right", padx=14) def _build_colheaders(self): hdr = tk.Frame(self, bg="#dfe6e9", pady=5, padx=10) hdr.pack(fill="x") cols = [ ("Příjmení, Jméno", COL_WIDTHS["name"]), ("Rodné číslo", COL_WIDTHS["rc"]), ("Pojišťovna", COL_WIDTHS["poj"]), ("Narozen", COL_WIDTHS["dob"]), ("Posl. dekurz", COL_WIDTHS["dek"]), ("", 12), ("", 14), ("", 18), ] for text, w in cols: tk.Label(hdr, text=text, bg="#dfe6e9", font=("Segoe UI", 9, "bold"), width=w, anchor="w").pack(side="left") def _build_list_area(self): wrapper = tk.Frame(self, bg="#e0e0e0") wrapper.pack(fill="both", expand=True, padx=4, pady=(0, 4)) self.canvas = tk.Canvas(wrapper, bg="#ffffff", highlightthickness=0) vsb = ttk.Scrollbar(wrapper, orient="vertical", command=self.canvas.yview) self.canvas.configure(yscrollcommand=vsb.set) vsb.pack(side="right", fill="y") self.canvas.pack(side="left", fill="both", expand=True) self.list_frame = tk.Frame(self.canvas, bg="#ffffff") self._cwin = self.canvas.create_window((0, 0), window=self.list_frame, anchor="nw") self.list_frame.bind("", lambda e: self.canvas.configure( scrollregion=self.canvas.bbox("all"))) self.canvas.bind("", lambda e: self.canvas.itemconfig(self._cwin, width=e.width)) self.canvas.bind("", lambda e: self.canvas.yview_scroll( int(-1 * (e.delta / 120)), "units")) def _build_bottom_panel(self): pnl = tk.LabelFrame(self, text=" Výsledek VZP dotazu ", font=("Segoe UI", 9, "bold"), bg="#f0f0f0", pady=6, padx=8) pnl.pack(fill="x", padx=6, pady=(0, 6)) self.vzp_text = tk.Text(pnl, height=13, font=("Consolas", 9), bg="#1e1e1e", fg="#d4d4d4", insertbackground="#d4d4d4", relief="flat", wrap="word", state="disabled") self.vzp_text.pack(fill="x") self._vzp_write("Vyberte pacienta a stiskněte [VZP dotaz].") # ── DATA FLOW ──────────────────────────────────────────────────────────── def _start_load(self): self.lbl_status.config(text="Načítám…") for w in self.list_frame.winfo_children(): w.destroy() threading.Thread(target=self._bg_load, daemon=True).start() def _bg_load(self): try: self.token = load_token() patients = load_patients() self.after(0, lambda: self._populate(patients)) except Exception as exc: self.after(0, lambda: ( self.lbl_status.config(text="Chyba při načítání"), messagebox.showerror("Chyba načítání", str(exc)), )) def _populate(self, patients: list[dict]): self.patients = patients n = len(patients) self.lbl_status.config(text=f"Celkem: {n} pacientů") for i, p in enumerate(patients): bg = BG_EVEN if i % 2 == 0 else BG_ODD self._add_row(i, p, bg) def _add_row(self, idx: int, p: dict, bg: str): row = tk.Frame(self.list_frame, bg=bg, pady=4) row.pack(fill="x", padx=6) name = f"{p.get('surname', '')} {p.get('name', '')}" rc = p.get("identification_number", "") poj = p.get("insurance_name") or "—" dob = str(p.get("dob") or "")[:10] or "—" dek = p.get("posledni_dekurz") or "—" dek_fg = "#e74c3c" if dek == "—" else "#2c3e50" def lbl(text, width, fg="#2c3e50", font=("Segoe UI", 9)): tk.Label(row, text=text, bg=bg, fg=fg, font=font, width=width, anchor="w").pack(side="left", padx=2) lbl(name, COL_WIDTHS["name"]) lbl(rc, COL_WIDTHS["rc"], font=("Consolas", 9)) lbl(poj, COL_WIDTHS["poj"]) lbl(dob, COL_WIDTHS["dob"]) lbl(dek, COL_WIDTHS["dek"], fg=dek_fg) btn_vzp = tk.Button(row, text="VZP dotaz", command=lambda pat=p: self._query_vzp(pat), bg=BG_BTN_VZP, fg="white", activebackground="#1a6fa8", relief="flat", padx=8, cursor="hand2", font=("Segoe UI", 9)) btn_vzp.pack(side="left", padx=6) btn_zlom = tk.Button(row, text="Bod zlomu", command=lambda pat=p: self._find_breakpoint(pat), bg="#7d3c98", fg="white", activebackground="#6c3483", relief="flat", padx=8, cursor="hand2", font=("Segoe UI", 9)) btn_zlom.pack(side="left", padx=2) btn_rem = tk.Button(row, text="Označ neaktivní", command=lambda pat=p, r=row: self._mark_removed(pat, r), bg=BG_BTN_REM, fg="white", activebackground="#922b21", relief="flat", padx=8, cursor="hand2", font=("Segoe UI", 9)) btn_rem.pack(side="left", padx=4) # uložíme reference na tlačítka kvůli pozdějšímu deaktivování row._btn_vzp = btn_vzp row._btn_zlom = btn_zlom row._btn_rem = btn_rem # ── VZP DOTAZ ──────────────────────────────────────────────────────────── def _query_vzp(self, p: dict): name = f"{p.get('surname')} {p.get('name')}" rc = p.get("identification_number", "") self._vzp_write(f"Načítám VZP data pro {name} (RC: {rc})…") def worker(): try: if not self.vzp_client: self.vzp_client = VZPB2BClient( "prod", str(CERT_PATH), CERT_PASSWORD, icz=ICZ ) # Stav pojištění xml_poj = self.vzp_client.stav_pojisteni(rc) poj_data = self.vzp_client.parse_stav_pojisteni(xml_poj) time.sleep(DELAY_VZP) # Registrující lékař v odbornosti 001 xml_lek = self.vzp_client.registrace_lekare(rc, odbornosti=["001"]) lek_list = self.vzp_client.parse_registrace_lekare(xml_lek) lines = [ f"Pacient : {name}", f"RC : {rc}", "─" * 60, ] # pojisteni if poj_data and poj_data.get("nazevPojistovny"): lines += [ f"Pojišťovna : {poj_data['nazevPojistovny']} " f"(kód {poj_data.get('kodPojistovny', '?')})", f"Stav pojist: {poj_data.get('stav', '?')}", ] else: lines.append("Pojišťovna : (nepodařilo se načíst)") lines.append("") # lékař if lek_list: for lek in lek_list: do = lek.get("datum_ukonceni") or "dosud" lines += [ f"Lékař (001): {lek.get('nazev_lekare', '?')}", f" ICP : {lek.get('ICP', '?')}", f" ICZ : {lek.get('ICZ', '?')}", f" Zařízení : {lek.get('nazev_zzz', '?')}", f" Registrace: {lek.get('datum_registrace', '?')} — {do}", ] else: lines.append("Lékař (001): NEMÁ REGISTRUJÍCÍHO PRAKTIKA") self.after(0, lambda: self._vzp_write("\n".join(lines))) except Exception as exc: self.after(0, lambda: self._vzp_write(f"CHYBA: {exc}")) threading.Thread(target=worker, daemon=True).start() # ── BOD ZLOMU POJIŠTĚNÍ ────────────────────────────────────────────────── def _find_breakpoint(self, p: dict): name = f"{p.get('surname')} {p.get('name')}" rc = p.get("identification_number", "") self._vzp_write( f"Hledám bod zlomu pojištění pro {name} (RC: {rc})…\n" f" Začínám od dnes −2 roky, binary search (~10 API volání)." ) def worker(): try: if not self.vzp_client: self.vzp_client = VZPB2BClient( "prod", str(CERT_PATH), CERT_PASSWORD, icz=ICZ ) result = find_insurance_breakpoint( self.vzp_client, rc, years_back=2, progress_cb=lambda msg: self.after( 0, lambda m=msg: self._vzp_write( f"Hledám bod zlomu pro {name}…\n {m}" ) ), ) lines = [ f"Pacient : {name}", f"RC : {rc}", "─" * 60, result["message"], "", f"Stav na začátku ({date.today() - timedelta(days=730)}):", f" pojišťovna: {result['status_start'].get('nazevPojistovny')} " f"/ stav: {result['status_start'].get('stav')}", f"Stav dnes ({date.today()}):", f" pojišťovna: {result['status_end'].get('nazevPojistovny')} " f"/ stav: {result['status_end'].get('stav')}", ] self.after(0, lambda: self._vzp_write("\n".join(lines))) except Exception as exc: self.after(0, lambda: self._vzp_write(f"CHYBA: {exc}")) threading.Thread(target=worker, daemon=True).start() # ── OZNAČENÍ NEAKTIVNÍ ─────────────────────────────────────────────────── def _mark_removed(self, p: dict, row: tk.Frame): name = f"{p.get('surname')} {p.get('name')}" rc = p.get("identification_number", "") if not messagebox.askyesno( "Potvrdit akci", f"Označit pacienta jako NEAKTIVNÍ?\n\n{name}\nRC: {rc}", ): return row._btn_rem.config(state="disabled", text="…") def worker(): try: ok, err = api_set_removed(self.token, p["patient_id"]) if not ok: self.after(0, lambda: ( row._btn_rem.config(state="normal", text="Označ neaktivní"), messagebox.showerror("Chyba Medevio API", str(err)), )) return mysql_set_removed(p["patient_id"]) self.after(0, lambda: self._row_done(row, name)) except Exception as exc: self.after(0, lambda: ( row._btn_rem.config(state="normal", text="Označ neaktivní"), messagebox.showerror("Chyba", str(exc)), )) threading.Thread(target=worker, daemon=True).start() def _row_done(self, row: tk.Frame, name: str): row.configure(bg=BG_REMOVED) for w in row.winfo_children(): if isinstance(w, tk.Label): w.configure(bg=BG_REMOVED) elif isinstance(w, tk.Button): w.configure(state="disabled", relief="flat") self._vzp_write(f"✓ Pacient označen jako NEAKTIVNÍ: {name}") # ── HELPERS ────────────────────────────────────────────────────────────── def _vzp_write(self, text: str): self.vzp_text.config(state="normal") self.vzp_text.delete("1.0", "end") self.vzp_text.insert("end", text) self.vzp_text.config(state="disabled") # ── ENTRY POINT ────────────────────────────────────────────────────────────── if __name__ == "__main__": app = PacientApp() app.mainloop()