notebookVB
This commit is contained in:
234
00 Funkční testy/20 Kdy se stav změnil.py
Normal file
234
00 Funkční testy/20 Kdy se stav změnil.py
Normal file
@@ -0,0 +1,234 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
"""
|
||||||
|
VZP – zjištění od kdy platí AKTUÁLNÍ STAV pojištění (stav)
|
||||||
|
---------------------------------------------------------
|
||||||
|
- vstup: rodné číslo
|
||||||
|
- porovnává pouze položku: res["stav"]
|
||||||
|
- hledá změnu: roky → měsíce → týdny → dny
|
||||||
|
- 1 request / sekunda (tvrdý limit)
|
||||||
|
- ukládá vše do MySQL (audit)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import date, timedelta
|
||||||
|
|
||||||
|
import pymysql
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# PROJECT ROOT
|
||||||
|
# ==========================================
|
||||||
|
PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
||||||
|
sys.path.insert(0, str(PROJECT_ROOT))
|
||||||
|
|
||||||
|
from knihovny.vzpb2b_client import VZPB2BClient
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# LOGGING
|
||||||
|
# ==========================================
|
||||||
|
logging.basicConfig(
|
||||||
|
filename="insurance_status_history.log",
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||||
|
encoding="utf-8"
|
||||||
|
)
|
||||||
|
|
||||||
|
console = logging.getLogger("console")
|
||||||
|
console.setLevel(logging.INFO)
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
handler.setFormatter(logging.Formatter("%(message)s"))
|
||||||
|
console.addHandler(handler)
|
||||||
|
|
||||||
|
def log(msg):
|
||||||
|
logging.info(msg)
|
||||||
|
console.info(msg)
|
||||||
|
|
||||||
|
def log_err(msg):
|
||||||
|
logging.error(msg)
|
||||||
|
console.error(msg)
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# MYSQL
|
||||||
|
# ==========================================
|
||||||
|
mysql = pymysql.connect(
|
||||||
|
host="192.168.1.76",
|
||||||
|
port=3307,
|
||||||
|
user="root",
|
||||||
|
password="Vlado9674+",
|
||||||
|
database="medevio",
|
||||||
|
charset="utf8mb4",
|
||||||
|
autocommit=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def save_result(rc, prijmeni, jmeno, k_datu, res, xml):
|
||||||
|
sql = """
|
||||||
|
INSERT INTO vzp_stav_pojisteni
|
||||||
|
(rc, prijmeni, jmeno, k_datu,
|
||||||
|
stav, kod_pojistovny, nazev_pojistovny,
|
||||||
|
pojisteni_kod, stav_vyrizeni, response_xml)
|
||||||
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
||||||
|
"""
|
||||||
|
with mysql.cursor() as cur:
|
||||||
|
cur.execute(sql, (
|
||||||
|
rc,
|
||||||
|
prijmeni,
|
||||||
|
jmeno,
|
||||||
|
k_datu,
|
||||||
|
res["stav"],
|
||||||
|
res["kodPojistovny"],
|
||||||
|
res["nazevPojistovny"],
|
||||||
|
res["pojisteniKod"],
|
||||||
|
res["stavVyrizeni"],
|
||||||
|
xml
|
||||||
|
))
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# RATE LIMITER – 1 req / sec
|
||||||
|
# ==========================================
|
||||||
|
_LAST_CALL = 0.0
|
||||||
|
|
||||||
|
def vzp_call(func, *args, **kwargs):
|
||||||
|
global _LAST_CALL
|
||||||
|
now = time.time()
|
||||||
|
delta = now - _LAST_CALL
|
||||||
|
if delta < 1.1:
|
||||||
|
time.sleep(1.1 - delta)
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
_LAST_CALL = time.time()
|
||||||
|
return result
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# CONFIG
|
||||||
|
# ==========================================
|
||||||
|
PFX_PATH = PROJECT_ROOT / "certificates" / "picka.pfx"
|
||||||
|
PFX_PASSWORD = "Vlado7309208104+"
|
||||||
|
|
||||||
|
ENV = "prod"
|
||||||
|
ICZ = "00000000"
|
||||||
|
DIC = "00000000"
|
||||||
|
|
||||||
|
if not PFX_PATH.exists():
|
||||||
|
raise FileNotFoundError(f"PFX not found: {PFX_PATH}")
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# INIT VZP CLIENT
|
||||||
|
# ==========================================
|
||||||
|
vzp = VZPB2BClient(
|
||||||
|
ENV,
|
||||||
|
str(PFX_PATH),
|
||||||
|
PFX_PASSWORD,
|
||||||
|
icz=ICZ,
|
||||||
|
dic=DIC
|
||||||
|
)
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# INPUT
|
||||||
|
# ==========================================
|
||||||
|
rc = input("Zadej rodné číslo (bez /): ").strip()
|
||||||
|
|
||||||
|
# jméno jen informativně (pokud existuje historie)
|
||||||
|
with mysql.cursor(pymysql.cursors.DictCursor) as cur:
|
||||||
|
cur.execute("""
|
||||||
|
SELECT prijmeni, jmeno
|
||||||
|
FROM vzp_stav_pojisteni
|
||||||
|
WHERE rc = %s
|
||||||
|
ORDER BY k_datu DESC
|
||||||
|
LIMIT 1
|
||||||
|
""", (rc,))
|
||||||
|
row = cur.fetchone()
|
||||||
|
|
||||||
|
prijmeni = row["prijmeni"] if row else "?"
|
||||||
|
jmeno = row["jmeno"] if row else "?"
|
||||||
|
|
||||||
|
today = date.today()
|
||||||
|
log(f"▶ Kontrola STAVU pojištění: {prijmeni} {jmeno} ({rc})")
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# CHECK FUNCTION (STAV)
|
||||||
|
# ==========================================
|
||||||
|
def check_at(d: date):
|
||||||
|
xml = vzp_call(
|
||||||
|
vzp.stav_pojisteni,
|
||||||
|
rc=rc,
|
||||||
|
k_datu=d.isoformat()
|
||||||
|
)
|
||||||
|
res = vzp.parse_stav_pojisteni(xml)
|
||||||
|
|
||||||
|
stav = res["stav"]
|
||||||
|
log(f" test {d.isoformat()} → stav={stav}")
|
||||||
|
|
||||||
|
save_result(rc, prijmeni, jmeno, d, res, xml)
|
||||||
|
return stav
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# CURRENT STAV
|
||||||
|
# ==========================================
|
||||||
|
today_stav = check_at(today)
|
||||||
|
log(f"✔ Aktuální stav: {today_stav}")
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# 1️⃣ BACKWARD – YEARS
|
||||||
|
# ==========================================
|
||||||
|
probe = today
|
||||||
|
last_same = today
|
||||||
|
|
||||||
|
while True:
|
||||||
|
probe -= timedelta(days=365)
|
||||||
|
if probe.year < today.year - 20:
|
||||||
|
break
|
||||||
|
|
||||||
|
if check_at(probe) != today_stav:
|
||||||
|
break
|
||||||
|
|
||||||
|
last_same = probe
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# 2️⃣ FORWARD – MONTHS
|
||||||
|
# ==========================================
|
||||||
|
probe = probe
|
||||||
|
while True:
|
||||||
|
probe += timedelta(days=30)
|
||||||
|
if probe >= last_same:
|
||||||
|
break
|
||||||
|
|
||||||
|
if check_at(probe) == today_stav:
|
||||||
|
upper = probe
|
||||||
|
lower = probe - timedelta(days=30)
|
||||||
|
break
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# 3️⃣ FORWARD – WEEKS
|
||||||
|
# ==========================================
|
||||||
|
probe = lower
|
||||||
|
while True:
|
||||||
|
probe += timedelta(days=7)
|
||||||
|
if probe >= upper:
|
||||||
|
break
|
||||||
|
|
||||||
|
if check_at(probe) == today_stav:
|
||||||
|
upper = probe
|
||||||
|
lower = probe - timedelta(days=7)
|
||||||
|
break
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# 4️⃣ FORWARD – DAYS
|
||||||
|
# ==========================================
|
||||||
|
probe = lower
|
||||||
|
while True:
|
||||||
|
probe += timedelta(days=1)
|
||||||
|
if check_at(probe) == today_stav:
|
||||||
|
change_date = probe
|
||||||
|
break
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# RESULT
|
||||||
|
# ==========================================
|
||||||
|
log("\n✅ VÝSLEDEK")
|
||||||
|
log(f"Stav '{today_stav}' platí od: {change_date.isoformat()}")
|
||||||
|
|
||||||
|
mysql.close()
|
||||||
|
log("\nDONE.")
|
||||||
@@ -1,73 +1,232 @@
|
|||||||
import fdb
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
"""
|
||||||
|
VZP insurance history checker (single RC)
|
||||||
|
-----------------------------------------
|
||||||
|
- asks for RC
|
||||||
|
- determines current insurance
|
||||||
|
- searches backwards to find last change date
|
||||||
|
- strict rate limit: 1 request / second
|
||||||
|
- no MedicusDB dependency
|
||||||
|
"""
|
||||||
|
|
||||||
class MedicusDB:
|
import sys
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import date, timedelta
|
||||||
|
|
||||||
def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"):
|
import pymysql
|
||||||
self.conn = fdb.connect(
|
|
||||||
host=host,
|
|
||||||
database=db_path,
|
|
||||||
user=user,
|
|
||||||
password=password,
|
|
||||||
charset=charset
|
|
||||||
)
|
|
||||||
self.cur = self.conn.cursor()
|
|
||||||
|
|
||||||
def get_fak_kapitace(self, as_dict=False):
|
# ==========================================
|
||||||
sql = """
|
# PROJECT ROOT (import fix)
|
||||||
SELECT
|
# ==========================================
|
||||||
fak.id,
|
PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
||||||
fak.cisfak,
|
sys.path.insert(0, str(PROJECT_ROOT))
|
||||||
fak.poj,
|
|
||||||
fak.kapdetail
|
from knihovny.vzpb2b_client import VZPB2BClient
|
||||||
FROM fak
|
|
||||||
WHERE fak.kapdetail IS NOT NULL
|
# ==========================================
|
||||||
AND fak.kapdetail <> ''
|
# LOGGING
|
||||||
|
# ==========================================
|
||||||
|
logging.basicConfig(
|
||||||
|
filename="insurance_history.log",
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||||
|
encoding="utf-8"
|
||||||
|
)
|
||||||
|
|
||||||
|
console = logging.getLogger("console")
|
||||||
|
console.setLevel(logging.INFO)
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
handler.setFormatter(logging.Formatter("%(message)s"))
|
||||||
|
console.addHandler(handler)
|
||||||
|
|
||||||
|
def log(msg):
|
||||||
|
logging.info(msg)
|
||||||
|
console.info(msg)
|
||||||
|
|
||||||
|
def log_err(msg):
|
||||||
|
logging.error(msg)
|
||||||
|
console.error(msg)
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# MYSQL CONNECTION
|
||||||
|
# ==========================================
|
||||||
|
mysql = pymysql.connect(
|
||||||
|
host="192.168.1.76",
|
||||||
|
port=3307,
|
||||||
|
user="root",
|
||||||
|
password="Vlado9674+",
|
||||||
|
database="medevio",
|
||||||
|
charset="utf8mb4",
|
||||||
|
autocommit=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def save_result(rc, prijmeni, jmeno, k_datu, result, xml):
|
||||||
|
sql = """
|
||||||
|
INSERT INTO vzp_stav_pojisteni
|
||||||
|
(rc, prijmeni, jmeno, k_datu,
|
||||||
|
stav, kod_pojistovny, nazev_pojistovny,
|
||||||
|
pojisteni_kod, stav_vyrizeni, response_xml)
|
||||||
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
||||||
|
"""
|
||||||
|
with mysql.cursor() as cur:
|
||||||
|
cur.execute(sql, (
|
||||||
|
rc,
|
||||||
|
prijmeni,
|
||||||
|
jmeno,
|
||||||
|
k_datu,
|
||||||
|
result["stav"],
|
||||||
|
result["kodPojistovny"],
|
||||||
|
result["nazevPojistovny"],
|
||||||
|
result["pojisteniKod"],
|
||||||
|
result["stavVyrizeni"],
|
||||||
|
xml
|
||||||
|
))
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# VZP RATE LIMITER (1 REQUEST / SECOND)
|
||||||
|
# ==========================================
|
||||||
|
_LAST_VZP_CALL = 0.0
|
||||||
|
|
||||||
|
def vzp_call(func, *args, **kwargs):
|
||||||
|
global _LAST_VZP_CALL
|
||||||
|
now = time.time()
|
||||||
|
elapsed = now - _LAST_VZP_CALL
|
||||||
|
if elapsed < 1.0:
|
||||||
|
time.sleep(1.0 - elapsed)
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
_LAST_VZP_CALL = time.time()
|
||||||
|
return result
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# CONFIGURATION
|
||||||
|
# ==========================================
|
||||||
|
PFX_PATH = PROJECT_ROOT / "certificates" / "picka.pfx"
|
||||||
|
PFX_PASSWORD = "Vlado7309208104+"
|
||||||
|
|
||||||
|
ENV = "prod"
|
||||||
|
ICZ = "00000000"
|
||||||
|
DIC = "00000000"
|
||||||
|
|
||||||
|
if not PFX_PATH.exists():
|
||||||
|
raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}")
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# INIT VZP CLIENT
|
||||||
|
# ==========================================
|
||||||
|
vzp = VZPB2BClient(
|
||||||
|
ENV,
|
||||||
|
str(PFX_PATH),
|
||||||
|
PFX_PASSWORD,
|
||||||
|
icz=ICZ,
|
||||||
|
dic=DIC
|
||||||
|
)
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# INPUT
|
||||||
|
# ==========================================
|
||||||
|
rc = input("Zadej rodné číslo (bez /): ").strip()
|
||||||
|
|
||||||
|
# try to load name from DB (optional)
|
||||||
|
with mysql.cursor(pymysql.cursors.DictCursor) as cur:
|
||||||
|
cur.execute(
|
||||||
"""
|
"""
|
||||||
if as_dict:
|
SELECT prijmeni, jmeno
|
||||||
return self.query_dict(sql)
|
FROM vzp_stav_pojisteni
|
||||||
return self.query(sql)
|
WHERE rc = %s
|
||||||
|
ORDER BY k_datu DESC
|
||||||
|
LIMIT 1
|
||||||
|
""",
|
||||||
|
(rc,)
|
||||||
|
)
|
||||||
|
row = cur.fetchone()
|
||||||
|
|
||||||
def query(self, sql, params=None):
|
if row:
|
||||||
self.cur.execute(sql, params or ())
|
prijmeni = row["prijmeni"]
|
||||||
return self.cur.fetchall()
|
jmeno = row["jmeno"]
|
||||||
|
else:
|
||||||
|
prijmeni = "?"
|
||||||
|
jmeno = "?"
|
||||||
|
|
||||||
def query_dict(self, sql, params=None):
|
today = date.today()
|
||||||
self.cur.execute(sql, params or ())
|
log(f"▶ Kontrola pojištění: {prijmeni} {jmeno} ({rc})")
|
||||||
cols = [d[0].strip().lower() for d in self.cur.description]
|
|
||||||
return [dict(zip(cols, row)) for row in self.cur.fetchall()]
|
|
||||||
|
|
||||||
def get_active_registered_patients(self, as_dict=False):
|
# ==========================================
|
||||||
sql = """
|
# CURRENT STATUS
|
||||||
SELECT
|
# ==========================================
|
||||||
kar.rodcis,
|
xml = vzp_call(
|
||||||
kar.prijmeni,
|
vzp.stav_pojisteni,
|
||||||
kar.jmeno,
|
rc=rc,
|
||||||
kar.poj
|
k_datu=today.isoformat()
|
||||||
FROM registr
|
)
|
||||||
JOIN kar ON registr.idpac = kar.idpac
|
|
||||||
WHERE registr.datum_zruseni IS NULL
|
|
||||||
AND registr.priznak IN ('A','D','V')
|
|
||||||
AND kar.rodcis IS NOT NULL
|
|
||||||
AND kar.rodcis <> ''
|
|
||||||
AND kar.vyrazen <> 'A'
|
|
||||||
"""
|
|
||||||
if as_dict:
|
|
||||||
return self.query_dict(sql)
|
|
||||||
return self.query(sql)
|
|
||||||
|
|
||||||
def get_all_patients(self, as_dict=False):
|
current = vzp.parse_stav_pojisteni(xml)
|
||||||
sql = """
|
save_result(rc, prijmeni, jmeno, today, current, xml)
|
||||||
SELECT
|
|
||||||
kar.rodcis,
|
|
||||||
kar.prijmeni,
|
|
||||||
kar.jmeno,
|
|
||||||
kar.poj
|
|
||||||
FROM kar
|
|
||||||
"""
|
|
||||||
if as_dict:
|
|
||||||
return self.query_dict(sql)
|
|
||||||
return self.query(sql)
|
|
||||||
|
|
||||||
def close(self):
|
current_code = current["kodPojistovny"]
|
||||||
self.conn.close()
|
log(f"✔ Aktuální pojišťovna: {current['nazevPojistovny']} ({current_code})")
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# BACKWARD SEARCH
|
||||||
|
# ==========================================
|
||||||
|
def check_at(d):
|
||||||
|
xml = vzp_call(
|
||||||
|
vzp.stav_pojisteni,
|
||||||
|
rc=rc,
|
||||||
|
k_datu=d.isoformat()
|
||||||
|
)
|
||||||
|
res = vzp.parse_stav_pojisteni(xml)
|
||||||
|
|
||||||
|
kod = res["kodPojistovny"]
|
||||||
|
|
||||||
|
# 👇 THIS IS THE VISIBILITY YOU WANT
|
||||||
|
log(f" test {d.isoformat()} → kód {kod}")
|
||||||
|
|
||||||
|
save_result(rc, prijmeni, jmeno, d, res, xml)
|
||||||
|
return kod
|
||||||
|
|
||||||
|
|
||||||
|
log("\n⏪ Hledám poslední změnu pojišťovny...")
|
||||||
|
|
||||||
|
# 1️⃣ yearly steps
|
||||||
|
probe = today
|
||||||
|
last_same = today
|
||||||
|
|
||||||
|
while True:
|
||||||
|
probe -= timedelta(days=365)
|
||||||
|
if probe.year < today.year - 20:
|
||||||
|
break
|
||||||
|
if check_at(probe) != current_code:
|
||||||
|
break
|
||||||
|
last_same = probe
|
||||||
|
|
||||||
|
# 2️⃣ monthly steps
|
||||||
|
probe = last_same
|
||||||
|
while True:
|
||||||
|
probe -= timedelta(days=30)
|
||||||
|
if check_at(probe) != current_code:
|
||||||
|
break
|
||||||
|
last_same = probe
|
||||||
|
|
||||||
|
# 3️⃣ daily steps
|
||||||
|
probe = last_same
|
||||||
|
while True:
|
||||||
|
probe -= timedelta(days=1)
|
||||||
|
if check_at(probe) != current_code:
|
||||||
|
change_date = probe + timedelta(days=1)
|
||||||
|
break
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# RESULT
|
||||||
|
# ==========================================
|
||||||
|
log("\n✅ VÝSLEDEK")
|
||||||
|
log(f"Pojišťovna {current_code} platí od: {change_date.isoformat()}")
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# CLEANUP
|
||||||
|
# ==========================================
|
||||||
|
mysql.close()
|
||||||
|
log("\nDONE.")
|
||||||
|
|||||||
Reference in New Issue
Block a user