Merge branch 'master' of ssh://192.168.1.76:2222/administrator/insurance

This commit is contained in:
2025-12-15 05:58:33 +01:00
19 changed files with 1817 additions and 0 deletions

132
01 testik.py Normal file
View File

@@ -0,0 +1,132 @@
# pip install requests_pkcs12 pymysql
from requests_pkcs12 import Pkcs12Adapter
import requests
import xml.etree.ElementTree as ET
from datetime import date
import pymysql
from pymysql.cursors import DictCursor
from pprint import pprint
# ========== CONFIG ==========
PFX_PATH = r"mbcert.pfx"
PFX_PASS = "Vlado7309208104++"
VERIFY = True
EP_STAV = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/stavPojisteniB2B"
SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
NS_STAV = "http://xmlns.gemsystem.cz/stavPojisteniB2B"
# RC provided by you (can contain slash; we normalize before sending/saving)
RC = "7309208104"
# RC = "280616/091"
K_DATU = date.today().isoformat()
# MySQL (table already created as per previous DDL)
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=False,
)
# ========== HELPERS ==========
def to_int_or_none(val):
if val is None:
return None
s = str(val).strip()
return int(s) if s.isdigit() else None
def normalize_rc(rc: str) -> str:
return rc.replace("/", "").strip()
def post_soap(endpoint: str, body_xml: str) -> str:
env = f'''<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="{SOAP_NS}">
<soap:Body>{body_xml}</soap:Body>
</soap:Envelope>'''
s = requests.Session()
s.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
r = s.post(
endpoint,
data=env.encode("utf-8"),
headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": "process"},
timeout=30,
verify=VERIFY,
)
print("HTTP:", r.status_code)
return r.text
def parse_stav(xml_text: str):
NS = {"soap": SOAP_NS, "s": NS_STAV}
root = ET.fromstring(xml_text)
out = {
"stav": None,
"kodPojistovny": None,
"nazevPojistovny": None,
"pojisteniKod": None,
"stavVyrizeniPozadavku": None,
}
node = root.find(".//s:stavPojisteni", NS)
if node is not None:
for tag in ("stav", "kodPojistovny", "nazevPojistovny", "pojisteniKod"):
el = node.find(f"s:{tag}", NS)
out[tag] = el.text.strip() if el is not None and el.text else None
st = root.find(".//s:stavVyrizeniPozadavku", NS)
out["stavVyrizeniPozadavku"] = st.text.strip() if st is not None and st.text else None
return out
def save_stav_pojisteni(rc: str, k_datu: str, parsed: dict, response_xml: str) -> int:
"""
Insert one log row into medevio.vzp_stav_pojisteni and print what is being saved.
Assumes the table already exists (no DDL/verification here).
"""
payload = {
"rc": normalize_rc(rc),
"k_datu": k_datu,
"stav": to_int_or_none(parsed.get("stav")), # handles 'X' -> None
"kod_pojistovny": parsed.get("kodPojistovny"),
"nazev_pojistovny": parsed.get("nazevPojistovny"),
"pojisteni_kod": parsed.get("pojisteniKod"),
"stav_vyrizeni": to_int_or_none(parsed.get("stavVyrizeniPozadavku")),
"response_xml": response_xml,
}
print("\n=== vzp_stav_pojisteni: will insert ===")
pprint(payload)
sql = """
INSERT INTO vzp_stav_pojisteni
(rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, pojisteni_kod, stav_vyrizeni, response_xml)
VALUES
(%(rc)s, %(k_datu)s, %(stav)s, %(kod_pojistovny)s, %(nazev_pojistovny)s, %(pojisteni_kod)s, %(stav_vyrizeni)s, %(response_xml)s)
"""
conn = pymysql.connect(**MYSQL_CFG)
try:
with conn.cursor() as cur:
cur.execute(sql, payload)
conn.commit()
finally:
conn.close()
print("Inserted 1 row into vzp_stav_pojisteni.")
return 1
# ========== MAIN ==========
if __name__ == "__main__":
rc_norm = normalize_rc(RC)
body = f"""
<ns1:stavPojisteniB2B xmlns:ns1="{NS_STAV}">
<ns1:cisloPojistence>{rc_norm}</ns1:cisloPojistence>
<ns1:kDatu>{K_DATU}</ns1:kDatu>
</ns1:stavPojisteniB2B>""".strip()
xml = post_soap(EP_STAV, body)
parsed = parse_stav(xml)
print(parsed) # keep your quick print
# Save to MySQL and print exactly what is saved
save_stav_pojisteni(RC, K_DATU, parsed, xml)

271
02 testík.py Normal file
View File

@@ -0,0 +1,271 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL),
parse the response, upsert rows into medevio.vzp_registrace,
and print what is being saved.
"""
# pip install requests_pkcs12 pymysql
from requests_pkcs12 import Pkcs12Adapter
import requests
import xml.etree.ElementTree as ET
from datetime import date
import pymysql
from pymysql.cursors import DictCursor
from pprint import pprint
from functions import get_medicus_connection
from functions import get_mysql_connection
import time, random,socket
# ------------------- CONFIG -------------------
ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive
PFX_PATH = r"mbcert.pfx" # <-- your .pfx path
PFX_PASS = "Vlado7309208104++" # <-- your export password
VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem"
# Patient + query
RC = "7309208104" # rodné číslo without slash
# RC = "280616/091" # rodné číslo without slash
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"] # VPL (adult GP)
# MySQL
if socket.gethostname().strip() == "NTBVBHP470G10":
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
elif socket.gethostname().strip() == "SESTRA":
MYSQL_CFG = dict(
host="127.0.0.1",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
# Namespaces (from your response/WSDL)
NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1",
}
# ------------------- HELPERS -------------------
def normalize_rc(rc: str) -> str:
return rc.replace("/", "").strip()
def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str:
odb_xml = "".join([f"<ns1:kodOdbornosti>{kod}</ns1:kodOdbornosti>" for kod in odb_list])
inner = f"""
<ns1:registracePojistencePZSB2B xmlns:ns1="{NS['rp']}">
<ns1:cisloPojistence>{rc}</ns1:cisloPojistence>
<ns1:kDatu>{k_datu}</ns1:kDatu>
<ns1:seznamOdbornosti>
{odb_xml}
</ns1:seznamOdbornosti>
</ns1:registracePojistencePZSB2B>""".strip()
return f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="{NS['soap']}">
<soap:Body>
{inner}
</soap:Body>
</soap:Envelope>"""
def parse_registrace(xml_text: str):
"""
Return (rows, stav_vyrizeni) where rows is a list of dicts
for each <odbornost> item in the response.
"""
root = ET.fromstring(xml_text)
items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS)
out = []
for it in items:
def g(tag, ctx=it):
el = ctx.find(f"rp:{tag}", NS)
return el.text.strip() if el is not None and el.text else None
poj = it.find("rp:zdravotniPojistovna", NS)
poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None
poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None
spec = it.find("rp:odbornost", NS)
odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None
odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None
out.append(dict(
icz=g("ICZ"),
icp=g("ICP"),
nazev_icp=g("nazevICP"),
nazev_szz=g("nazevSZZ"),
poj_kod=poj_kod,
poj_zkratka=poj_zkr,
odbornost_kod=odb_kod,
odbornost_nazev=odb_naz,
datum_registrace=g("datumRegistrace"),
datum_zahajeni=g("datumZahajeni"),
datum_ukonceni=g("datumUkonceni"),
))
st = root.find(".//rp:stavVyrizeniPozadavku", NS)
stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None
return out, stav_vyrizeni
def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int:
"""
Insert or update rows into medevio.vzp_registrace and print each payload.
Assumes the table already exists (as per your DDL).
"""
if not rows:
print("No <odbornost> rows found; nothing to save.")
return 0
sql = """
INSERT INTO vzp_registrace
(rc, query_date, odbornost_kod, odbornost_nazev,
icz, icp, nazev_icp, nazev_szz,
poj_kod, poj_zkratka,
datum_registrace, datum_zahajeni, datum_ukonceni,
stav_vyrizeni, response_xml)
VALUES
(%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s,
%(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s,
%(poj_kod)s, %(poj_zkratka)s,
%(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s,
%(stav_vyrizeni)s, %(response_xml)s)
ON DUPLICATE KEY UPDATE
odbornost_nazev = VALUES(odbornost_nazev),
nazev_icp = VALUES(nazev_icp),
nazev_szz = VALUES(nazev_szz),
poj_kod = VALUES(poj_kod),
poj_zkratka = VALUES(poj_zkratka),
datum_registrace= VALUES(datum_registrace),
datum_zahajeni = VALUES(datum_zahajeni),
datum_ukonceni = VALUES(datum_ukonceni),
stav_vyrizeni = VALUES(stav_vyrizeni),
response_xml = VALUES(response_xml),
updated_at = CURRENT_TIMESTAMP
"""
rc_norm = normalize_rc(rc)
qd = query_date or date.today().isoformat()
payloads = []
for r in rows:
payload = {
"rc": rc_norm,
"query_date": qd,
**r,
"stav_vyrizeni": stav_vyrizeni,
"response_xml": xml_text,
}
payloads.append(payload)
# Print what we're going to save
print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===")
for p in payloads:
pprint(p)
conn = pymysql.connect(**MYSQL_CFG)
try:
with conn.cursor() as cur:
cur.executemany(sql, payloads)
conn.commit()
finally:
conn.close()
print(f"\nUpserted rows: {len(payloads)}")
return len(payloads)
def prepare_processed_rcs():
consql=get_mysql_connection()
cursql=consql.cursor()
sql="""
WITH ranked AS (
SELECT
vreg.*,
ROW_NUMBER() OVER (
PARTITION BY rc
ORDER BY query_date DESC
) AS rn
FROM vzp_registrace AS vreg
)
SELECT rc
FROM ranked
WHERE rn = 1
"""
cursql.execute(sql)
rows=cursql.fetchall()
print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}")
rc_set_vzp = {row["rc"] for row in rows}
return (rc_set_vzp)
# ------------------- MAIN FLOW -------------------
def main():
con = get_medicus_connection()
cur = con.cursor()
cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '9'")
# cur.execute("select first 2 rodcis, prijmeni, jmeno from kar where rodcis starting with '0'")
# Vytvor seznam rodnych cisel, která už máme
rc_set_vzp = prepare_processed_rcs()
rows = cur.fetchall()
print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}")
for row in rows:
if row[0] in rc_set_vzp:
continue
else:
print(row[0], row[1], row[2])
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"]
RC=row[0]
# Build SOAP envelope
envelope = build_envelope(RC, K_DATU, ODBORNOSTI)
# mTLS session
session = requests.Session()
session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process", # Oracle composite usually expects this
}
# Call service
resp = session.post(ENDPOINT, data=envelope.encode("utf-8"),
headers=headers, timeout=30, verify=VERIFY)
print("HTTP:", resp.status_code)
# (Optional) Uncomment to see raw XML
# print(resp.text)
# Parse and save
rows, stav = parse_registrace(resp.text)
upsert_rows(RC, K_DATU, rows, stav, resp.text)
time.sleep(random.uniform(1, 5))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,262 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL),
parse the response, upsert rows into medevio.vzp_registrace,
and print what is being saved.
"""
# pip install requests_pkcs12 pymysql
from requests_pkcs12 import Pkcs12Adapter
import requests
import xml.etree.ElementTree as ET
from datetime import date
import pymysql
from pymysql.cursors import DictCursor
from pprint import pprint
from functions import get_medicus_connection
from functions import get_mysql_connection
import time, random,socket
# ------------------- CONFIG -------------------
ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive
PFX_PATH = r"mbcert.pfx" # <-- your .pfx path
PFX_PASS = "Vlado7309208104++" # <-- your export password
VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem"
# Patient + query
RC = "7309208104" # rodné číslo without slash
# RC = "280616/091" # rodné číslo without slash
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"] # VPL (adult GP)
# MySQL
if socket.gethostname().strip() == "NTBVBHP470G10":
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
elif socket.gethostname().strip() == "SESTRA":
MYSQL_CFG = dict(
host="127.0.0.1",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
# Namespaces (from your response/WSDL)
NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1",
}
# ------------------- HELPERS -------------------
def normalize_rc(rc: str) -> str:
return rc.replace("/", "").strip()
# Ukázka XML Požadavek:
# <registracePojistencePZSB2B xmlns="http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1">
# <cisloPojistence>801212855</cisloPojistence>
# <seznamOdbornosti>
# <kodOdbornosti>002</kodOdbornosti>
# </seznamOdbornosti>
# </registracePojistencePZSB2B>
def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str:
odb_xml = "".join([f"<ns1:kodOdbornosti>{kod}</ns1:kodOdbornosti>" for kod in odb_list])
inner = f"""
<ns1:registracePojistencePZSB2B xmlns:ns1="{NS['rp']}">
<ns1:cisloPojistence>{rc}</ns1:cisloPojistence>
<ns1:kDatu>{k_datu}</ns1:kDatu>
<ns1:seznamOdbornosti>
{odb_xml}
</ns1:seznamOdbornosti>
</ns1:registracePojistencePZSB2B>""".strip()
return f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="{NS['soap']}">
<soap:Body>
{inner}
</soap:Body>
</soap:Envelope>"""
def parse_registrace(xml_text: str):
"""
Return (rows, stav_vyrizeni) where rows is a list of dicts
for each <odbornost> item in the response.
"""
root = ET.fromstring(xml_text)
items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS)
out = []
for it in items:
def g(tag, ctx=it):
el = ctx.find(f"rp:{tag}", NS)
return el.text.strip() if el is not None and el.text else None
poj = it.find("rp:zdravotniPojistovna", NS)
poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None
poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None
spec = it.find("rp:odbornost", NS)
odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None
odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None
out.append(dict(
icz=g("ICZ"),
icp=g("ICP"),
nazev_icp=g("nazevICP"),
nazev_szz=g("nazevSZZ"),
poj_kod=poj_kod,
poj_zkratka=poj_zkr,
odbornost_kod=odb_kod,
odbornost_nazev=odb_naz,
datum_registrace=g("datumRegistrace"),
datum_zahajeni=g("datumZahajeni"),
datum_ukonceni=g("datumUkonceni"),
))
st = root.find(".//rp:stavVyrizeniPozadavku", NS)
stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None
return out, stav_vyrizeni
def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int:
"""
Insert or update rows into medevio.vzp_registrace and print each payload.
Assumes the table already exists (as per your DDL).
"""
if not rows:
print("No <odbornost> rows found; nothing to save.")
return 0
sql = """
INSERT INTO vzp_registrace
(rc, query_date, odbornost_kod, odbornost_nazev,
icz, icp, nazev_icp, nazev_szz,
poj_kod, poj_zkratka,
datum_registrace, datum_zahajeni, datum_ukonceni,
stav_vyrizeni, response_xml)
VALUES
(%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s,
%(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s,
%(poj_kod)s, %(poj_zkratka)s,
%(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s,
%(stav_vyrizeni)s, %(response_xml)s)
ON DUPLICATE KEY UPDATE
odbornost_nazev = VALUES(odbornost_nazev),
nazev_icp = VALUES(nazev_icp),
nazev_szz = VALUES(nazev_szz),
poj_kod = VALUES(poj_kod),
poj_zkratka = VALUES(poj_zkratka),
datum_registrace= VALUES(datum_registrace),
datum_zahajeni = VALUES(datum_zahajeni),
datum_ukonceni = VALUES(datum_ukonceni),
stav_vyrizeni = VALUES(stav_vyrizeni),
response_xml = VALUES(response_xml),
updated_at = CURRENT_TIMESTAMP
"""
rc_norm = normalize_rc(rc)
qd = query_date or date.today().isoformat()
payloads = []
for r in rows:
payload = {
"rc": rc_norm,
"query_date": qd,
**r,
"stav_vyrizeni": stav_vyrizeni,
"response_xml": xml_text,
}
payloads.append(payload)
# Print what we're going to save
print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===")
for p in payloads:
pprint(p)
conn = pymysql.connect(**MYSQL_CFG)
try:
with conn.cursor() as cur:
cur.executemany(sql, payloads)
conn.commit()
finally:
conn.close()
print(f"\nUpserted rows: {len(payloads)}")
return len(payloads)
def prepare_processed_rcs():
consql=get_mysql_connection()
cursql=consql.cursor()
sql="""
WITH ranked AS (
SELECT
vreg.*,
ROW_NUMBER() OVER (
PARTITION BY rc
ORDER BY query_date DESC
) AS rn
FROM vzp_registrace AS vreg
)
SELECT rc
FROM ranked
WHERE rn = 1
"""
cursql.execute(sql)
rows=cursql.fetchall()
print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}")
rc_set_vzp = {row["rc"] for row in rows}
return (rc_set_vzp)
# ------------------- MAIN FLOW -------------------
def main():
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"]
RC='7309208104'
# Build SOAP envelope
envelope = build_envelope(RC, K_DATU, ODBORNOSTI)
# mTLS session
session = requests.Session()
session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process", # Oracle composite usually expects this
}
# Call service
resp = session.post(ENDPOINT, data=envelope.encode("utf-8"),
headers=headers, timeout=30, verify=VERIFY)
print("HTTP:", resp.status_code)
# (Optional) Uncomment to see raw XML
print(resp.text)
# Parse and save
rows, stav = parse_registrace(resp.text)
print(rows,stav)
time.sleep(random.uniform(1, 5))
if __name__ == "__main__":
main()

308
02.2 Testík.py Normal file
View File

@@ -0,0 +1,308 @@
#kód bude vkládat i řádky pro pacienty bez registrovaného lékař v oboru 001#!/usr/bin/env python3 -*- coding: utf-8 -*- """ Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), parse the response, upsert rows into medevio.vzp_registrace, and print what is being saved. """ # pip install requests_pkcs12 pymysql from requests_pkcs12 import Pkcs12Adapter import requests import xml.etree.ElementTree as ET from datetime import date import pymysql from pymysql.cursors import DictCursor from pprint import pprint from functions import get_medicus_connection from functions import get_mysql_connection import time, random,socket # ------------------- CONFIG ------------------- ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive PFX_PATH = r"mbcert.pfx" # <-- your .pfx path PFX_PASS = "Vlado7309208104++" # <-- your export password VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" Patient + query RC = "7309208104" # rodné číslo without slash RC = "280616/091" # rodné
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL),
parse the response, upsert rows into medevio.vzp_registrace,
and print what is being saved.
"""
# pip install requests_pkcs12 pymysql
from requests_pkcs12 import Pkcs12Adapter
import requests
import xml.etree.ElementTree as ET
from datetime import date
import pymysql
from pymysql.cursors import DictCursor
from pprint import pprint
from functions import get_medicus_connection
from functions import get_mysql_connection
import time, random,socket
# ------------------- CONFIG -------------------
ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive
PFX_PATH = r"mbcert.pfx" # <-- your .pfx path
PFX_PASS = "Vlado7309208104++" # <-- your export password
VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem"
# Patient + query
RC = "7309208104" # rodné číslo without slash
# RC = "280616/091" # rodné číslo without slash
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"] # VPL (adult GP)
# MySQL
if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"):
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
elif socket.gethostname().strip() == "SESTRA":
MYSQL_CFG = dict(
host="127.0.0.1",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
# Namespaces (from your response/WSDL)
NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1",
}
# ------------------- HELPERS -------------------
def normalize_rc(rc: str) -> str:
return rc.replace("/", "").strip()
def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str:
odb_xml = "".join([f"<ns1:kodOdbornosti>{kod}</ns1:kodOdbornosti>" for kod in odb_list])
inner = f"""
<ns1:registracePojistencePZSB2B xmlns:ns1="{NS['rp']}">
<ns1:cisloPojistence>{rc}</ns1:cisloPojistence>
<ns1:kDatu>{k_datu}</ns1:kDatu>
<ns1:seznamOdbornosti>
{odb_xml}
</ns1:seznamOdbornosti>
</ns1:registracePojistencePZSB2B>""".strip()
return f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="{NS['soap']}">
<soap:Body>
{inner}
</soap:Body>
</soap:Envelope>"""
def parse_registrace(xml_text: str):
"""
Return (rows, stav_vyrizeni) where rows is a list of dicts
for each <odbornost> item in the response.
"""
root = ET.fromstring(xml_text)
items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS)
out = []
for it in items:
def g(tag, ctx=it):
el = ctx.find(f"rp:{tag}", NS)
return el.text.strip() if el is not None and el.text else None
poj = it.find("rp:zdravotniPojistovna", NS)
poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None
poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None
spec = it.find("rp:odbornost", NS)
odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None
odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None
out.append(dict(
icz=g("ICZ"),
icp=g("ICP"),
nazev_icp=g("nazevICP"),
nazev_szz=g("nazevSZZ"),
poj_kod=poj_kod,
poj_zkratka=poj_zkr,
odbornost_kod=odb_kod,
odbornost_nazev=odb_naz,
datum_registrace=g("datumRegistrace"),
datum_zahajeni=g("datumZahajeni"),
datum_ukonceni=g("datumUkonceni"),
))
st = root.find(".//rp:stavVyrizeniPozadavku", NS)
stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None
return out, stav_vyrizeni
def upsert_rows(
rc: str,
query_date: str,
rows: list[dict],
stav_vyrizeni: str,
xml_text: str,
requested_odb_list: list[str] | None = None,
) -> int:
"""
Insert/update medevio.vzp_registrace.
If no <odbornost> items are returned, insert placeholder row(s)
for each requested specialty (e.g., "001") so the negative result is visible.
"""
sql = """
INSERT INTO vzp_registrace
(rc, query_date, odbornost_kod, odbornost_nazev,
icz, icp, nazev_icp, nazev_szz,
poj_kod, poj_zkratka,
datum_registrace, datum_zahajeni, datum_ukonceni,
stav_vyrizeni, response_xml)
VALUES
(%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s,
%(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s,
%(poj_kod)s, %(poj_zkratka)s,
%(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s,
%(stav_vyrizeni)s, %(response_xml)s)
ON DUPLICATE KEY UPDATE
odbornost_nazev = VALUES(odbornost_nazev),
nazev_icp = VALUES(nazev_icp),
nazev_szz = VALUES(nazev_szz),
poj_kod = VALUES(poj_kod),
poj_zkratka = VALUES(poj_zkratka),
datum_registrace= VALUES(datum_registrace),
datum_zahajeni = VALUES(datum_zahajeni),
datum_ukonceni = VALUES(datum_ukonceni),
stav_vyrizeni = VALUES(stav_vyrizeni),
response_xml = VALUES(response_xml),
updated_at = CURRENT_TIMESTAMP
"""
rc_norm = normalize_rc(rc)
qd = query_date or date.today().isoformat()
payloads: list[dict] = []
if rows:
# Positive path: save what came from the API
for r in rows:
payloads.append({
"rc": rc_norm,
"query_date": qd,
**r,
"stav_vyrizeni": stav_vyrizeni,
"response_xml": xml_text,
})
else:
# Negative path: no registration items -> create placeholders
if not requested_odb_list:
requested_odb_list = ["001"]
for kod in requested_odb_list:
payloads.append({
"rc": rc_norm,
"query_date": qd,
"odbornost_kod": kod,
"odbornost_nazev": None,
"icz": None,
"icp": None,
"nazev_icp": None,
"nazev_szz": None,
"poj_kod": None,
"poj_zkratka": None,
"datum_registrace": None,
"datum_zahajeni": None,
"datum_ukonceni": None,
# Keep what VZP said (e.g., 'X'), and raw XML for audit
"stav_vyrizeni": stav_vyrizeni,
"response_xml": xml_text,
})
# Print what we're going to save
print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===")
for p in payloads:
pprint(p)
if not payloads:
print("No payloads prepared (this should not happen).")
return 0
connsql = get_mysql_connection()
try:
with connsql.cursor() as cur:
cur.executemany(sql, payloads)
connsql.commit()
finally:
connsql.close()
print(f"\nUpserted rows: {len(payloads)}")
return len(payloads)
def prepare_processed_rcs():
consql=get_mysql_connection()
cursql=consql.cursor()
sql="""
WITH ranked AS (
SELECT
vreg.*,
ROW_NUMBER() OVER (
PARTITION BY rc
ORDER BY query_date DESC
) AS rn
FROM vzp_registrace AS vreg
)
SELECT rc
FROM ranked
WHERE rn = 1
"""
cursql.execute(sql)
rows=cursql.fetchall()
print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}")
rc_set_vzp = {row["rc"] for row in rows}
return (rc_set_vzp)
# ------------------- MAIN FLOW -------------------
def main():
con = get_medicus_connection()
cur = con.cursor()
cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '1'")
# cur.execute("select first 2 rodcis, prijmeni, jmeno from kar where rodcis starting with '0'")
# Vytvor seznam rodnych cisel, která už máme
rc_set_vzp = prepare_processed_rcs()
rows = cur.fetchall()
print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}")
for row in rows:
if row[0] in rc_set_vzp:
continue
else:
print(row[0], row[1], row[2])
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"]
RC=row[0]
# Build SOAP envelope
envelope = build_envelope(RC, K_DATU, ODBORNOSTI)
# mTLS session
session = requests.Session()
session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process", # Oracle composite usually expects this
}
print(envelope)
# Call service
resp = session.post(ENDPOINT, data=envelope.encode("utf-8"),
headers=headers, timeout=30, verify=VERIFY)
print("HTTP:", resp.status_code)
# (Optional) Uncomment to see raw XML
print(resp.text)
# Parse and save
rows, stav = parse_registrace(resp.text)
print(rows,stav)
upsert_rows(RC, K_DATU, rows, stav, resp.text, requested_odb_list=ODBORNOSTI)
time.sleep(random.uniform(1, 5))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,46 @@
from functions import get_medicus_connection
from functions import get_mysql_connection
from functions import check_insurance
import time, random
def prepare_processed_rcs():
consql=get_mysql_connection()
cursql=consql.cursor()
sql="""
WITH ranked AS (
SELECT
vr.*,
ROW_NUMBER() OVER (
PARTITION BY rc
ORDER BY k_datu DESC, queried_at DESC
) AS rn
FROM vzp_stav_pojisteni AS vr
)
SELECT rc
FROM ranked
WHERE rn = 1
"""
cursql.execute(sql)
rows=cursql.fetchall()
print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}")
rc_set_vzp = {row["rc"] for row in rows}
return (rc_set_vzp)
con=get_medicus_connection()
cur=con.cursor()
cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '3'")
rc_set_vzp=prepare_processed_rcs()
rows=cur.fetchall()
print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}")
for row in rows:
if row[0] in rc_set_vzp:
continue
else:
print(row[0], row[1],row[2])
print(check_insurance(row[0]))
time.sleep(random.uniform(1, 5))

7
04 testik.py Normal file
View File

@@ -0,0 +1,7 @@
from functions import check_insurance
if __name__ == "__main__":
rc = "7309208104"
res = check_insurance(rc)
print("=== RESULT ===")
print(res)

4
05 testik.py Normal file
View File

@@ -0,0 +1,4 @@
import socket
computer_name = socket.gethostname()
print(computer_name)

BIN
10 Tests/MBcert.pfx Normal file

Binary file not shown.

View File

@@ -0,0 +1,67 @@
import pandas as pd
import fdb
import pymysql
# ---------------------------------
# FIREBIRD CONNECTION
# ---------------------------------
fb = fdb.connect(
host="192.168.1.4",
database=r"z:\Medicus 3\data\MEDICUS.FDB",
user="SYSDBA",
password="masterkey",
charset="WIN1250"
)
cur = fb.cursor()
sql_fb = """
SELECT kar.rodcis
FROM registr
JOIN kar ON registr.idpac = kar.idpac
WHERE registr.datum_zruseni IS NULL
AND registr.priznak IN ('A','D','V')
"""
cur.execute(sql_fb)
rows_fb = cur.fetchall()
df_fb = pd.DataFrame(rows_fb, columns=["rc"])
print("FB count:", len(df_fb))
# ---------------------------------
# MYSQL CONNECTION
# ---------------------------------
mysql = pymysql.connect(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
charset="utf8mb4"
)
sql_mysql = """
SELECT rc
FROM vzp_stav_pojisteni AS v
WHERE v.k_datu = CURDATE()
AND v.id = (
SELECT MAX(id)
FROM vzp_stav_pojisteni
WHERE rc = v.rc
AND k_datu = CURDATE()
);
"""
df_mysql = pd.read_sql(sql_mysql, mysql)
print("MySQL count:", len(df_mysql))
# ---------------------------------
# FIND MISSING RC
# ---------------------------------
df_missing = df_fb[~df_fb["rc"].isin(df_mysql["rc"])]
print("\nMissing patients:")
print(df_missing)
fb.close()
mysql.close()

42
10 Tests/medicus_db.py Normal file
View File

@@ -0,0 +1,42 @@
import fdb
class MedicusDB:
def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"):
self.conn = fdb.connect(
host=host,
database=db_path,
user=user,
password=password,
charset=charset
)
self.cur = self.conn.cursor()
def query(self, sql, params=None):
self.cur.execute(sql, params or ())
return self.cur.fetchall()
def query_dict(self, sql, params=None):
self.cur.execute(sql, params or ())
cols = [d[0].strip().lower() for d in self.cur.description]
return [dict(zip(cols, row)) for row in self.cur.fetchall()]
def get_active_registered_patients(self):
sql = """
SELECT
kar.rodcis,
kar.prijmeni,
kar.jmeno,
kar.poj
FROM registr
JOIN kar ON registr.idpac = kar.idpac
WHERE registr.datum_zruseni IS NULL
AND registr.priznak IN ('A','D','V')
AND kar.rodcis IS NOT NULL
AND kar.rodcis <> ''
"""
return self.query(sql) # or self.query_dict(sql)
def close(self):
self.conn.close()

79
10 Tests/rozdíl.py Normal file
View File

@@ -0,0 +1,79 @@
import pandas as pd
import pymysql
from medicus_db import MedicusDB
import fdb
# FULL OUTPUT SETTINGS
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 0)
pd.set_option("display.max_colwidth", None)
# ===========
#
# ===========================
# FIREBIRD → načtení registrovaných pacientů
# ======================================
db = MedicusDB("192.168.1.4", r"z:\Medicus 3\data\MEDICUS.FDB")
rows_fb = db.get_active_registered_patients() # vrací rc, prijmeni, jmeno, poj
db.close()
df_fb = pd.DataFrame(rows_fb, columns=["rc", "prijmeni", "jmeno", "poj_medicus"])
df_fb["poj_medicus"] = df_fb["poj_medicus"].astype(str).str.strip()
print("FB count:", len(df_fb))
# ======================================
# MYSQL → načtení dnešních výsledků
# ======================================
mysql = pymysql.connect(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
charset="utf8mb4"
)
sql_mysql = """
SELECT rc,
kod_pojistovny AS poj_mysql,
nazev_pojistovny,
stav,
stav_vyrizeni
FROM vzp_stav_pojisteni AS v
WHERE v.k_datu = CURDATE()
AND v.id = (
SELECT MAX(id)
FROM vzp_stav_pojisteni
WHERE rc = v.rc
AND k_datu = CURDATE()
);
"""
df_mysql = pd.read_sql(sql_mysql, mysql)
df_mysql["poj_mysql"] = df_mysql["poj_mysql"].astype(str).str.strip()
print("MySQL count:", len(df_mysql))
# ======================================
# LEFT JOIN: Medicus ↔ MySQL podle RC
# ======================================
df_merge = df_fb.merge(df_mysql, on="rc", how="left")
# ======================================
# Najít rozdíly pojišťovny
# ======================================
df_diff = df_merge[df_merge["poj_medicus"] != df_merge["poj_mysql"]]
print("\nPacienti s rozdílnou pojišťovnou:")
print(df_diff[["rc", "prijmeni", "jmeno", "poj_medicus", "poj_mysql", "nazev_pojistovny"]])
# Pokud chceš uložit do Excelu:
# df_diff.to_excel("rozdil_pojistoven.xlsx", index=False)

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import logging
from medicus_db import MedicusDB
from vzpb2b_client import VZPB2BClient
import pymysql
from datetime import date
# ==========================================
# LOGGING SETUP
# ==========================================
logging.basicConfig(
filename="insurance_check.log",
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
encoding="utf-8"
)
console = logging.getLogger("console")
console.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
console.addHandler(handler)
def log_info(msg):
logging.info(msg)
console.info(msg)
def log_error(msg):
logging.error(msg)
console.error(msg)
# ==========================================
# MYSQL CONNECTION
# ==========================================
mysql = pymysql.connect(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
charset="utf8mb4",
autocommit=True
)
def save_insurance_status(mysql_conn, rc, k_datu, result, xml_text):
sql = """
INSERT INTO vzp_stav_pojisteni
(rc, k_datu, stav, kod_pojistovny, nazev_pojistovny,
pojisteni_kod, stav_vyrizeni, response_xml)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
try:
with mysql_conn.cursor() as cur:
cur.execute(sql, (
rc,
k_datu,
result["stav"],
result["kodPojistovny"],
result["nazevPojistovny"],
result["pojisteniKod"],
result["stavVyrizeni"],
xml_text
))
except Exception as e:
log_error(f"❌ MYSQL ERROR for RC {rc}: {e}")
log_error(f"---- RAW XML ----\n{xml_text}\n-----------------")
raise
# ==========================================
# CONFIGURATION
# ==========================================
HOST = "192.168.1.4"
DB_PATH = r"z:\Medicus 3\data\MEDICUS.FDB"
PFX_PATH = r"MBcert.pfx"
PFX_PASSWORD = "Vlado7309208104++"
ENV = "prod"
ICZ = "00000000"
DIC = "00000000"
# ==========================================
# INIT CONNECTIONS
# ==========================================
db = MedicusDB(HOST, DB_PATH)
vzp = VZPB2BClient(ENV, PFX_PATH, PFX_PASSWORD, icz=ICZ, dic=DIC)
# ==========================================
# FETCH REGISTERED PATIENTS
# ==========================================
patients = db.get_active_registered_patients()
log_info(f"Checking {len(patients)} registered patients...\n")
k_datu = date.today().isoformat()
# ==========================================
# LOOP ONE PATIENT PER SECOND
# ==========================================
for rodcis, prijmeni, jmeno, poj in patients:
log_info(f"=== Checking {prijmeni} {jmeno} ({rodcis}) ===")
xml = vzp.stav_pojisteni(rc=rodcis, k_datu=k_datu)
# 1) Check if response looks like XML
if not xml.strip().startswith("<"):
log_error(f"❌ INVALID XML for RC {rodcis}")
log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------")
time.sleep(2)
continue
# 2) Try parsing XML
try:
result = vzp.parse_stav_pojisteni(xml)
except Exception as e:
log_error(f"❌ XML PARSE ERROR for RC {rodcis}: {e}")
log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------")
time.sleep(2)
continue
log_info(f"Result: {result}")
# 3) Save into MySQL (with logging)
try:
save_insurance_status(mysql, rodcis, k_datu, result, xml)
except Exception:
log_error(f"❌ FAILURE inserting to MySQL for {rodcis}")
continue
time.sleep(2)
db.close()
log_info("\nDONE.")

View File

@@ -0,0 +1,19 @@
cur.execute("select rodcis,prijmeni,jmeno from kar where datum_zruseni is null and kar.vyrazen!='A' and kar.rodcis is not null and idicp!=0 order by ockzaz.datum desc")
from vzpb2b_client import VZPB2BClient
client = VZPB2BClient(
env="production",
pfx_path="mbcert.pfx",
pfx_password="Vlado7309208104++",
icz="00000000",
dic="00000000"
)
response = client.stav_pojisteni("0308020152")
# print(response)
print(client.parse_stav_pojisteni(response))

12
10 Tests/test.py Normal file
View File

@@ -0,0 +1,12 @@
from vzpb2b_client import VZPB2BClient
client = VZPB2BClient(
env="simu", # or "prod"
pfx_path="mbcert.pfx",
pfx_password="Vlado7309208104++",
icz="00000000",
dic="00000000"
)
result_xml = client.over_prukaz_pojistence("80203111194350000001")
print(result_xml)

213
10 Tests/vzpb2b_client.py Normal file
View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from requests_pkcs12 import Pkcs12Adapter
import requests
import uuid
from datetime import date
class VZPB2BClient:
def __init__(self, env: str, pfx_path: str, pfx_password: str,
icz: str = "00000000", dic: str = "00000000"):
# Normalize environment name
env = env.lower().strip()
if env in ("prod", "production", "live", "real"):
self.env = "prod"
elif env in ("simu", "simulace", "test", "testing"):
self.env = "simu"
else:
raise ValueError(f"Unknown environment '{env}'. Use 'simu' or 'prod'.")
self.pfx_path = pfx_path
self.pfx_password = pfx_password
self.icz = icz
self.dic = dic
# Prepare mTLS session
session = requests.Session()
session.mount(
"https://",
Pkcs12Adapter(pkcs12_filename=pfx_path, pkcs12_password=pfx_password)
)
self.session = session
# --------------------------------------------------------------
# URL BUILDER
# --------------------------------------------------------------
def _build_endpoint(self, service_name: str) -> str:
"""
SIMU:
https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMU<Service>?sluzba=SIMU<Service>
PROD:
https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/<Service>
"""
if self.env == "simu":
simu_service = f"SIMU{service_name}"
return (
f"https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/"
f"{simu_service}?sluzba={simu_service}"
)
# Production
return (
f"https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/{service_name}"
)
# --------------------------------------------------------------
# SOAP HEADER BUILDER
# --------------------------------------------------------------
def _header(self) -> str:
idZpravy = uuid.uuid4().hex[:12] # must be alphanumeric, max 12 chars
return f"""
<com:idZpravy>{idZpravy}</com:idZpravy>
<com:idSubjektu>
<com:icz>{self.icz}</com:icz>
<com:dic>{self.dic}</com:dic>
</com:idSubjektu>
"""
# --------------------------------------------------------------
# OVERPRUKAZ — EHIC CHECK
# --------------------------------------------------------------
def over_prukaz_pojistence(self, cislo_prukazu: str, k_datu: str = None) -> str:
"""
Calls OverPrukazPojistenceB2B (SIMU or PROD depending on env).
Returns raw XML string.
"""
service = "OverPrukazPojistenceB2B"
endpoint = self._build_endpoint(service)
if not k_datu:
k_datu = date.today().isoformat()
soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/OverPrukazPojistenceB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:OverPrukazPojistenceB2BPozadavek>
<vzp:cisloPrukazu>{cislo_prukazu}</vzp:cisloPrukazu>
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:OverPrukazPojistenceB2BPozadavek>
</soap:Body>
</soap:Envelope>
"""
headers = {"Content-Type": "text/xml; charset=utf-8"}
print(f"Calling: {endpoint}")
response = self.session.post(
endpoint,
data=soap.encode("utf-8"),
headers=headers,
timeout=30
)
print("HTTP:", response.status_code)
return response.text
def stav_pojisteni(self, rc: str, k_datu: str = None, prijmeni: str = None):
"""
Calls stavPojisteniB2B (SIMU or PROD).
"""
service = "stavPojisteniB2B"
endpoint = self._build_endpoint(service)
if not k_datu:
k_datu = date.today().isoformat()
prijmeni_xml = f"<vzp:prijmeni>{prijmeni}</vzp:prijmeni>" if prijmeni else ""
soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/stavPojisteniB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:stavPojisteniB2B>
<vzp:cisloPojistence>{rc}</vzp:cisloPojistence>
{prijmeni_xml}
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:stavPojisteniB2B>
</soap:Body>
</soap:Envelope>
"""
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process"
}
print(f"Calling: {endpoint}")
resp = self.session.post(endpoint, data=soap.encode("utf-8"),
headers=headers, timeout=30)
print("HTTP:", resp.status_code)
return resp.text
def parse_stav_pojisteni(self, xml_text: str):
"""
Parses stavPojisteniB2B SOAP response into a Python dict.
Returned structure:
{
"stavVyrizeni": int,
"stav": str | None,
"kodPojistovny": str | None,
"nazevPojistovny": str | None,
"pojisteniKod": str | None
}
"""
import xml.etree.ElementTree as ET
NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"vzp": "http://xmlns.gemsystem.cz/stavPojisteniB2B"
}
root = ET.fromstring(xml_text)
# ---- Extract status ----
stav_vyr = root.find(".//vzp:stavVyrizeniPozadavku", NS)
stav_vyr = int(stav_vyr.text.strip()) if stav_vyr is not None else None
# ---- If no stavPojisteni element present (e.g. 0 or some errors) ----
node_stav = root.find(".//vzp:stavPojisteni", NS)
if node_stav is None:
return {
"stavVyrizeni": stav_vyr,
"stav": None,
"kodPojistovny": None,
"nazevPojistovny": None,
"pojisteniKod": None,
}
def get(tag):
el = node_stav.find(f"vzp:{tag}", NS)
return el.text.strip() if el is not None and el.text else None
return {
"stavVyrizeni": stav_vyr,
"stav": get("stav"),
"kodPojistovny": get("kodPojistovny"),
"nazevPojistovny": get("nazevPojistovny"),
"pojisteniKod": get("pojisteniKod"),
}

24
File3.py Normal file
View File

@@ -0,0 +1,24 @@
from functions import check_insurance
import time
import fdb
# MEDICUS local CFG (table already created as per previous DDL)
MEDICUS_CFG = dict(
dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb",
user="SYSDBA",
password="masterkey",
charset="win1250"
)
conn=fdb.connect(**MEDICUS_CFG)
cur=conn.cursor()
cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '8'")
rows=cur.fetchall()
print(len(rows))
for row in rows:
print(row[0], row[1],row[2])
print(check_insurance(row[0]))
time.sleep(.25)

BIN
MBcert.pfx Normal file

Binary file not shown.

194
functions.py Normal file
View File

@@ -0,0 +1,194 @@
# functions.py
# pip install requests_pkcs12 pymysql
from __future__ import annotations
import os
from datetime import date
from pprint import pprint
import xml.etree.ElementTree as ET
import requests
from requests_pkcs12 import Pkcs12Adapter
import pymysql
from pymysql.cursors import DictCursor
import fdb
import socket
def get_medicus_connection():
"""
Attempt to create a Firebird connection to the Medicus database.
Returns:
fdb.Connection object on success
None on failure
"""
if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"):
MEDICUS_CFG = dict(
dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb",
user="SYSDBA",
password="masterkey",
charset="win1250",
)
elif socket.gethostname().strip()=="SESTRA":
MEDICUS_CFG = dict(
dsn=r"192.168.1.10:m:\medicus\data\medicus.fdb",
user="SYSDBA",
password="masterkey",
charset="win1250",
)
try:
return fdb.connect(**MEDICUS_CFG)
except fdb.fbcore.DatabaseError as e:
print(f"Medicus DB connection failed: {e}")
return None
def get_mysql_connection():
"""
Return a PyMySQL connection or None if the connection fails.
"""
if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"):
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
elif socket.gethostname().strip() == "SESTRA":
MYSQL_CFG = dict(
host="127.0.0.1",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
try:
return pymysql.connect(**MYSQL_CFG)
except pymysql.MySQLError as e:
print(f"MySQL connection failed: {e}")
return None
# ======== CONFIG (env overrides allowed) ========
PFX_PATH = os.getenv("VZP_PFX_PATH", r"mbcert.pfx")
PFX_PASS = os.getenv("VZP_PFX_PASS", "Vlado7309208104++")
VERIFY = os.getenv("VZP_CA_VERIFY", "true").lower() != "false" # set to path or "false" to disable
EP_STAV = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/stavPojisteniB2B"
SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
NS_STAV = "http://xmlns.gemsystem.cz/stavPojisteniB2B"
# ======== HELPERS ========
def normalize_rc(rc: str) -> str:
"""Return rodné číslo without slash/spaces."""
return rc.replace("/", "").replace(" ", "").strip()
def to_int_or_none(val):
if val is None:
return None
s = str(val).strip()
return int(s) if s.isdigit() else None
def _session():
s = requests.Session()
s.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
return s
def post_soap(endpoint: str, inner_xml: str) -> str:
"""Wrap body in SOAP 1.1 envelope and POST with mTLS."""
envelope = f'''<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="{SOAP_NS}">
<soap:Body>{inner_xml}</soap:Body>
</soap:Envelope>'''
r = _session().post(
endpoint,
data=envelope.encode("utf-8"),
headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": "process"},
timeout=30,
verify=VERIFY,
)
# You may log r.status_code if needed.
return r.text
def parse_stav(xml_text: str) -> dict:
"""Parse StavPojisteniB2B response → dict with keys:
'stav','kodPojistovny','nazevPojistovny','pojisteniKod','stavVyrizeniPozadavku'."""
NS = {"soap": SOAP_NS, "s": NS_STAV}
root = ET.fromstring(xml_text)
out = {"stav": None, "kodPojistovny": None, "nazevPojistovny": None,
"pojisteniKod": None, "stavVyrizeniPozadavku": None}
node = root.find(".//s:stavPojisteni", NS)
if node is not None:
for tag in ("stav", "kodPojistovny", "nazevPojistovny", "pojisteniKod"):
el = node.find(f"s:{tag}", NS)
out[tag] = el.text.strip() if (el is not None and el.text) else None
st = root.find(".//s:stavVyrizeniPozadavku", NS)
out["stavVyrizeniPozadavku"] = st.text.strip() if (st is not None and st.text) else None
return out
def save_stav_pojisteni(rc: str, k_datu: str, parsed: dict, response_xml: str) -> None:
"""Insert one log row into medevio.vzp_stav_pojisteni and print what is being saved."""
payload = {
"rc": normalize_rc(rc),
"k_datu": k_datu,
"stav": to_int_or_none(parsed.get("stav")), # 'X' -> None
"kod_pojistovny": parsed.get("kodPojistovny"),
"nazev_pojistovny": parsed.get("nazevPojistovny"),
"pojisteni_kod": parsed.get("pojisteniKod"),
"stav_vyrizeni": to_int_or_none(parsed.get("stavVyrizeniPozadavku")),
"response_xml": response_xml,
}
print("\n=== vzp_stav_pojisteni: will insert ===")
pprint(payload)
sql = """
INSERT INTO vzp_stav_pojisteni
(rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, pojisteni_kod, stav_vyrizeni, response_xml)
VALUES
(%(rc)s, %(k_datu)s, %(stav)s, %(kod_pojistovny)s, %(nazev_pojistovny)s, %(pojisteni_kod)s, %(stav_vyrizeni)s, %(response_xml)s)
"""
conn = get_mysql_connection()
try:
with conn.cursor() as cur:
cur.execute(sql, payload)
conn.commit()
finally:
conn.close()
print("Inserted 1 row into vzp_stav_pojisteni.")
# ======== PUBLIC API ========
def check_insurance(rc: str, mode: str | None = None, k_datu: str | None = None):
"""
Call VZP 'StavPojisteniB2B' for given RC and date.
Returns (is_active, info_dict).
- is_active: True iff info_dict['stav'] == '1'
- info_dict: {'stav','kodPojistovny','nazevPojistovny','pojisteniKod','stavVyrizeniPozadavku'}
If mode == 'nodb' (case-insensitive), DOES NOT write to DB. Otherwise logs into medevio.vzp_stav_pojisteni.
"""
rc_norm = normalize_rc(rc)
kd = k_datu or date.today().isoformat()
body = f"""
<ns1:stavPojisteniB2B xmlns:ns1="{NS_STAV}">
<ns1:cisloPojistence>{rc_norm}</ns1:cisloPojistence>
<ns1:kDatu>{kd}</ns1:kDatu>
</ns1:stavPojisteniB2B>""".strip()
xml = post_soap(EP_STAV, body)
info = parse_stav(xml)
is_active = (info.get("stav") == "1")
if not (mode and mode.lower() == "nodb"):
save_stav_pojisteni(rc_norm, kd, info, xml)
return is_active, info

BIN
requirements.txt Normal file

Binary file not shown.