commit 66e10b60c3b845c2c237311527d7262854e13ce4 Author: Vladimir Buzalka Date: Sat Apr 18 06:59:11 2026 +0200 Initial commit - Insurance VZP ověření pojištění diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..40b3de8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Python +.venv/ +__pycache__/ +*.pyc +*.pyo + +# IDE +.idea/ +.vscode/ + +# Certifikáty (soukromé klíče - nikdy do gitu!) +**/*.pfx +**/*.p12 +**/*.pem +**/*.key +**/Certificates/ + +# Výstupy +**/Output/ +*.pdf + +# Logy +*.log diff --git a/Insurance/10 FinalSaveInsuranceStatusScript(R).py b/Insurance/10 FinalSaveInsuranceStatusScript(R).py new file mode 100644 index 0000000..90b330c --- /dev/null +++ b/Insurance/10 FinalSaveInsuranceStatusScript(R).py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# this script can be run several times on the same day, in such case it works incrementally + +import sys +from pathlib import Path + +# ========================================== +# PROJECT ROOT (import fix) +# ========================================== +PROJECT_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(PROJECT_ROOT)) + +import time +import logging +from Knihovny.medicus_db import MedicusDB +from Knihovny.vzpb2b_client import VZPB2BClient +import pymysql +from datetime import date + +# ========================================== +# LOGGING SETUP +# ========================================== +logging.basicConfig( + filename="insurance_check.log", + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + encoding="utf-8" +) + +console = logging.getLogger("console") +console.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter("%(message)s")) +console.addHandler(handler) + +def log_info(msg): + logging.info(msg) + console.info(msg) + +def log_error(msg): + logging.error(msg) + console.error(msg) + +# ========================================== +# MYSQL CONNECTION +# ========================================== +mysql = pymysql.connect( + host="192.168.1.76", + port=3306, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4", + autocommit=True +) + +# ========================================== +# SAVE RESULT +# ========================================== +def save_insurance_status(mysql_conn, rc, prijmeni, jmeno, k_datu, result, xml_text): + """ + Uloží čistou odpověď VZP + identifikační údaje pacienta. + Pojišťovna je VÝHRADNĚ z odpovědi VZP. + """ + sql = """ + INSERT INTO vzp_stav_pojisteni + (rc, prijmeni, jmeno, k_datu, + stav, kod_pojistovny, nazev_pojistovny, + pojisteni_kod, stav_vyrizeni, response_xml) + VALUES (%s, %s, %s, %s, + %s, %s, %s, + %s, %s, %s) + """ + + with mysql_conn.cursor() as cur: + cur.execute(sql, ( + rc, + prijmeni, + jmeno, + k_datu, + result["stav"], + result["kodPojistovny"], # ← VZP + result["nazevPojistovny"], # ← VZP + result["pojisteniKod"], # ← VZP + result["stavVyrizeni"], # ← VZP + xml_text + )) + + +# ========================================== +# CONFIGURATION +# ========================================== +# con = fdb.connect( +# host='192.168.1.10', database=r'm:\MEDICUS\data\medicus.FDB', +# user='sysdba', password='masterkey',charset='WIN1250') +HOST = "192.168.1.4" +DB_PATH = r"c:\Medicus 3\data\MEDICUS.FDB" + +PFX_PATH = Path(__file__).resolve().parent / "Certificates" / "picka.pfx" +# PFX_PATH = PROJECT_ROOT / "certificates" / "MBcert.pfx" +PFX_PASSWORD = "Vlado7309208104+" + +ENV = "prod" +ICZ = "00000000" +DIC = "00000000" + +# sanity check +if not PFX_PATH.exists(): + raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}") + +# ========================================== +# INIT CONNECTIONS +# ========================================== +db = MedicusDB(HOST, DB_PATH) +vzp = VZPB2BClient( + ENV, + str(PFX_PATH), # <-- important: pass as string + PFX_PASSWORD, + icz=ICZ, + dic=DIC +) + +# ========================================== +# FETCH REGISTERED PATIENTS +# ========================================== +patients = db.get_active_registered_patients() +log_info(f"Loaded {len(patients)} registered patients") + +today = date.today() + +# ========================================== +# FILTER: ONLY PATIENTS NOT CHECKED TODAY +# ========================================== +patients_to_check = [] + +with mysql.cursor(pymysql.cursors.DictCursor) as cur: + for rc, prijmeni, jmeno, poj in patients: + cur.execute( + "SELECT MAX(k_datu) AS last_check FROM vzp_stav_pojisteni WHERE rc = %s", + (rc,) + ) + row = cur.fetchone() + last_check = row["last_check"] + + if last_check is None or last_check < today: + patients_to_check.append((rc, prijmeni, jmeno)) + +log_info(f"Incremental run: {len(patients_to_check)} patients to check today\n") + +# ========================================== +# MAIN LOOP (1 of N) +# ========================================== +total = len(patients_to_check) + +for idx, (rodcis, prijmeni, jmeno) in enumerate(patients_to_check, 1): + + log_info(f"[{idx}/{total}] Checking {prijmeni} {jmeno} ({rodcis})") + + try: + xml = vzp.stav_pojisteni(rc=rodcis, k_datu=today.isoformat()) + except Exception as e: + log_error(f"❌ VZP REQUEST FAILED for {rodcis}: {e}") + time.sleep(2) + continue + + if not xml.strip().startswith("<"): + log_error(f"❌ INVALID XML for RC {rodcis}") + time.sleep(2) + continue + + try: + result = vzp.parse_stav_pojisteni(xml) + except Exception as e: + log_error(f"❌ XML PARSE ERROR for RC {rodcis}: {e}") + time.sleep(2) + continue + + try: + save_insurance_status( + mysql, + rodcis, + prijmeni, + jmeno, + today, + result, + xml + ) + except Exception as e: + log_error(f"❌ MYSQL INSERT ERROR for RC {rodcis}: {e}") + time.sleep(2) + continue + + log_info(f" ✔ OK ({result['nazevPojistovny']})") + time.sleep(1.4) + +db.close() +mysql.close() +log_info("\nDONE – incremental insurance check finished.") diff --git a/Insurance/10 Ověření proti medicus.py b/Insurance/10 Ověření proti medicus.py new file mode 100644 index 0000000..291fd8a --- /dev/null +++ b/Insurance/10 Ověření proti medicus.py @@ -0,0 +1,209 @@ +import sys +from pathlib import Path +from datetime import datetime, date, timedelta +import os +import pymysql + +from Knihovny.medicus_db import MedicusDB +from Knihovny.vzpb2b_client import VZPB2BClient +from jinja2 import Environment, FileSystemLoader +from weasyprint import HTML + +# ========================================== +# PROJECT ROOT (import + paths) +# ========================================== +script_location = Path(__file__).resolve().parent +project_root = script_location.parent +sys.path.insert(0, str(project_root)) + +# ========================================== +# CONFIGURATION +# ========================================== +MEDICUS_HOST = "192.168.1.4" +MEDICUS_DB_PATH = r"c:\Medicus 3\data\MEDICUS.FDB" + +MYSQL_CONFIG = { + "host": "192.168.1.76", + "port": 3306, + "user": "root", + "password": "Vlado9674+", + "database": "medevio", + "charset": "utf8mb4", + "autocommit": True +} + +PFX_PATH = script_location / "Certificates" / "MBcert.pfx" +PFX_PASSWORD = "Vlado7309208104++" + +ENV = "prod" +ICZ = "00000000" +DIC = "00000000" + +if not PFX_PATH.exists(): + raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}") + +# ========================================== +# PATHS (templates, output) +# ========================================== +template_dir = script_location / "Templates" +output_dir = script_location / "Output" +output_dir.mkdir(exist_ok=True) + +font_regular = (template_dir / "fonts" / "DejaVuSans.ttf").as_uri() +font_bold = (template_dir / "fonts" / "DejaVuSans-Bold.ttf").as_uri() +font_italic = (template_dir / "fonts" / "DejaVuSans-Oblique.ttf").as_uri() + +# ========================================== +# INIT CONNECTIONS +# ========================================== +print("Connecting to Medicus...") +medicus = MedicusDB(MEDICUS_HOST, MEDICUS_DB_PATH) + +print("Connecting to MySQL...") +mysql = pymysql.connect( + cursorclass=pymysql.cursors.DictCursor, + **MYSQL_CONFIG +) + +print("Initializing VZP B2B client...") +vzp = VZPB2BClient( + ENV, + str(PFX_PATH), + PFX_PASSWORD, + icz=ICZ, + dic=DIC +) + +# ========================================== +# BINÁRNÍ HLEDÁNÍ DATA ZLOMU +# ========================================== +def find_insurance_break_date(vzp_client, rc, start_date, end_date): + try: + low = start_date + high = end_date + + stav_low = vzp_client.parse_stav_pojisteni( + vzp_client.stav_pojisteni(rc=rc, k_datu=low.isoformat()) + )["stav"] + + stav_high = vzp_client.parse_stav_pojisteni( + vzp_client.stav_pojisteni(rc=rc, k_datu=high.isoformat()) + )["stav"] + + if stav_low != "1" or stav_high == "1": + return None, None + + while (high - low).days > 1: + mid = low + timedelta(days=(high - low).days // 2) + + xml = vzp_client.stav_pojisteni(rc=rc, k_datu=mid.isoformat()) + stav_mid = vzp_client.parse_stav_pojisteni(xml)["stav"] + + if stav_mid == "1": + low = mid + else: + high = mid + + return low, high + + except Exception: + return None, None + +# ========================================== +# FETCH REGISTERED PATIENTS +# ========================================== +print("Fetching registered patients from Medicus...") +patients = medicus.get_active_registered_patients(as_dict=True) +patients_by_rc = {p["rodcis"]: p for p in patients} +print(f"Loaded {len(patients_by_rc)} registered patients") + +# ========================================== +# FETCH LAST INSURANCE STATES +# ========================================== +sql_last_states = """ +SELECT rc, stav +FROM ( + SELECT rc, stav, + ROW_NUMBER() OVER (PARTITION BY rc ORDER BY k_datu DESC) AS rn + FROM vzp_stav_pojisteni +) t +WHERE rn = 1 +""" + +with mysql.cursor() as cur: + cur.execute(sql_last_states) + last_states = cur.fetchall() + +# ========================================== +# COMPARE + BREAK DATE +# ========================================== +suspected = [] +today = date.today() + +for row in last_states: + rc = row["rc"] + stav = row["stav"] + + if rc in patients_by_rc and stav != "1": + p = patients_by_rc[rc] + + with mysql.cursor() as c2: + c2.execute(""" + SELECT MAX(k_datu) AS last_insured + FROM vzp_stav_pojisteni + WHERE rc = %s AND stav = '1' + """, (rc,)) + r2 = c2.fetchone() + last_known_insured = r2["last_insured"] + + insured_to = uninsured_from = None + if last_known_insured: + insured_to, uninsured_from = find_insurance_break_date( + vzp, rc, last_known_insured, today + ) + + suspected.append({ + "rc": rc, + "prijmeni": p["prijmeni"], + "jmeno": p["jmeno"], + "poj": p["poj"], + "stav": stav, + "insured_to": insured_to, + "uninsured_from": uninsured_from + }) + +print(f"Nalezeno {len(suspected)} problémových záznamů") + +# ========================================== +# CLEANUP DB +# ========================================== +medicus.close() +mysql.close() + +# ========================================== +# PDF GENERATION +# ========================================== +env = Environment(loader=FileSystemLoader(str(template_dir)), autoescape=True) +template = env.get_template("vzp_console_report.html") + +html_content = template.render( + patients=suspected, + generated_at=datetime.now().strftime("%d. %m. %Y %H:%M"), + font_regular=font_regular, + font_bold=font_bold, + font_italic=font_italic +) + +timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") +pdf_file = output_dir / f"kontrola_pojisteni_{timestamp}.pdf" + +print("Generuji PDF přes WeasyPrint...") +HTML(string=html_content, base_url=str(template_dir)).write_pdf(pdf_file) + +print("\n✅ PDF REPORT VYTVOŘEN:") +print(pdf_file.resolve()) + +try: + os.startfile(pdf_file) +except Exception: + pass diff --git a/Insurance/Templates/fonts/DejaVuSans-Bold.ttf b/Insurance/Templates/fonts/DejaVuSans-Bold.ttf new file mode 100644 index 0000000..6d65fa7 Binary files /dev/null and b/Insurance/Templates/fonts/DejaVuSans-Bold.ttf differ diff --git a/Insurance/Templates/fonts/DejaVuSans-Oblique.ttf b/Insurance/Templates/fonts/DejaVuSans-Oblique.ttf new file mode 100644 index 0000000..999bac7 Binary files /dev/null and b/Insurance/Templates/fonts/DejaVuSans-Oblique.ttf differ diff --git a/Insurance/Templates/fonts/DejaVuSans.ttf b/Insurance/Templates/fonts/DejaVuSans.ttf new file mode 100644 index 0000000..e5f7eec Binary files /dev/null and b/Insurance/Templates/fonts/DejaVuSans.ttf differ diff --git a/Insurance/Templates/vzp_console_report.html b/Insurance/Templates/vzp_console_report.html new file mode 100644 index 0000000..6020369 --- /dev/null +++ b/Insurance/Templates/vzp_console_report.html @@ -0,0 +1,136 @@ + + + + +Kontrola stavu pojištění + + + + + + +

Kontrola stavu pojištění (VZP)

+ +
+ Vygenerováno: {{ generated_at }} +
+ +{% if patients %} + + + + + + + + + + + +{% for p in patients %} + + + + + + + +{% endfor %} + +
Rodné čísloPříjmení a jménoPojišťovnaStavPlatnost pojištění
{{ p.rc }}{{ p.prijmeni }} {{ p.jmeno }}{{ p.poj }}{{ p.stav }} + {% if p.insured_to %} + pojištěn do {{ p.insured_to.strftime("%d.%m.%Y") }}
+ nepojištěn od {{ p.uninsured_from.strftime("%d.%m.%Y") }} + {% else %} + nelze určit + {% endif %} +
+{% else %} +

+✔ Vše v pořádku – žádné neshody nenalezeny. +

+{% endif %} + + + diff --git a/Knihovny/__init__.py b/Knihovny/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Knihovny/medicus_db.py b/Knihovny/medicus_db.py new file mode 100644 index 0000000..8674e6e --- /dev/null +++ b/Knihovny/medicus_db.py @@ -0,0 +1,73 @@ +import fdb + + +class MedicusDB: + + def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"): + self.conn = fdb.connect( + host=host, + database=db_path, + user=user, + password=password, + charset=charset + ) + self.cur = self.conn.cursor() + + def get_fak_kapitace(self, as_dict=False): + sql = """ + SELECT + fak.id, + fak.cisfak, + fak.poj, + fak.kapdetail + FROM fak + WHERE fak.kapdetail IS NOT NULL + AND fak.kapdetail <> '' + """ + if as_dict: + return self.query_dict(sql) + return self.query(sql) + + def query(self, sql, params=None): + self.cur.execute(sql, params or ()) + return self.cur.fetchall() + + def query_dict(self, sql, params=None): + self.cur.execute(sql, params or ()) + cols = [d[0].strip().lower() for d in self.cur.description] + return [dict(zip(cols, row)) for row in self.cur.fetchall()] + + def get_active_registered_patients(self, as_dict=False): + sql = """ + SELECT + kar.rodcis, + kar.prijmeni, + kar.jmeno, + kar.poj + FROM registr + JOIN kar ON registr.idpac = kar.idpac + WHERE registr.datum_zruseni IS NULL + AND registr.priznak IN ('A','D','V') + AND kar.rodcis IS NOT NULL + AND kar.rodcis <> '' + AND kar.vyrazen <> 'A' + """ + if as_dict: + return self.query_dict(sql) + return self.query(sql) + + def get_all_patients(self, as_dict=False): + sql = """ + SELECT + kar.rodcis, + kar.prijmeni, + kar.jmeno, + kar.poj + FROM kar + """ + if as_dict: + return self.query_dict(sql) + return self.query(sql) + + def close(self): + self.conn.close() diff --git a/Knihovny/vzpb2b_client.py b/Knihovny/vzpb2b_client.py new file mode 100644 index 0000000..a8ea67f --- /dev/null +++ b/Knihovny/vzpb2b_client.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from requests_pkcs12 import Pkcs12Adapter +import requests +import uuid +from datetime import date + + + +class VZPB2BClient: + def __init__(self, env: str, pfx_path: str, pfx_password: str, + icz: str = "00000000", dic: str = "00000000"): + + # Normalize environment name + env = env.lower().strip() + + if env in ("prod", "production", "live", "real"): + self.env = "prod" + elif env in ("simu", "simulace", "test", "testing"): + self.env = "simu" + else: + raise ValueError(f"Unknown environment '{env}'. Use 'simu' or 'prod'.") + + self.pfx_path = pfx_path + self.pfx_password = pfx_password + self.icz = icz + self.dic = dic + + # Prepare mTLS session + session = requests.Session() + session.mount( + "https://", + Pkcs12Adapter(pkcs12_filename=pfx_path, pkcs12_password=pfx_password) + ) + self.session = session + + # -------------------------------------------------------------- + # URL BUILDER + # -------------------------------------------------------------- + def _build_endpoint(self, service_name: str) -> str: + """ + SIMU: + https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMU?sluzba=SIMU + PROD: + https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/ + """ + + if self.env == "simu": + simu_service = f"SIMU{service_name}" + return ( + f"https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/" + f"{simu_service}?sluzba={simu_service}" + ) + + # Production + return ( + f"https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/{service_name}" + ) + + # -------------------------------------------------------------- + # SOAP HEADER BUILDER + # -------------------------------------------------------------- + def _header(self) -> str: + idZpravy = uuid.uuid4().hex[:12] # must be alphanumeric, max 12 chars + return f""" + {idZpravy} + + {self.icz} + {self.dic} + + """ + + # -------------------------------------------------------------- + # OVERPRUKAZ — EHIC CHECK + # -------------------------------------------------------------- + def over_prukaz_pojistence(self, cislo_prukazu: str, k_datu: str = None) -> str: + """ + Calls OverPrukazPojistenceB2B (SIMU or PROD depending on env). + Returns raw XML string. + """ + + service = "OverPrukazPojistenceB2B" + endpoint = self._build_endpoint(service) + + if not k_datu: + k_datu = date.today().isoformat() + + soap = f""" + + + + {self._header()} + + + + + {cislo_prukazu} + {k_datu} + + + + +""" + + headers = {"Content-Type": "text/xml; charset=utf-8"} + + print(f"Calling: {endpoint}") + response = self.session.post( + endpoint, + data=soap.encode("utf-8"), + headers=headers, + timeout=30 + ) + print("HTTP:", response.status_code) + return response.text + + def stav_pojisteni(self, rc: str, k_datu: str = None, prijmeni: str = None): + """ + Calls stavPojisteniB2B (SIMU or PROD). + """ + service = "stavPojisteniB2B" + endpoint = self._build_endpoint(service) + + if not k_datu: + k_datu = date.today().isoformat() + + prijmeni_xml = f"{prijmeni}" if prijmeni else "" + + soap = f""" + + + + {self._header()} + + + + + {rc} + {prijmeni_xml} + {k_datu} + + + + +""" + + headers = { + "Content-Type": "text/xml; charset=utf-8", + "SOAPAction": "process" + } + + print(f"Calling: {endpoint}") + resp = self.session.post(endpoint, data=soap.encode("utf-8"), + headers=headers, timeout=30) + print("HTTP:", resp.status_code) + return resp.text + + def parse_stav_pojisteni(self, xml_text: str): + """ + Parses stavPojisteniB2B SOAP response into a Python dict. + + Returned structure: + { + "stavVyrizeni": int, + "stav": str | None, + "kodPojistovny": str | None, + "nazevPojistovny": str | None, + "pojisteniKod": str | None + } + """ + + import xml.etree.ElementTree as ET + + NS = { + "soap": "http://schemas.xmlsoap.org/soap/envelope/", + "vzp": "http://xmlns.gemsystem.cz/stavPojisteniB2B" + } + + root = ET.fromstring(xml_text) + + # ---- Extract status ---- + stav_vyr = root.find(".//vzp:stavVyrizeniPozadavku", NS) + stav_vyr = int(stav_vyr.text.strip()) if stav_vyr is not None else None + + # ---- If no stavPojisteni element present (e.g. 0 or some errors) ---- + node_stav = root.find(".//vzp:stavPojisteni", NS) + if node_stav is None: + return { + "stavVyrizeni": stav_vyr, + "stav": None, + "kodPojistovny": None, + "nazevPojistovny": None, + "pojisteniKod": None, + } + + def get(tag): + el = node_stav.find(f"vzp:{tag}", NS) + return el.text.strip() if el is not None and el.text else None + + return { + "stavVyrizeni": stav_vyr, + "stav": get("stav"), + "kodPojistovny": get("kodPojistovny"), + "nazevPojistovny": get("nazevPojistovny"), + "pojisteniKod": get("pojisteniKod"), + }