diff --git a/knihovny/medicus_db.py b/knihovny/medicus_db.py index 0a32896..8674e6e 100644 --- a/knihovny/medicus_db.py +++ b/knihovny/medicus_db.py @@ -1,232 +1,73 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- +import fdb -""" -VZP insurance history checker (single RC) ------------------------------------------ -- asks for RC -- determines current insurance -- searches backwards to find last change date -- strict rate limit: 1 request / second -- no MedicusDB dependency -""" -import sys -import time -import logging -from pathlib import Path -from datetime import date, timedelta +class MedicusDB: -import pymysql + def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"): + self.conn = fdb.connect( + host=host, + database=db_path, + user=user, + password=password, + charset=charset + ) + self.cur = self.conn.cursor() -# ========================================== -# PROJECT ROOT (import fix) -# ========================================== -PROJECT_ROOT = Path(__file__).resolve().parent.parent -sys.path.insert(0, str(PROJECT_ROOT)) - -from knihovny.vzpb2b_client import VZPB2BClient - -# ========================================== -# LOGGING -# ========================================== -logging.basicConfig( - filename="insurance_history.log", - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - encoding="utf-8" -) - -console = logging.getLogger("console") -console.setLevel(logging.INFO) -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter("%(message)s")) -console.addHandler(handler) - -def log(msg): - logging.info(msg) - console.info(msg) - -def log_err(msg): - logging.error(msg) - console.error(msg) - -# ========================================== -# MYSQL CONNECTION -# ========================================== -mysql = pymysql.connect( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - charset="utf8mb4", - autocommit=True -) - -def save_result(rc, prijmeni, jmeno, k_datu, result, xml): - sql = """ - INSERT INTO vzp_stav_pojisteni - (rc, prijmeni, jmeno, k_datu, - stav, kod_pojistovny, nazev_pojistovny, - pojisteni_kod, stav_vyrizeni, response_xml) - VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) - """ - with mysql.cursor() as cur: - cur.execute(sql, ( - rc, - prijmeni, - jmeno, - k_datu, - result["stav"], - result["kodPojistovny"], - result["nazevPojistovny"], - result["pojisteniKod"], - result["stavVyrizeni"], - xml - )) - -# ========================================== -# VZP RATE LIMITER (1 REQUEST / SECOND) -# ========================================== -_LAST_VZP_CALL = 0.0 - -def vzp_call(func, *args, **kwargs): - global _LAST_VZP_CALL - now = time.time() - elapsed = now - _LAST_VZP_CALL - if elapsed < 1.0: - time.sleep(1.0 - elapsed) - result = func(*args, **kwargs) - _LAST_VZP_CALL = time.time() - return result - -# ========================================== -# CONFIGURATION -# ========================================== -PFX_PATH = PROJECT_ROOT / "certificates" / "picka.pfx" -PFX_PASSWORD = "Vlado7309208104+" - -ENV = "prod" -ICZ = "00000000" -DIC = "00000000" - -if not PFX_PATH.exists(): - raise FileNotFoundError(f"PFX certificate not found: {PFX_PATH}") - -# ========================================== -# INIT VZP CLIENT -# ========================================== -vzp = VZPB2BClient( - ENV, - str(PFX_PATH), - PFX_PASSWORD, - icz=ICZ, - dic=DIC -) - -# ========================================== -# INPUT -# ========================================== -rc = input("Zadej rodné číslo (bez /): ").strip() - -# try to load name from DB (optional) -with mysql.cursor(pymysql.cursors.DictCursor) as cur: - cur.execute( + def get_fak_kapitace(self, as_dict=False): + sql = """ + SELECT + fak.id, + fak.cisfak, + fak.poj, + fak.kapdetail + FROM fak + WHERE fak.kapdetail IS NOT NULL + AND fak.kapdetail <> '' """ - SELECT prijmeni, jmeno - FROM vzp_stav_pojisteni - WHERE rc = %s - ORDER BY k_datu DESC - LIMIT 1 - """, - (rc,) - ) - row = cur.fetchone() + if as_dict: + return self.query_dict(sql) + return self.query(sql) -if row: - prijmeni = row["prijmeni"] - jmeno = row["jmeno"] -else: - prijmeni = "?" - jmeno = "?" + def query(self, sql, params=None): + self.cur.execute(sql, params or ()) + return self.cur.fetchall() -today = date.today() -log(f"▶ Kontrola pojištění: {prijmeni} {jmeno} ({rc})") + def query_dict(self, sql, params=None): + self.cur.execute(sql, params or ()) + cols = [d[0].strip().lower() for d in self.cur.description] + return [dict(zip(cols, row)) for row in self.cur.fetchall()] -# ========================================== -# CURRENT STATUS -# ========================================== -xml = vzp_call( - vzp.stav_pojisteni, - rc=rc, - k_datu=today.isoformat() -) + def get_active_registered_patients(self, as_dict=False): + sql = """ + SELECT + kar.rodcis, + kar.prijmeni, + kar.jmeno, + kar.poj + FROM registr + JOIN kar ON registr.idpac = kar.idpac + WHERE registr.datum_zruseni IS NULL + AND registr.priznak IN ('A','D','V') + AND kar.rodcis IS NOT NULL + AND kar.rodcis <> '' + AND kar.vyrazen <> 'A' + """ + if as_dict: + return self.query_dict(sql) + return self.query(sql) -current = vzp.parse_stav_pojisteni(xml) -save_result(rc, prijmeni, jmeno, today, current, xml) + def get_all_patients(self, as_dict=False): + sql = """ + SELECT + kar.rodcis, + kar.prijmeni, + kar.jmeno, + kar.poj + FROM kar + """ + if as_dict: + return self.query_dict(sql) + return self.query(sql) -current_code = current["kodPojistovny"] -log(f"✔ Aktuální pojišťovna: {current['nazevPojistovny']} ({current_code})") - -# ========================================== -# BACKWARD SEARCH -# ========================================== -def check_at(d): - xml = vzp_call( - vzp.stav_pojisteni, - rc=rc, - k_datu=d.isoformat() - ) - res = vzp.parse_stav_pojisteni(xml) - - kod = res["kodPojistovny"] - - # 👇 THIS IS THE VISIBILITY YOU WANT - log(f" test {d.isoformat()} → kód {kod}") - - save_result(rc, prijmeni, jmeno, d, res, xml) - return kod - - -log("\n⏪ Hledám poslední změnu pojišťovny...") - -# 1️⃣ yearly steps -probe = today -last_same = today - -while True: - probe -= timedelta(days=365) - if probe.year < today.year - 20: - break - if check_at(probe) != current_code: - break - last_same = probe - -# 2️⃣ monthly steps -probe = last_same -while True: - probe -= timedelta(days=30) - if check_at(probe) != current_code: - break - last_same = probe - -# 3️⃣ daily steps -probe = last_same -while True: - probe -= timedelta(days=1) - if check_at(probe) != current_code: - change_date = probe + timedelta(days=1) - break - -# ========================================== -# RESULT -# ========================================== -log("\n✅ VÝSLEDEK") -log(f"Pojišťovna {current_code} platí od: {change_date.isoformat()}") - -# ========================================== -# CLEANUP -# ========================================== -mysql.close() -log("\nDONE.") + def close(self): + self.conn.close()