Initial commit - Insurance VZP ověření pojištění

This commit is contained in:
Vladimir Buzalka
2026-04-18 06:59:11 +02:00
commit 66e10b60c3
10 changed files with 853 additions and 0 deletions
+23
View File
@@ -0,0 +1,23 @@
# Python
.venv/
__pycache__/
*.pyc
*.pyo
# IDE
.idea/
.vscode/
# Certifikáty (soukromé klíče - nikdy do gitu!)
**/*.pfx
**/*.p12
**/*.pem
**/*.key
**/Certificates/
# Výstupy
**/Output/
*.pdf
# Logy
*.log
@@ -0,0 +1,199 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# this script can be run several times on the same day, in such case it works incrementally
import sys
from pathlib import Path
# ==========================================
# PROJECT ROOT (import fix)
# ==========================================
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
import time
import logging
from Knihovny.medicus_db import MedicusDB
from Knihovny.vzpb2b_client import VZPB2BClient
import pymysql
from datetime import date
# ==========================================
# LOGGING SETUP
# ==========================================
logging.basicConfig(
filename="insurance_check.log",
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
encoding="utf-8"
)
console = logging.getLogger("console")
console.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
console.addHandler(handler)
def log_info(msg):
logging.info(msg)
console.info(msg)
def log_error(msg):
logging.error(msg)
console.error(msg)
# ==========================================
# MYSQL CONNECTION
# ==========================================
mysql = pymysql.connect(
host="192.168.1.76",
port=3306,
user="root",
password="Vlado9674+",
database="medevio",
charset="utf8mb4",
autocommit=True
)
# ==========================================
# SAVE RESULT
# ==========================================
def save_insurance_status(mysql_conn, rc, prijmeni, jmeno, k_datu, result, xml_text):
"""
Uloží čistou odpověď VZP + identifikační údaje pacienta.
Pojišťovna je VÝHRADNĚ z odpovědi VZP.
"""
sql = """
INSERT INTO vzp_stav_pojisteni
(rc, prijmeni, jmeno, k_datu,
stav, kod_pojistovny, nazev_pojistovny,
pojisteni_kod, stav_vyrizeni, response_xml)
VALUES (%s, %s, %s, %s,
%s, %s, %s,
%s, %s, %s)
"""
with mysql_conn.cursor() as cur:
cur.execute(sql, (
rc,
prijmeni,
jmeno,
k_datu,
result["stav"],
result["kodPojistovny"], # ← VZP
result["nazevPojistovny"], # ← VZP
result["pojisteniKod"], # ← VZP
result["stavVyrizeni"], # ← VZP
xml_text
))
# ==========================================
# CONFIGURATION
# ==========================================
# con = fdb.connect(
# host='192.168.1.10', database=r'm:\MEDICUS\data\medicus.FDB',
# user='sysdba', password='masterkey',charset='WIN1250')
HOST = "192.168.1.4"
DB_PATH = r"c:\Medicus 3\data\MEDICUS.FDB"
PFX_PATH = Path(__file__).resolve().parent / "Certificates" / "picka.pfx"
# PFX_PATH = PROJECT_ROOT / "certificates" / "MBcert.pfx"
PFX_PASSWORD = "Vlado7309208104+"
ENV = "prod"
ICZ = "00000000"
DIC = "00000000"
# sanity check
if not PFX_PATH.exists():
raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}")
# ==========================================
# INIT CONNECTIONS
# ==========================================
db = MedicusDB(HOST, DB_PATH)
vzp = VZPB2BClient(
ENV,
str(PFX_PATH), # <-- important: pass as string
PFX_PASSWORD,
icz=ICZ,
dic=DIC
)
# ==========================================
# FETCH REGISTERED PATIENTS
# ==========================================
patients = db.get_active_registered_patients()
log_info(f"Loaded {len(patients)} registered patients")
today = date.today()
# ==========================================
# FILTER: ONLY PATIENTS NOT CHECKED TODAY
# ==========================================
patients_to_check = []
with mysql.cursor(pymysql.cursors.DictCursor) as cur:
for rc, prijmeni, jmeno, poj in patients:
cur.execute(
"SELECT MAX(k_datu) AS last_check FROM vzp_stav_pojisteni WHERE rc = %s",
(rc,)
)
row = cur.fetchone()
last_check = row["last_check"]
if last_check is None or last_check < today:
patients_to_check.append((rc, prijmeni, jmeno))
log_info(f"Incremental run: {len(patients_to_check)} patients to check today\n")
# ==========================================
# MAIN LOOP (1 of N)
# ==========================================
total = len(patients_to_check)
for idx, (rodcis, prijmeni, jmeno) in enumerate(patients_to_check, 1):
log_info(f"[{idx}/{total}] Checking {prijmeni} {jmeno} ({rodcis})")
try:
xml = vzp.stav_pojisteni(rc=rodcis, k_datu=today.isoformat())
except Exception as e:
log_error(f"❌ VZP REQUEST FAILED for {rodcis}: {e}")
time.sleep(2)
continue
if not xml.strip().startswith("<"):
log_error(f"❌ INVALID XML for RC {rodcis}")
time.sleep(2)
continue
try:
result = vzp.parse_stav_pojisteni(xml)
except Exception as e:
log_error(f"❌ XML PARSE ERROR for RC {rodcis}: {e}")
time.sleep(2)
continue
try:
save_insurance_status(
mysql,
rodcis,
prijmeni,
jmeno,
today,
result,
xml
)
except Exception as e:
log_error(f"❌ MYSQL INSERT ERROR for RC {rodcis}: {e}")
time.sleep(2)
continue
log_info(f" ✔ OK ({result['nazevPojistovny']})")
time.sleep(1.4)
db.close()
mysql.close()
log_info("\nDONE incremental insurance check finished.")
+209
View File
@@ -0,0 +1,209 @@
import sys
from pathlib import Path
from datetime import datetime, date, timedelta
import os
import pymysql
from Knihovny.medicus_db import MedicusDB
from Knihovny.vzpb2b_client import VZPB2BClient
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
# ==========================================
# PROJECT ROOT (import + paths)
# ==========================================
script_location = Path(__file__).resolve().parent
project_root = script_location.parent
sys.path.insert(0, str(project_root))
# ==========================================
# CONFIGURATION
# ==========================================
MEDICUS_HOST = "192.168.1.4"
MEDICUS_DB_PATH = r"c:\Medicus 3\data\MEDICUS.FDB"
MYSQL_CONFIG = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"autocommit": True
}
PFX_PATH = script_location / "Certificates" / "MBcert.pfx"
PFX_PASSWORD = "Vlado7309208104++"
ENV = "prod"
ICZ = "00000000"
DIC = "00000000"
if not PFX_PATH.exists():
raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}")
# ==========================================
# PATHS (templates, output)
# ==========================================
template_dir = script_location / "Templates"
output_dir = script_location / "Output"
output_dir.mkdir(exist_ok=True)
font_regular = (template_dir / "fonts" / "DejaVuSans.ttf").as_uri()
font_bold = (template_dir / "fonts" / "DejaVuSans-Bold.ttf").as_uri()
font_italic = (template_dir / "fonts" / "DejaVuSans-Oblique.ttf").as_uri()
# ==========================================
# INIT CONNECTIONS
# ==========================================
print("Connecting to Medicus...")
medicus = MedicusDB(MEDICUS_HOST, MEDICUS_DB_PATH)
print("Connecting to MySQL...")
mysql = pymysql.connect(
cursorclass=pymysql.cursors.DictCursor,
**MYSQL_CONFIG
)
print("Initializing VZP B2B client...")
vzp = VZPB2BClient(
ENV,
str(PFX_PATH),
PFX_PASSWORD,
icz=ICZ,
dic=DIC
)
# ==========================================
# BINÁRNÍ HLEDÁNÍ DATA ZLOMU
# ==========================================
def find_insurance_break_date(vzp_client, rc, start_date, end_date):
try:
low = start_date
high = end_date
stav_low = vzp_client.parse_stav_pojisteni(
vzp_client.stav_pojisteni(rc=rc, k_datu=low.isoformat())
)["stav"]
stav_high = vzp_client.parse_stav_pojisteni(
vzp_client.stav_pojisteni(rc=rc, k_datu=high.isoformat())
)["stav"]
if stav_low != "1" or stav_high == "1":
return None, None
while (high - low).days > 1:
mid = low + timedelta(days=(high - low).days // 2)
xml = vzp_client.stav_pojisteni(rc=rc, k_datu=mid.isoformat())
stav_mid = vzp_client.parse_stav_pojisteni(xml)["stav"]
if stav_mid == "1":
low = mid
else:
high = mid
return low, high
except Exception:
return None, None
# ==========================================
# FETCH REGISTERED PATIENTS
# ==========================================
print("Fetching registered patients from Medicus...")
patients = medicus.get_active_registered_patients(as_dict=True)
patients_by_rc = {p["rodcis"]: p for p in patients}
print(f"Loaded {len(patients_by_rc)} registered patients")
# ==========================================
# FETCH LAST INSURANCE STATES
# ==========================================
sql_last_states = """
SELECT rc, stav
FROM (
SELECT rc, stav,
ROW_NUMBER() OVER (PARTITION BY rc ORDER BY k_datu DESC) AS rn
FROM vzp_stav_pojisteni
) t
WHERE rn = 1
"""
with mysql.cursor() as cur:
cur.execute(sql_last_states)
last_states = cur.fetchall()
# ==========================================
# COMPARE + BREAK DATE
# ==========================================
suspected = []
today = date.today()
for row in last_states:
rc = row["rc"]
stav = row["stav"]
if rc in patients_by_rc and stav != "1":
p = patients_by_rc[rc]
with mysql.cursor() as c2:
c2.execute("""
SELECT MAX(k_datu) AS last_insured
FROM vzp_stav_pojisteni
WHERE rc = %s AND stav = '1'
""", (rc,))
r2 = c2.fetchone()
last_known_insured = r2["last_insured"]
insured_to = uninsured_from = None
if last_known_insured:
insured_to, uninsured_from = find_insurance_break_date(
vzp, rc, last_known_insured, today
)
suspected.append({
"rc": rc,
"prijmeni": p["prijmeni"],
"jmeno": p["jmeno"],
"poj": p["poj"],
"stav": stav,
"insured_to": insured_to,
"uninsured_from": uninsured_from
})
print(f"Nalezeno {len(suspected)} problémových záznamů")
# ==========================================
# CLEANUP DB
# ==========================================
medicus.close()
mysql.close()
# ==========================================
# PDF GENERATION
# ==========================================
env = Environment(loader=FileSystemLoader(str(template_dir)), autoescape=True)
template = env.get_template("vzp_console_report.html")
html_content = template.render(
patients=suspected,
generated_at=datetime.now().strftime("%d. %m. %Y %H:%M"),
font_regular=font_regular,
font_bold=font_bold,
font_italic=font_italic
)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
pdf_file = output_dir / f"kontrola_pojisteni_{timestamp}.pdf"
print("Generuji PDF přes WeasyPrint...")
HTML(string=html_content, base_url=str(template_dir)).write_pdf(pdf_file)
print("\n✅ PDF REPORT VYTVOŘEN:")
print(pdf_file.resolve())
try:
os.startfile(pdf_file)
except Exception:
pass
Binary file not shown.
Binary file not shown.
Binary file not shown.
+136
View File
@@ -0,0 +1,136 @@
<!DOCTYPE html>
<html lang="cs">
<head>
<meta charset="UTF-8">
<title>Kontrola stavu pojištění</title>
<style>
@font-face {
font-family: 'DejaVu';
src: url('{{ font_regular }}');
font-weight: normal;
font-style: normal;
}
@font-face {
font-family: 'DejaVu';
src: url('{{ font_bold }}');
font-weight: bold;
font-style: normal;
}
@font-face {
font-family: 'DejaVu';
src: url('{{ font_italic }}');
font-weight: normal;
font-style: italic;
}
@page {
size: A4;
margin: 1.5cm;
@bottom-right {
content: "Strana " counter(page) " z " counter(pages);
font-size: 9pt;
}
}
body {
font-family: 'DejaVu', sans-serif;
font-size: 10pt;
color: #333;
}
h1 {
font-size: 16pt;
border-bottom: 2px solid #2c3e50;
padding-bottom: 5px;
}
.meta {
font-size: 9pt;
margin-bottom: 15px;
color: #666;
}
table {
width: 100%;
border-collapse: collapse;
}
th, td {
border: 1px solid #ccc;
padding: 6px 8px;
vertical-align: top;
}
th {
background-color: #f2f2f2;
}
tr {
page-break-inside: avoid;
}
tr:nth-child(even) {
background-color: #f9f9f9;
}
.status-bad {
color: #d9534f;
font-weight: bold;
}
.date-info {
font-size: 9pt;
color: #555;
}
</style>
</head>
<body>
<h1>Kontrola stavu pojištění (VZP)</h1>
<div class="meta">
Vygenerováno: {{ generated_at }}
</div>
{% if patients %}
<table>
<thead>
<tr>
<th>Rodné číslo</th>
<th>Příjmení a jméno</th>
<th>Pojišťovna</th>
<th>Stav</th>
<th>Platnost pojištění</th>
</tr>
</thead>
<tbody>
{% for p in patients %}
<tr>
<td>{{ p.rc }}</td>
<td><strong>{{ p.prijmeni }}</strong> {{ p.jmeno }}</td>
<td>{{ p.poj }}</td>
<td class="status-bad">{{ p.stav }}</td>
<td class="date-info">
{% if p.insured_to %}
pojištěn do {{ p.insured_to.strftime("%d.%m.%Y") }}<br>
nepojištěn od {{ p.uninsured_from.strftime("%d.%m.%Y") }}
{% else %}
nelze určit
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p style="color: green; font-weight: bold;">
&#10004; Vše v pořádku žádné neshody nenalezeny.
</p>
{% endif %}
</body>
</html>
View File
+73
View File
@@ -0,0 +1,73 @@
import fdb
class MedicusDB:
def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"):
self.conn = fdb.connect(
host=host,
database=db_path,
user=user,
password=password,
charset=charset
)
self.cur = self.conn.cursor()
def get_fak_kapitace(self, as_dict=False):
sql = """
SELECT
fak.id,
fak.cisfak,
fak.poj,
fak.kapdetail
FROM fak
WHERE fak.kapdetail IS NOT NULL
AND fak.kapdetail <> ''
"""
if as_dict:
return self.query_dict(sql)
return self.query(sql)
def query(self, sql, params=None):
self.cur.execute(sql, params or ())
return self.cur.fetchall()
def query_dict(self, sql, params=None):
self.cur.execute(sql, params or ())
cols = [d[0].strip().lower() for d in self.cur.description]
return [dict(zip(cols, row)) for row in self.cur.fetchall()]
def get_active_registered_patients(self, as_dict=False):
sql = """
SELECT
kar.rodcis,
kar.prijmeni,
kar.jmeno,
kar.poj
FROM registr
JOIN kar ON registr.idpac = kar.idpac
WHERE registr.datum_zruseni IS NULL
AND registr.priznak IN ('A','D','V')
AND kar.rodcis IS NOT NULL
AND kar.rodcis <> ''
AND kar.vyrazen <> 'A'
"""
if as_dict:
return self.query_dict(sql)
return self.query(sql)
def get_all_patients(self, as_dict=False):
sql = """
SELECT
kar.rodcis,
kar.prijmeni,
kar.jmeno,
kar.poj
FROM kar
"""
if as_dict:
return self.query_dict(sql)
return self.query(sql)
def close(self):
self.conn.close()
+213
View File
@@ -0,0 +1,213 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from requests_pkcs12 import Pkcs12Adapter
import requests
import uuid
from datetime import date
class VZPB2BClient:
def __init__(self, env: str, pfx_path: str, pfx_password: str,
icz: str = "00000000", dic: str = "00000000"):
# Normalize environment name
env = env.lower().strip()
if env in ("prod", "production", "live", "real"):
self.env = "prod"
elif env in ("simu", "simulace", "test", "testing"):
self.env = "simu"
else:
raise ValueError(f"Unknown environment '{env}'. Use 'simu' or 'prod'.")
self.pfx_path = pfx_path
self.pfx_password = pfx_password
self.icz = icz
self.dic = dic
# Prepare mTLS session
session = requests.Session()
session.mount(
"https://",
Pkcs12Adapter(pkcs12_filename=pfx_path, pkcs12_password=pfx_password)
)
self.session = session
# --------------------------------------------------------------
# URL BUILDER
# --------------------------------------------------------------
def _build_endpoint(self, service_name: str) -> str:
"""
SIMU:
https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMU<Service>?sluzba=SIMU<Service>
PROD:
https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/<Service>
"""
if self.env == "simu":
simu_service = f"SIMU{service_name}"
return (
f"https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/"
f"{simu_service}?sluzba={simu_service}"
)
# Production
return (
f"https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/{service_name}"
)
# --------------------------------------------------------------
# SOAP HEADER BUILDER
# --------------------------------------------------------------
def _header(self) -> str:
idZpravy = uuid.uuid4().hex[:12] # must be alphanumeric, max 12 chars
return f"""
<com:idZpravy>{idZpravy}</com:idZpravy>
<com:idSubjektu>
<com:icz>{self.icz}</com:icz>
<com:dic>{self.dic}</com:dic>
</com:idSubjektu>
"""
# --------------------------------------------------------------
# OVERPRUKAZ — EHIC CHECK
# --------------------------------------------------------------
def over_prukaz_pojistence(self, cislo_prukazu: str, k_datu: str = None) -> str:
"""
Calls OverPrukazPojistenceB2B (SIMU or PROD depending on env).
Returns raw XML string.
"""
service = "OverPrukazPojistenceB2B"
endpoint = self._build_endpoint(service)
if not k_datu:
k_datu = date.today().isoformat()
soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/OverPrukazPojistenceB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:OverPrukazPojistenceB2BPozadavek>
<vzp:cisloPrukazu>{cislo_prukazu}</vzp:cisloPrukazu>
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:OverPrukazPojistenceB2BPozadavek>
</soap:Body>
</soap:Envelope>
"""
headers = {"Content-Type": "text/xml; charset=utf-8"}
print(f"Calling: {endpoint}")
response = self.session.post(
endpoint,
data=soap.encode("utf-8"),
headers=headers,
timeout=30
)
print("HTTP:", response.status_code)
return response.text
def stav_pojisteni(self, rc: str, k_datu: str = None, prijmeni: str = None):
"""
Calls stavPojisteniB2B (SIMU or PROD).
"""
service = "stavPojisteniB2B"
endpoint = self._build_endpoint(service)
if not k_datu:
k_datu = date.today().isoformat()
prijmeni_xml = f"<vzp:prijmeni>{prijmeni}</vzp:prijmeni>" if prijmeni else ""
soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/stavPojisteniB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:stavPojisteniB2B>
<vzp:cisloPojistence>{rc}</vzp:cisloPojistence>
{prijmeni_xml}
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:stavPojisteniB2B>
</soap:Body>
</soap:Envelope>
"""
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process"
}
print(f"Calling: {endpoint}")
resp = self.session.post(endpoint, data=soap.encode("utf-8"),
headers=headers, timeout=30)
print("HTTP:", resp.status_code)
return resp.text
def parse_stav_pojisteni(self, xml_text: str):
"""
Parses stavPojisteniB2B SOAP response into a Python dict.
Returned structure:
{
"stavVyrizeni": int,
"stav": str | None,
"kodPojistovny": str | None,
"nazevPojistovny": str | None,
"pojisteniKod": str | None
}
"""
import xml.etree.ElementTree as ET
NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"vzp": "http://xmlns.gemsystem.cz/stavPojisteniB2B"
}
root = ET.fromstring(xml_text)
# ---- Extract status ----
stav_vyr = root.find(".//vzp:stavVyrizeniPozadavku", NS)
stav_vyr = int(stav_vyr.text.strip()) if stav_vyr is not None else None
# ---- If no stavPojisteni element present (e.g. 0 or some errors) ----
node_stav = root.find(".//vzp:stavPojisteni", NS)
if node_stav is None:
return {
"stavVyrizeni": stav_vyr,
"stav": None,
"kodPojistovny": None,
"nazevPojistovny": None,
"pojisteniKod": None,
}
def get(tag):
el = node_stav.find(f"vzp:{tag}", NS)
return el.text.strip() if el is not None and el.text else None
return {
"stavVyrizeni": stav_vyr,
"stav": get("stav"),
"kodPojistovny": get("kodPojistovny"),
"nazevPojistovny": get("nazevPojistovny"),
"pojisteniKod": get("pojisteniKod"),
}