notebookVB

This commit is contained in:
2026-01-17 09:59:18 +01:00
parent 5e40bc4608
commit eb448ee9cf
18 changed files with 706 additions and 46 deletions

34
10 Tests/2026-01-16 1.py Normal file
View File

@@ -0,0 +1,34 @@
import pymysql
mysql = pymysql.connect(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor
)
sql = """
SELECT rc
FROM (
SELECT rc, stav,
ROW_NUMBER() OVER (PARTITION BY rc ORDER BY k_datu DESC) AS rn
FROM vzp_stav_pojisteni
) t
WHERE rn = 1
AND stav <> '1'
"""
with mysql.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
# výsledná množina rodných čísel
rc_not_insured = {row["rc"] for row in rows}
mysql.close()
print(f"Nalezeno {len(rc_not_insured)} nepojištěných pacientů")
# print(rc_not_insured) # volitelně

211
10 Tests/2026-01-16 2.py Normal file
View File

@@ -0,0 +1,211 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Najde přesné datum změny stavu pojištění pomocí binary search.
Používá existující VZP B2B klient + ukládá výsledky do MySQL.
Cíl:
- minimalizovat počet dotazů na VZP
- přesně určit první den, kdy stav != '1'
"""
import sys
import time
import logging
from pathlib import Path
from datetime import date, timedelta
import pymysql
# ==========================================
# PROJECT ROOT (import fix)
# ==========================================
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from knihovny.medicus_db import MedicusDB
from knihovny.vzpb2b_client import VZPB2BClient
# ==========================================
# LOGGING
# ==========================================
logging.basicConfig(
filename="insurance_binary_search.log",
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
encoding="utf-8"
)
console = logging.getLogger("console")
console.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
console.addHandler(handler)
def log(msg):
logging.info(msg)
console.info(msg)
# ==========================================
# MYSQL
# ==========================================
mysql = pymysql.connect(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
charset="utf8mb4",
autocommit=True,
cursorclass=pymysql.cursors.DictCursor
)
# ==========================================
# SAVE RESULT
# ==========================================
def save_insurance_status(mysql_conn, rc, prijmeni, jmeno, k_datu, result, xml):
sql = """
INSERT INTO vzp_stav_pojisteni
(rc, prijmeni, jmeno, k_datu,
stav, kod_pojistovny, nazev_pojistovny,
pojisteni_kod, stav_vyrizeni, response_xml)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
with mysql_conn.cursor() as cur:
cur.execute(sql, (
rc,
prijmeni,
jmeno,
k_datu,
result["stav"],
result["kodPojistovny"],
result["nazevPojistovny"],
result["pojisteniKod"],
result["stavVyrizeni"],
xml
))
# ==========================================
# VZP CONFIG
# ==========================================
HOST = "192.168.1.4"
DB_PATH = r"z:\Medicus 3\data\MEDICUS.FDB"
PFX_PATH = PROJECT_ROOT / "certificates" / "MBcert.pfx"
PFX_PASSWORD = "Vlado7309208104++"
ENV = "prod"
ICZ = "00000000"
DIC = "00000000"
# ==========================================
# INIT
# ==========================================
db = MedicusDB(HOST, DB_PATH)
vzp = VZPB2BClient(
ENV,
str(PFX_PATH),
PFX_PASSWORD,
icz=ICZ,
dic=DIC
)
# ==========================================
# HELPER FUNCTIONS
# ==========================================
def vzp_query(rc, prijmeni, jmeno, d):
xml = vzp.stav_pojisteni(rc=rc, k_datu=d.isoformat())
result = vzp.parse_stav_pojisteni(xml)
save_insurance_status(mysql, rc, prijmeni, jmeno, d, result, xml)
time.sleep(2) # VZP rate limit
return result["stav"]
def binary_search_change(rc, prijmeni, jmeno, start_date, end_date):
"""
Najde PRVNÍ den, kdy stav != '1'
"""
low = start_date
high = end_date
found = None
while low <= high:
mid = low + (high - low) // 2
log(f" 🔍 {rc} probing {mid}")
stav = vzp_query(rc, prijmeni, jmeno, mid)
if stav == '1':
low = mid + timedelta(days=1)
else:
found = mid
high = mid - timedelta(days=1)
return found
# ==========================================
# LOAD CANDIDATES
# ==========================================
with mysql.cursor() as cur:
cur.execute("""
SELECT
x.rc,
p.prijmeni,
p.jmeno,
x.last_ok
FROM (
SELECT
rc,
MAX(CASE WHEN stav = '1' THEN k_datu END) AS last_ok,
MAX(k_datu) AS last_seen
FROM vzp_stav_pojisteni
GROUP BY rc
) x
JOIN vzp_stav_pojisteni p
ON p.rc = x.rc
AND p.k_datu = x.last_seen
WHERE
x.last_ok IS NOT NULL
AND x.last_seen = (
SELECT MAX(k_datu)
FROM vzp_stav_pojisteni z
WHERE z.rc = x.rc AND z.stav <> '1'
);
""")
patients = cur.fetchall()
log(f"Loaded {len(patients)} patients for binary search\n")
# ==========================================
# MAIN LOOP
# ==========================================
today = date.today()
for p in patients:
rc = p["rc"]
prijmeni = p["prijmeni"]
jmeno = p["jmeno"]
last_ok = p["last_ok"]
log(f"\n📌 {prijmeni} {jmeno} ({rc}) searching change")
change_date = binary_search_change(
rc,
prijmeni,
jmeno,
last_ok,
today
)
if change_date:
log(f" ✅ změna nastala: {change_date}")
else:
log(" ⚠️ změna nenalezena")
# ==========================================
# CLEANUP
# ==========================================
db.close()
mysql.close()
log("\nDONE binary search insurance check finished.")

106
10 Tests/2026-01-16 3.py Normal file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
REPORT změna stavu pojištění
Výstup:
pacient XY, současný (dnešní) stav pojištění je "X",
ke změně došlo dne Y, kdy stav byl "Z"
Používá pouze tabulku vzp_stav_pojisteni
Kompatibilní s ONLY_FULL_GROUP_BY
"""
import pymysql
# ==========================================
# MYSQL CONFIG
# ==========================================
MYSQL_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
# ==========================================
# SQL REPORT
# ==========================================
SQL = """
SELECT
cur.rc,
cur.prijmeni,
cur.jmeno,
cur.stav AS current_stav,
chg.k_datu AS change_date,
chg.stav AS change_stav
FROM
-- aktuální (poslední) stav pacienta
(
SELECT p1.*
FROM vzp_stav_pojisteni p1
JOIN (
SELECT rc, MAX(k_datu) AS max_date
FROM vzp_stav_pojisteni
GROUP BY rc
) x
ON x.rc = p1.rc
AND x.max_date = p1.k_datu
WHERE p1.stav <> '1'
) cur
JOIN
-- první den změny po posledním stavu = '1'
(
SELECT p2.rc, p2.k_datu, p2.stav
FROM vzp_stav_pojisteni p2
JOIN (
SELECT rc, MIN(k_datu) AS change_date
FROM vzp_stav_pojisteni
WHERE stav <> '1'
AND k_datu > (
SELECT MAX(k_datu)
FROM vzp_stav_pojisteni t
WHERE t.rc = vzp_stav_pojisteni.rc
AND t.stav = '1'
)
GROUP BY rc
) y
ON y.rc = p2.rc
AND y.change_date = p2.k_datu
) chg
ON chg.rc = cur.rc
ORDER BY cur.prijmeni, cur.jmeno;
"""
# ==========================================
# MAIN
# ==========================================
def main():
mysql = pymysql.connect(**MYSQL_CONFIG)
with mysql.cursor() as cur:
cur.execute(SQL)
rows = cur.fetchall()
mysql.close()
print("\nREPORT ZMĚNA STAVU POJIŠTĚNÍ\n")
for r in rows:
jmeno = f"{r['prijmeni']} {r['jmeno']}".strip()
print(
f"pacient {jmeno}, "
f"současný (dnešní) stav pojištění je \"{r['current_stav']}\", "
f"ke změně došlo dne {r['change_date']}, "
f"kdy stav byl \"{r['change_stav']}\""
)
print("\nKONEC REPORTU.")
# ==========================================
if __name__ == "__main__":
main()

108
10 Tests/2026-01-17 01.py Normal file
View File

@@ -0,0 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
REPORT ZMĚNA STAVU POJIŠTĚNÍ
Výstup:
pacient XY, současný stav pojištění je "X",
ke změně došlo dne YYYY-MM-DD (ze "1" na "X")
Používá pouze tabulku vzp_stav_pojisteni
"""
import pymysql
# ==========================================
# MYSQL CONFIG
# ==========================================
MYSQL_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
# ==========================================
# SQL KOREKTNÍ DEFINICE ZMĚNY (1 → X)
# ==========================================
SQL = """
SELECT
cur.rc,
cur.prijmeni,
cur.jmeno,
cur.stav AS current_stav,
chg.k_datu AS change_date,
chg.stav AS change_stav
FROM
-- aktuální stav pacienta
(
SELECT p1.*
FROM vzp_stav_pojisteni p1
JOIN (
SELECT rc, MAX(k_datu) AS max_date
FROM vzp_stav_pojisteni
GROUP BY rc
) x
ON x.rc = p1.rc
AND x.max_date = p1.k_datu
WHERE p1.stav <> '1'
) cur
JOIN
-- poslední den pojištění
(
SELECT rc, MAX(k_datu) AS last_ok
FROM vzp_stav_pojisteni
WHERE stav = '1'
GROUP BY rc
) ok
ON ok.rc = cur.rc
JOIN
-- JEDEN konkrétní řádek změny (nejmenší ID)
vzp_stav_pojisteni chg
ON chg.id = (
SELECT MIN(id)
FROM vzp_stav_pojisteni z
WHERE z.rc = cur.rc
AND z.k_datu = (
SELECT MIN(k_datu)
FROM vzp_stav_pojisteni zz
WHERE zz.rc = cur.rc
AND zz.k_datu > ok.last_ok
)
)
ORDER BY cur.prijmeni, cur.jmeno;
"""
# ==========================================
# MAIN
# ==========================================
def main():
mysql = pymysql.connect(**MYSQL_CONFIG)
with mysql.cursor() as cur:
cur.execute(SQL)
rows = cur.fetchall()
mysql.close()
print("\nREPORT ZMĚNA STAVU POJIŠTĚNÍ\n")
for r in rows:
pacient = f"{r['prijmeni']} {r['jmeno']}".strip()
print(
f"pacient {pacient}, "
f"současný stav pojištění je \"{r['current_stav']}\", "
f"ke změně došlo dne {r['change_date']} "
f"(ze \"1\" na \"{r['change_stav']}\")"
)
print("\nKONEC REPORTU.")
# ==========================================
if __name__ == "__main__":
main()

View File

@@ -1,9 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import logging
from vzpb2b_client import VZPB2BClient
from knihovny.vzpb2b_client import VZPB2BClient
from datetime import date
# ==========================================

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Doplní jméno a příjmení do vzp_stav_pojisteni
podle Medicusu tam, kde jsou NULL.
Bez volání VZP.
Bez změny jiných dat.
"""
import sys
from pathlib import Path
# ==========================================
# PROJECT ROOT (import fix)
# ==========================================
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
import pymysql
from knihovny.medicus_db import MedicusDB
# ==========================================
# CONFIGURATION
# ==========================================
MEDICUS_HOST = "192.168.1.4"
MEDICUS_DB_PATH = r"z:\Medicus 3\data\MEDICUS.FDB"
MYSQL_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"autocommit": True
}
# ==========================================
# INIT CONNECTIONS
# ==========================================
print("Connecting to Medicus...")
medicus = MedicusDB(MEDICUS_HOST, MEDICUS_DB_PATH)
print("Connecting to MySQL...")
mysql = pymysql.connect(
cursorclass=pymysql.cursors.DictCursor,
**MYSQL_CONFIG
)
# ==========================================
# FETCH REGISTERED PATIENTS
# ==========================================
print("Loading registered patients from Medicus...")
patients = medicus.get_active_registered_patients(as_dict=True)
print(f"Loaded {len(patients)} patients")
# ==========================================
# UPDATE MYSQL
# ==========================================
updated_rows = 0
with mysql.cursor() as cur:
for p in patients:
rc = p["rodcis"]
jmeno = p.get("jmeno")
prijmeni = p.get("prijmeni")
if not rc or not jmeno or not prijmeni:
continue
cur.execute("""
UPDATE vzp_stav_pojisteni
SET
jmeno = COALESCE(jmeno, %s),
prijmeni = COALESCE(prijmeni, %s)
WHERE rc = %s
AND (jmeno IS NULL OR prijmeni IS NULL)
""", (jmeno, prijmeni, rc))
if cur.rowcount > 0:
updated_rows += cur.rowcount
mysql.close()
medicus.close()
print(f"\n✅ Hotovo. Aktualizováno {updated_rows} řádků ve vzp_stav_pojisteni.")

View File

@@ -4,7 +4,7 @@
import time
import logging
from knihovny.medicus_db import MedicusDB
from vzpb2b_client import VZPB2BClient
from knihovny.vzpb2b_client import VZPB2BClient
import pymysql
from datetime import date

View File

@@ -1,4 +1,4 @@
from vzpb2b_client import VZPB2BClient
from knihovny.vzpb2b_client import VZPB2BClient
client = VZPB2BClient(
env="simu", # or "prod"

View File

@@ -1,213 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from requests_pkcs12 import Pkcs12Adapter
import requests
import uuid
from datetime import date
class VZPB2BClient:
def __init__(self, env: str, pfx_path: str, pfx_password: str,
icz: str = "00000000", dic: str = "00000000"):
# Normalize environment name
env = env.lower().strip()
if env in ("prod", "production", "live", "real"):
self.env = "prod"
elif env in ("simu", "simulace", "test", "testing"):
self.env = "simu"
else:
raise ValueError(f"Unknown environment '{env}'. Use 'simu' or 'prod'.")
self.pfx_path = pfx_path
self.pfx_password = pfx_password
self.icz = icz
self.dic = dic
# Prepare mTLS session
session = requests.Session()
session.mount(
"https://",
Pkcs12Adapter(pkcs12_filename=pfx_path, pkcs12_password=pfx_password)
)
self.session = session
# --------------------------------------------------------------
# URL BUILDER
# --------------------------------------------------------------
def _build_endpoint(self, service_name: str) -> str:
"""
SIMU:
https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMU<Service>?sluzba=SIMU<Service>
PROD:
https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/<Service>
"""
if self.env == "simu":
simu_service = f"SIMU{service_name}"
return (
f"https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/"
f"{simu_service}?sluzba={simu_service}"
)
# Production
return (
f"https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/{service_name}"
)
# --------------------------------------------------------------
# SOAP HEADER BUILDER
# --------------------------------------------------------------
def _header(self) -> str:
idZpravy = uuid.uuid4().hex[:12] # must be alphanumeric, max 12 chars
return f"""
<com:idZpravy>{idZpravy}</com:idZpravy>
<com:idSubjektu>
<com:icz>{self.icz}</com:icz>
<com:dic>{self.dic}</com:dic>
</com:idSubjektu>
"""
# --------------------------------------------------------------
# OVERPRUKAZ — EHIC CHECK
# --------------------------------------------------------------
def over_prukaz_pojistence(self, cislo_prukazu: str, k_datu: str = None) -> str:
"""
Calls OverPrukazPojistenceB2B (SIMU or PROD depending on env).
Returns raw XML string.
"""
service = "OverPrukazPojistenceB2B"
endpoint = self._build_endpoint(service)
if not k_datu:
k_datu = date.today().isoformat()
soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/OverPrukazPojistenceB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:OverPrukazPojistenceB2BPozadavek>
<vzp:cisloPrukazu>{cislo_prukazu}</vzp:cisloPrukazu>
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:OverPrukazPojistenceB2BPozadavek>
</soap:Body>
</soap:Envelope>
"""
headers = {"Content-Type": "text/xml; charset=utf-8"}
print(f"Calling: {endpoint}")
response = self.session.post(
endpoint,
data=soap.encode("utf-8"),
headers=headers,
timeout=30
)
print("HTTP:", response.status_code)
return response.text
def stav_pojisteni(self, rc: str, k_datu: str = None, prijmeni: str = None):
"""
Calls stavPojisteniB2B (SIMU or PROD).
"""
service = "stavPojisteniB2B"
endpoint = self._build_endpoint(service)
if not k_datu:
k_datu = date.today().isoformat()
prijmeni_xml = f"<vzp:prijmeni>{prijmeni}</vzp:prijmeni>" if prijmeni else ""
soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/stavPojisteniB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:stavPojisteniB2B>
<vzp:cisloPojistence>{rc}</vzp:cisloPojistence>
{prijmeni_xml}
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:stavPojisteniB2B>
</soap:Body>
</soap:Envelope>
"""
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process"
}
print(f"Calling: {endpoint}")
resp = self.session.post(endpoint, data=soap.encode("utf-8"),
headers=headers, timeout=30)
print("HTTP:", resp.status_code)
return resp.text
def parse_stav_pojisteni(self, xml_text: str):
"""
Parses stavPojisteniB2B SOAP response into a Python dict.
Returned structure:
{
"stavVyrizeni": int,
"stav": str | None,
"kodPojistovny": str | None,
"nazevPojistovny": str | None,
"pojisteniKod": str | None
}
"""
import xml.etree.ElementTree as ET
NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"vzp": "http://xmlns.gemsystem.cz/stavPojisteniB2B"
}
root = ET.fromstring(xml_text)
# ---- Extract status ----
stav_vyr = root.find(".//vzp:stavVyrizeniPozadavku", NS)
stav_vyr = int(stav_vyr.text.strip()) if stav_vyr is not None else None
# ---- If no stavPojisteni element present (e.g. 0 or some errors) ----
node_stav = root.find(".//vzp:stavPojisteni", NS)
if node_stav is None:
return {
"stavVyrizeni": stav_vyr,
"stav": None,
"kodPojistovny": None,
"nazevPojistovny": None,
"pojisteniKod": None,
}
def get(tag):
el = node_stav.find(f"vzp:{tag}", NS)
return el.text.strip() if el is not None and el.text else None
return {
"stavVyrizeni": stav_vyr,
"stav": get("stav"),
"kodPojistovny": get("kodPojistovny"),
"nazevPojistovny": get("nazevPojistovny"),
"pojisteniKod": get("pojisteniKod"),
}