notebookVB

This commit is contained in:
2026-01-27 06:37:56 +01:00
parent a3c26f8bf7
commit 31580f89a4

View File

@@ -1,232 +1,73 @@
#!/usr/bin/env python3 import fdb
# -*- coding: utf-8 -*-
"""
VZP insurance history checker (single RC)
-----------------------------------------
- asks for RC
- determines current insurance
- searches backwards to find last change date
- strict rate limit: 1 request / second
- no MedicusDB dependency
"""
import sys class MedicusDB:
import time
import logging
from pathlib import Path
from datetime import date, timedelta
import pymysql def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"):
self.conn = fdb.connect(
host=host,
database=db_path,
user=user,
password=password,
charset=charset
)
self.cur = self.conn.cursor()
# ========================================== def get_fak_kapitace(self, as_dict=False):
# PROJECT ROOT (import fix) sql = """
# ========================================== SELECT
PROJECT_ROOT = Path(__file__).resolve().parent.parent fak.id,
sys.path.insert(0, str(PROJECT_ROOT)) fak.cisfak,
fak.poj,
from knihovny.vzpb2b_client import VZPB2BClient fak.kapdetail
FROM fak
# ========================================== WHERE fak.kapdetail IS NOT NULL
# LOGGING AND fak.kapdetail <> ''
# ==========================================
logging.basicConfig(
filename="insurance_history.log",
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
encoding="utf-8"
)
console = logging.getLogger("console")
console.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
console.addHandler(handler)
def log(msg):
logging.info(msg)
console.info(msg)
def log_err(msg):
logging.error(msg)
console.error(msg)
# ==========================================
# MYSQL CONNECTION
# ==========================================
mysql = pymysql.connect(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
charset="utf8mb4",
autocommit=True
)
def save_result(rc, prijmeni, jmeno, k_datu, result, xml):
sql = """
INSERT INTO vzp_stav_pojisteni
(rc, prijmeni, jmeno, k_datu,
stav, kod_pojistovny, nazev_pojistovny,
pojisteni_kod, stav_vyrizeni, response_xml)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
with mysql.cursor() as cur:
cur.execute(sql, (
rc,
prijmeni,
jmeno,
k_datu,
result["stav"],
result["kodPojistovny"],
result["nazevPojistovny"],
result["pojisteniKod"],
result["stavVyrizeni"],
xml
))
# ==========================================
# VZP RATE LIMITER (1 REQUEST / SECOND)
# ==========================================
_LAST_VZP_CALL = 0.0
def vzp_call(func, *args, **kwargs):
global _LAST_VZP_CALL
now = time.time()
elapsed = now - _LAST_VZP_CALL
if elapsed < 1.0:
time.sleep(1.0 - elapsed)
result = func(*args, **kwargs)
_LAST_VZP_CALL = time.time()
return result
# ==========================================
# CONFIGURATION
# ==========================================
PFX_PATH = PROJECT_ROOT / "certificates" / "picka.pfx"
PFX_PASSWORD = "Vlado7309208104+"
ENV = "prod"
ICZ = "00000000"
DIC = "00000000"
if not PFX_PATH.exists():
raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}")
# ==========================================
# INIT VZP CLIENT
# ==========================================
vzp = VZPB2BClient(
ENV,
str(PFX_PATH),
PFX_PASSWORD,
icz=ICZ,
dic=DIC
)
# ==========================================
# INPUT
# ==========================================
rc = input("Zadej rodné číslo (bez /): ").strip()
# try to load name from DB (optional)
with mysql.cursor(pymysql.cursors.DictCursor) as cur:
cur.execute(
""" """
SELECT prijmeni, jmeno if as_dict:
FROM vzp_stav_pojisteni return self.query_dict(sql)
WHERE rc = %s return self.query(sql)
ORDER BY k_datu DESC
LIMIT 1
""",
(rc,)
)
row = cur.fetchone()
if row: def query(self, sql, params=None):
prijmeni = row["prijmeni"] self.cur.execute(sql, params or ())
jmeno = row["jmeno"] return self.cur.fetchall()
else:
prijmeni = "?"
jmeno = "?"
today = date.today() def query_dict(self, sql, params=None):
log(f"▶ Kontrola pojištění: {prijmeni} {jmeno} ({rc})") self.cur.execute(sql, params or ())
cols = [d[0].strip().lower() for d in self.cur.description]
return [dict(zip(cols, row)) for row in self.cur.fetchall()]
# ========================================== def get_active_registered_patients(self, as_dict=False):
# CURRENT STATUS sql = """
# ========================================== SELECT
xml = vzp_call( kar.rodcis,
vzp.stav_pojisteni, kar.prijmeni,
rc=rc, kar.jmeno,
k_datu=today.isoformat() kar.poj
) FROM registr
JOIN kar ON registr.idpac = kar.idpac
WHERE registr.datum_zruseni IS NULL
AND registr.priznak IN ('A','D','V')
AND kar.rodcis IS NOT NULL
AND kar.rodcis <> ''
AND kar.vyrazen <> 'A'
"""
if as_dict:
return self.query_dict(sql)
return self.query(sql)
current = vzp.parse_stav_pojisteni(xml) def get_all_patients(self, as_dict=False):
save_result(rc, prijmeni, jmeno, today, current, xml) sql = """
SELECT
kar.rodcis,
kar.prijmeni,
kar.jmeno,
kar.poj
FROM kar
"""
if as_dict:
return self.query_dict(sql)
return self.query(sql)
current_code = current["kodPojistovny"] def close(self):
log(f"✔ Aktuální pojišťovna: {current['nazevPojistovny']} ({current_code})") self.conn.close()
# ==========================================
# BACKWARD SEARCH
# ==========================================
def check_at(d):
xml = vzp_call(
vzp.stav_pojisteni,
rc=rc,
k_datu=d.isoformat()
)
res = vzp.parse_stav_pojisteni(xml)
kod = res["kodPojistovny"]
# 👇 THIS IS THE VISIBILITY YOU WANT
log(f" test {d.isoformat()} → kód {kod}")
save_result(rc, prijmeni, jmeno, d, res, xml)
return kod
log("\n⏪ Hledám poslední změnu pojišťovny...")
# 1⃣ yearly steps
probe = today
last_same = today
while True:
probe -= timedelta(days=365)
if probe.year < today.year - 20:
break
if check_at(probe) != current_code:
break
last_same = probe
# 2⃣ monthly steps
probe = last_same
while True:
probe -= timedelta(days=30)
if check_at(probe) != current_code:
break
last_same = probe
# 3⃣ daily steps
probe = last_same
while True:
probe -= timedelta(days=1)
if check_at(probe) != current_code:
change_date = probe + timedelta(days=1)
break
# ==========================================
# RESULT
# ==========================================
log("\n✅ VÝSLEDEK")
log(f"Pojišťovna {current_code} platí od: {change_date.isoformat()}")
# ==========================================
# CLEANUP
# ==========================================
mysql.close()
log("\nDONE.")