From a3c26f8bf71a0d555cfaf00b7d312d218ae76522 Mon Sep 17 00:00:00 2001 From: Vladimir Buzalka Date: Tue, 27 Jan 2026 05:28:26 +0100 Subject: [PATCH] notebookVB --- 00 Funkční testy/20 Kdy se stav změnil.py | 234 ++++++++++++++++++ knihovny/medicus_db.py | 285 +++++++++++++++++----- 2 files changed, 456 insertions(+), 63 deletions(-) create mode 100644 00 Funkční testy/20 Kdy se stav změnil.py diff --git a/00 Funkční testy/20 Kdy se stav změnil.py b/00 Funkční testy/20 Kdy se stav změnil.py new file mode 100644 index 0000000..04ade45 --- /dev/null +++ b/00 Funkční testy/20 Kdy se stav změnil.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +VZP – zjištění od kdy platí AKTUÁLNÍ STAV pojištění (stav) +--------------------------------------------------------- +- vstup: rodné číslo +- porovnává pouze položku: res["stav"] +- hledá změnu: roky → měsíce → týdny → dny +- 1 request / sekunda (tvrdý limit) +- ukládá vše do MySQL (audit) +""" + +import sys +import time +import logging +from pathlib import Path +from datetime import date, timedelta + +import pymysql + +# ========================================== +# PROJECT ROOT +# ========================================== +PROJECT_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(PROJECT_ROOT)) + +from knihovny.vzpb2b_client import VZPB2BClient + +# ========================================== +# LOGGING +# ========================================== +logging.basicConfig( + filename="insurance_status_history.log", + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + encoding="utf-8" +) + +console = logging.getLogger("console") +console.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter("%(message)s")) +console.addHandler(handler) + +def log(msg): + logging.info(msg) + console.info(msg) + +def log_err(msg): + logging.error(msg) + console.error(msg) + +# ========================================== +# MYSQL +# ========================================== +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4", + autocommit=True +) + +def save_result(rc, prijmeni, jmeno, k_datu, res, xml): + sql = """ + INSERT INTO vzp_stav_pojisteni + (rc, prijmeni, jmeno, k_datu, + stav, kod_pojistovny, nazev_pojistovny, + pojisteni_kod, stav_vyrizeni, response_xml) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + """ + with mysql.cursor() as cur: + cur.execute(sql, ( + rc, + prijmeni, + jmeno, + k_datu, + res["stav"], + res["kodPojistovny"], + res["nazevPojistovny"], + res["pojisteniKod"], + res["stavVyrizeni"], + xml + )) + +# ========================================== +# RATE LIMITER – 1 req / sec +# ========================================== +_LAST_CALL = 0.0 + +def vzp_call(func, *args, **kwargs): + global _LAST_CALL + now = time.time() + delta = now - _LAST_CALL + if delta < 1.1: + time.sleep(1.1 - delta) + result = func(*args, **kwargs) + _LAST_CALL = time.time() + return result + +# ========================================== +# CONFIG +# ========================================== +PFX_PATH = PROJECT_ROOT / "certificates" / "picka.pfx" +PFX_PASSWORD = "Vlado7309208104+" + +ENV = "prod" +ICZ = "00000000" +DIC = "00000000" + +if not PFX_PATH.exists(): + raise FileNotFoundError(f"PFX not found: {PFX_PATH}") + +# ========================================== +# INIT VZP CLIENT +# ========================================== +vzp = VZPB2BClient( + ENV, + str(PFX_PATH), + PFX_PASSWORD, + icz=ICZ, + dic=DIC +) + +# ========================================== +# INPUT +# ========================================== +rc = input("Zadej rodné číslo (bez /): ").strip() + +# jméno jen informativně (pokud existuje historie) +with mysql.cursor(pymysql.cursors.DictCursor) as cur: + cur.execute(""" + SELECT prijmeni, jmeno + FROM vzp_stav_pojisteni + WHERE rc = %s + ORDER BY k_datu DESC + LIMIT 1 + """, (rc,)) + row = cur.fetchone() + +prijmeni = row["prijmeni"] if row else "?" +jmeno = row["jmeno"] if row else "?" + +today = date.today() +log(f"▶ Kontrola STAVU pojištění: {prijmeni} {jmeno} ({rc})") + +# ========================================== +# CHECK FUNCTION (STAV) +# ========================================== +def check_at(d: date): + xml = vzp_call( + vzp.stav_pojisteni, + rc=rc, + k_datu=d.isoformat() + ) + res = vzp.parse_stav_pojisteni(xml) + + stav = res["stav"] + log(f" test {d.isoformat()} → stav={stav}") + + save_result(rc, prijmeni, jmeno, d, res, xml) + return stav + +# ========================================== +# CURRENT STAV +# ========================================== +today_stav = check_at(today) +log(f"✔ Aktuální stav: {today_stav}") + +# ========================================== +# 1️⃣ BACKWARD – YEARS +# ========================================== +probe = today +last_same = today + +while True: + probe -= timedelta(days=365) + if probe.year < today.year - 20: + break + + if check_at(probe) != today_stav: + break + + last_same = probe + +# ========================================== +# 2️⃣ FORWARD – MONTHS +# ========================================== +probe = probe +while True: + probe += timedelta(days=30) + if probe >= last_same: + break + + if check_at(probe) == today_stav: + upper = probe + lower = probe - timedelta(days=30) + break + +# ========================================== +# 3️⃣ FORWARD – WEEKS +# ========================================== +probe = lower +while True: + probe += timedelta(days=7) + if probe >= upper: + break + + if check_at(probe) == today_stav: + upper = probe + lower = probe - timedelta(days=7) + break + +# ========================================== +# 4️⃣ FORWARD – DAYS +# ========================================== +probe = lower +while True: + probe += timedelta(days=1) + if check_at(probe) == today_stav: + change_date = probe + break + +# ========================================== +# RESULT +# ========================================== +log("\n✅ VÝSLEDEK") +log(f"Stav '{today_stav}' platí od: {change_date.isoformat()}") + +mysql.close() +log("\nDONE.") diff --git a/knihovny/medicus_db.py b/knihovny/medicus_db.py index 8674e6e..0a32896 100644 --- a/knihovny/medicus_db.py +++ b/knihovny/medicus_db.py @@ -1,73 +1,232 @@ -import fdb +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +VZP insurance history checker (single RC) +----------------------------------------- +- asks for RC +- determines current insurance +- searches backwards to find last change date +- strict rate limit: 1 request / second +- no MedicusDB dependency +""" -class MedicusDB: +import sys +import time +import logging +from pathlib import Path +from datetime import date, timedelta - def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"): - self.conn = fdb.connect( - host=host, - database=db_path, - user=user, - password=password, - charset=charset - ) - self.cur = self.conn.cursor() +import pymysql - def get_fak_kapitace(self, as_dict=False): - sql = """ - SELECT - fak.id, - fak.cisfak, - fak.poj, - fak.kapdetail - FROM fak - WHERE fak.kapdetail IS NOT NULL - AND fak.kapdetail <> '' +# ========================================== +# PROJECT ROOT (import fix) +# ========================================== +PROJECT_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(PROJECT_ROOT)) + +from knihovny.vzpb2b_client import VZPB2BClient + +# ========================================== +# LOGGING +# ========================================== +logging.basicConfig( + filename="insurance_history.log", + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + encoding="utf-8" +) + +console = logging.getLogger("console") +console.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter("%(message)s")) +console.addHandler(handler) + +def log(msg): + logging.info(msg) + console.info(msg) + +def log_err(msg): + logging.error(msg) + console.error(msg) + +# ========================================== +# MYSQL CONNECTION +# ========================================== +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4", + autocommit=True +) + +def save_result(rc, prijmeni, jmeno, k_datu, result, xml): + sql = """ + INSERT INTO vzp_stav_pojisteni + (rc, prijmeni, jmeno, k_datu, + stav, kod_pojistovny, nazev_pojistovny, + pojisteni_kod, stav_vyrizeni, response_xml) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + """ + with mysql.cursor() as cur: + cur.execute(sql, ( + rc, + prijmeni, + jmeno, + k_datu, + result["stav"], + result["kodPojistovny"], + result["nazevPojistovny"], + result["pojisteniKod"], + result["stavVyrizeni"], + xml + )) + +# ========================================== +# VZP RATE LIMITER (1 REQUEST / SECOND) +# ========================================== +_LAST_VZP_CALL = 0.0 + +def vzp_call(func, *args, **kwargs): + global _LAST_VZP_CALL + now = time.time() + elapsed = now - _LAST_VZP_CALL + if elapsed < 1.0: + time.sleep(1.0 - elapsed) + result = func(*args, **kwargs) + _LAST_VZP_CALL = time.time() + return result + +# ========================================== +# CONFIGURATION +# ========================================== +PFX_PATH = PROJECT_ROOT / "certificates" / "picka.pfx" +PFX_PASSWORD = "Vlado7309208104+" + +ENV = "prod" +ICZ = "00000000" +DIC = "00000000" + +if not PFX_PATH.exists(): + raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}") + +# ========================================== +# INIT VZP CLIENT +# ========================================== +vzp = VZPB2BClient( + ENV, + str(PFX_PATH), + PFX_PASSWORD, + icz=ICZ, + dic=DIC +) + +# ========================================== +# INPUT +# ========================================== +rc = input("Zadej rodné číslo (bez /): ").strip() + +# try to load name from DB (optional) +with mysql.cursor(pymysql.cursors.DictCursor) as cur: + cur.execute( """ - if as_dict: - return self.query_dict(sql) - return self.query(sql) + SELECT prijmeni, jmeno + FROM vzp_stav_pojisteni + WHERE rc = %s + ORDER BY k_datu DESC + LIMIT 1 + """, + (rc,) + ) + row = cur.fetchone() - def query(self, sql, params=None): - self.cur.execute(sql, params or ()) - return self.cur.fetchall() +if row: + prijmeni = row["prijmeni"] + jmeno = row["jmeno"] +else: + prijmeni = "?" + jmeno = "?" - def query_dict(self, sql, params=None): - self.cur.execute(sql, params or ()) - cols = [d[0].strip().lower() for d in self.cur.description] - return [dict(zip(cols, row)) for row in self.cur.fetchall()] +today = date.today() +log(f"▶ Kontrola pojištění: {prijmeni} {jmeno} ({rc})") - def get_active_registered_patients(self, as_dict=False): - sql = """ - SELECT - kar.rodcis, - kar.prijmeni, - kar.jmeno, - kar.poj - FROM registr - JOIN kar ON registr.idpac = kar.idpac - WHERE registr.datum_zruseni IS NULL - AND registr.priznak IN ('A','D','V') - AND kar.rodcis IS NOT NULL - AND kar.rodcis <> '' - AND kar.vyrazen <> 'A' - """ - if as_dict: - return self.query_dict(sql) - return self.query(sql) +# ========================================== +# CURRENT STATUS +# ========================================== +xml = vzp_call( + vzp.stav_pojisteni, + rc=rc, + k_datu=today.isoformat() +) - def get_all_patients(self, as_dict=False): - sql = """ - SELECT - kar.rodcis, - kar.prijmeni, - kar.jmeno, - kar.poj - FROM kar - """ - if as_dict: - return self.query_dict(sql) - return self.query(sql) +current = vzp.parse_stav_pojisteni(xml) +save_result(rc, prijmeni, jmeno, today, current, xml) - def close(self): - self.conn.close() +current_code = current["kodPojistovny"] +log(f"✔ Aktuální pojišťovna: {current['nazevPojistovny']} ({current_code})") + +# ========================================== +# BACKWARD SEARCH +# ========================================== +def check_at(d): + xml = vzp_call( + vzp.stav_pojisteni, + rc=rc, + k_datu=d.isoformat() + ) + res = vzp.parse_stav_pojisteni(xml) + + kod = res["kodPojistovny"] + + # 👇 THIS IS THE VISIBILITY YOU WANT + log(f" test {d.isoformat()} → kód {kod}") + + save_result(rc, prijmeni, jmeno, d, res, xml) + return kod + + +log("\n⏪ Hledám poslední změnu pojišťovny...") + +# 1️⃣ yearly steps +probe = today +last_same = today + +while True: + probe -= timedelta(days=365) + if probe.year < today.year - 20: + break + if check_at(probe) != current_code: + break + last_same = probe + +# 2️⃣ monthly steps +probe = last_same +while True: + probe -= timedelta(days=30) + if check_at(probe) != current_code: + break + last_same = probe + +# 3️⃣ daily steps +probe = last_same +while True: + probe -= timedelta(days=1) + if check_at(probe) != current_code: + change_date = probe + timedelta(days=1) + break + +# ========================================== +# RESULT +# ========================================== +log("\n✅ VÝSLEDEK") +log(f"Pojišťovna {current_code} platí od: {change_date.isoformat()}") + +# ========================================== +# CLEANUP +# ========================================== +mysql.close() +log("\nDONE.")