From 3fa799658022801e4ca5131bb51dacb254085935 Mon Sep 17 00:00:00 2001 From: "vladimir.buzalka" Date: Thu, 11 Dec 2025 17:41:06 +0100 Subject: [PATCH] z230 --- 10 Tests/RozdilnalezenimeziFDBaMySQL.py | 67 +++++++++++ 10 Tests/medicus_db.py | 42 +++++++ 10 Tests/rozdíl.py | 79 +++++++++++++ 10 Tests/test kontrola registrovaných.py | 137 +++++++++++++++++++++++ 10 Tests/test stavpojisteni.py | 6 + 5 files changed, 331 insertions(+) create mode 100644 10 Tests/RozdilnalezenimeziFDBaMySQL.py create mode 100644 10 Tests/medicus_db.py create mode 100644 10 Tests/rozdíl.py create mode 100644 10 Tests/test kontrola registrovaných.py diff --git a/10 Tests/RozdilnalezenimeziFDBaMySQL.py b/10 Tests/RozdilnalezenimeziFDBaMySQL.py new file mode 100644 index 0000000..018df59 --- /dev/null +++ b/10 Tests/RozdilnalezenimeziFDBaMySQL.py @@ -0,0 +1,67 @@ +import pandas as pd +import fdb +import pymysql + +# --------------------------------- +# FIREBIRD CONNECTION +# --------------------------------- +fb = fdb.connect( + host="192.168.1.4", + database=r"z:\Medicus 3\data\MEDICUS.FDB", + user="SYSDBA", + password="masterkey", + charset="WIN1250" +) +cur = fb.cursor() + +sql_fb = """ +SELECT kar.rodcis +FROM registr +JOIN kar ON registr.idpac = kar.idpac +WHERE registr.datum_zruseni IS NULL + AND registr.priznak IN ('A','D','V') +""" + +cur.execute(sql_fb) +rows_fb = cur.fetchall() + +df_fb = pd.DataFrame(rows_fb, columns=["rc"]) +print("FB count:", len(df_fb)) + +# --------------------------------- +# MYSQL CONNECTION +# --------------------------------- +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4" +) + +sql_mysql = """ +SELECT rc +FROM vzp_stav_pojisteni AS v +WHERE v.k_datu = CURDATE() + AND v.id = ( + SELECT MAX(id) + FROM vzp_stav_pojisteni + WHERE rc = v.rc + AND k_datu = CURDATE() + ); +""" + +df_mysql = pd.read_sql(sql_mysql, mysql) +print("MySQL count:", len(df_mysql)) + +# --------------------------------- +# FIND MISSING RC +# --------------------------------- +df_missing = df_fb[~df_fb["rc"].isin(df_mysql["rc"])] + +print("\nMissing patients:") +print(df_missing) + +fb.close() +mysql.close() diff --git a/10 Tests/medicus_db.py b/10 Tests/medicus_db.py new file mode 100644 index 0000000..b8de965 --- /dev/null +++ b/10 Tests/medicus_db.py @@ -0,0 +1,42 @@ +import fdb + + +class MedicusDB: + + def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"): + self.conn = fdb.connect( + host=host, + database=db_path, + user=user, + password=password, + charset=charset + ) + self.cur = self.conn.cursor() + + def query(self, sql, params=None): + self.cur.execute(sql, params or ()) + return self.cur.fetchall() + + def query_dict(self, sql, params=None): + self.cur.execute(sql, params or ()) + cols = [d[0].strip().lower() for d in self.cur.description] + return [dict(zip(cols, row)) for row in self.cur.fetchall()] + + def get_active_registered_patients(self): + sql = """ + SELECT + kar.rodcis, + kar.prijmeni, + kar.jmeno, + kar.poj + FROM registr + JOIN kar ON registr.idpac = kar.idpac + WHERE registr.datum_zruseni IS NULL + AND registr.priznak IN ('A','D','V') + AND kar.rodcis IS NOT NULL + AND kar.rodcis <> '' + """ + return self.query(sql) # or self.query_dict(sql) + + def close(self): + self.conn.close() diff --git a/10 Tests/rozdíl.py b/10 Tests/rozdíl.py new file mode 100644 index 0000000..0db0af4 --- /dev/null +++ b/10 Tests/rozdíl.py @@ -0,0 +1,79 @@ +import pandas as pd +import pymysql +from medicus_db import MedicusDB +import fdb + + +# FULL OUTPUT SETTINGS +pd.set_option("display.max_rows", None) +pd.set_option("display.max_columns", None) +pd.set_option("display.width", 0) +pd.set_option("display.max_colwidth", None) + + +# =========== +# +# =========================== +# FIREBIRD → načtení registrovaných pacientů +# ====================================== +db = MedicusDB("192.168.1.4", r"z:\Medicus 3\data\MEDICUS.FDB") +rows_fb = db.get_active_registered_patients() # vrací rc, prijmeni, jmeno, poj +db.close() + +df_fb = pd.DataFrame(rows_fb, columns=["rc", "prijmeni", "jmeno", "poj_medicus"]) +df_fb["poj_medicus"] = df_fb["poj_medicus"].astype(str).str.strip() + +print("FB count:", len(df_fb)) + + +# ====================================== +# MYSQL → načtení dnešních výsledků +# ====================================== +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4" +) + +sql_mysql = """ +SELECT rc, + kod_pojistovny AS poj_mysql, + nazev_pojistovny, + stav, + stav_vyrizeni +FROM vzp_stav_pojisteni AS v +WHERE v.k_datu = CURDATE() + AND v.id = ( + SELECT MAX(id) + FROM vzp_stav_pojisteni + WHERE rc = v.rc + AND k_datu = CURDATE() + ); +""" + +df_mysql = pd.read_sql(sql_mysql, mysql) +df_mysql["poj_mysql"] = df_mysql["poj_mysql"].astype(str).str.strip() + +print("MySQL count:", len(df_mysql)) + + +# ====================================== +# LEFT JOIN: Medicus ↔ MySQL podle RC +# ====================================== +df_merge = df_fb.merge(df_mysql, on="rc", how="left") + + +# ====================================== +# Najít rozdíly pojišťovny +# ====================================== +df_diff = df_merge[df_merge["poj_medicus"] != df_merge["poj_mysql"]] + +print("\nPacienti s rozdílnou pojišťovnou:") +print(df_diff[["rc", "prijmeni", "jmeno", "poj_medicus", "poj_mysql", "nazev_pojistovny"]]) + +# Pokud chceš uložit do Excelu: +# df_diff.to_excel("rozdil_pojistoven.xlsx", index=False) + diff --git a/10 Tests/test kontrola registrovaných.py b/10 Tests/test kontrola registrovaných.py new file mode 100644 index 0000000..ff7382e --- /dev/null +++ b/10 Tests/test kontrola registrovaných.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import logging +from medicus_db import MedicusDB +from vzpb2b_client import VZPB2BClient +import pymysql +from datetime import date + +# ========================================== +# LOGGING SETUP +# ========================================== +logging.basicConfig( + filename="insurance_check.log", + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + encoding="utf-8" +) + +console = logging.getLogger("console") +console.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter("%(message)s")) +console.addHandler(handler) + +def log_info(msg): + logging.info(msg) + console.info(msg) + +def log_error(msg): + logging.error(msg) + console.error(msg) + +# ========================================== +# MYSQL CONNECTION +# ========================================== +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4", + autocommit=True +) + +def save_insurance_status(mysql_conn, rc, k_datu, result, xml_text): + sql = """ + INSERT INTO vzp_stav_pojisteni + (rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, + pojisteni_kod, stav_vyrizeni, response_xml) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """ + try: + with mysql_conn.cursor() as cur: + cur.execute(sql, ( + rc, + k_datu, + result["stav"], + result["kodPojistovny"], + result["nazevPojistovny"], + result["pojisteniKod"], + result["stavVyrizeni"], + xml_text + )) + except Exception as e: + log_error(f"❌ MYSQL ERROR for RC {rc}: {e}") + log_error(f"---- RAW XML ----\n{xml_text}\n-----------------") + raise + + +# ========================================== +# CONFIGURATION +# ========================================== +HOST = "192.168.1.4" +DB_PATH = r"z:\Medicus 3\data\MEDICUS.FDB" + +PFX_PATH = r"MBcert.pfx" +PFX_PASSWORD = "Vlado7309208104++" + +ENV = "prod" +ICZ = "00000000" +DIC = "00000000" + +# ========================================== +# INIT CONNECTIONS +# ========================================== +db = MedicusDB(HOST, DB_PATH) +vzp = VZPB2BClient(ENV, PFX_PATH, PFX_PASSWORD, icz=ICZ, dic=DIC) + +# ========================================== +# FETCH REGISTERED PATIENTS +# ========================================== +patients = db.get_active_registered_patients() +log_info(f"Checking {len(patients)} registered patients...\n") + +k_datu = date.today().isoformat() + +# ========================================== +# LOOP ONE PATIENT PER SECOND +# ========================================== +for rodcis, prijmeni, jmeno, poj in patients: + + log_info(f"=== Checking {prijmeni} {jmeno} ({rodcis}) ===") + + xml = vzp.stav_pojisteni(rc=rodcis, k_datu=k_datu) + + # 1) Check if response looks like XML + if not xml.strip().startswith("<"): + log_error(f"❌ INVALID XML for RC {rodcis}") + log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------") + time.sleep(2) + continue + + # 2) Try parsing XML + try: + result = vzp.parse_stav_pojisteni(xml) + except Exception as e: + log_error(f"❌ XML PARSE ERROR for RC {rodcis}: {e}") + log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------") + time.sleep(2) + continue + + log_info(f"Result: {result}") + + # 3) Save into MySQL (with logging) + try: + save_insurance_status(mysql, rodcis, k_datu, result, xml) + except Exception: + log_error(f"❌ FAILURE inserting to MySQL for {rodcis}") + continue + + time.sleep(2) + +db.close() +log_info("\nDONE.") diff --git a/10 Tests/test stavpojisteni.py b/10 Tests/test stavpojisteni.py index 309b7cb..9d3a323 100644 --- a/10 Tests/test stavpojisteni.py +++ b/10 Tests/test stavpojisteni.py @@ -1,3 +1,9 @@ +cur.execute("select rodcis,prijmeni,jmeno from kar where datum_zruseni is null and kar.vyrazen!='A' and kar.rodcis is not null and idicp!=0 order by ockzaz.datum desc") + + + + + from vzpb2b_client import VZPB2BClient client = VZPB2BClient(