diff --git a/01 testik.py b/01 testik.py new file mode 100644 index 0000000..8c333e1 --- /dev/null +++ b/01 testik.py @@ -0,0 +1,132 @@ +# pip install requests_pkcs12 pymysql +from requests_pkcs12 import Pkcs12Adapter +import requests +import xml.etree.ElementTree as ET +from datetime import date +import pymysql +from pymysql.cursors import DictCursor +from pprint import pprint + +# ========== CONFIG ========== +PFX_PATH = r"mbcert.pfx" +PFX_PASS = "Vlado7309208104++" +VERIFY = True + +EP_STAV = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/stavPojisteniB2B" +SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/" +NS_STAV = "http://xmlns.gemsystem.cz/stavPojisteniB2B" + +# RC provided by you (can contain slash; we normalize before sending/saving) +RC = "7309208104" +# RC = "280616/091" +K_DATU = date.today().isoformat() + +# MySQL (table already created as per previous DDL) +MYSQL_CFG = dict( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=False, +) + +# ========== HELPERS ========== +def to_int_or_none(val): + if val is None: + return None + s = str(val).strip() + return int(s) if s.isdigit() else None + +def normalize_rc(rc: str) -> str: + return rc.replace("/", "").strip() + +def post_soap(endpoint: str, body_xml: str) -> str: + env = f''' + + {body_xml} +''' + s = requests.Session() + s.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) + r = s.post( + endpoint, + data=env.encode("utf-8"), + headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": "process"}, + timeout=30, + verify=VERIFY, + ) + print("HTTP:", r.status_code) + return r.text + +def parse_stav(xml_text: str): + NS = {"soap": SOAP_NS, "s": NS_STAV} + root = ET.fromstring(xml_text) + out = { + "stav": None, + "kodPojistovny": None, + "nazevPojistovny": None, + "pojisteniKod": None, + "stavVyrizeniPozadavku": None, + } + node = root.find(".//s:stavPojisteni", NS) + if node is not None: + for tag in ("stav", "kodPojistovny", "nazevPojistovny", "pojisteniKod"): + el = node.find(f"s:{tag}", NS) + out[tag] = el.text.strip() if el is not None and el.text else None + st = root.find(".//s:stavVyrizeniPozadavku", NS) + out["stavVyrizeniPozadavku"] = st.text.strip() if st is not None and st.text else None + return out + +def save_stav_pojisteni(rc: str, k_datu: str, parsed: dict, response_xml: str) -> int: + """ + Insert one log row into medevio.vzp_stav_pojisteni and print what is being saved. + Assumes the table already exists (no DDL/verification here). + """ + payload = { + "rc": normalize_rc(rc), + "k_datu": k_datu, + "stav": to_int_or_none(parsed.get("stav")), # handles 'X' -> None + "kod_pojistovny": parsed.get("kodPojistovny"), + "nazev_pojistovny": parsed.get("nazevPojistovny"), + "pojisteni_kod": parsed.get("pojisteniKod"), + "stav_vyrizeni": to_int_or_none(parsed.get("stavVyrizeniPozadavku")), + "response_xml": response_xml, + } + + print("\n=== vzp_stav_pojisteni: will insert ===") + pprint(payload) + + sql = """ + INSERT INTO vzp_stav_pojisteni + (rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, pojisteni_kod, stav_vyrizeni, response_xml) + VALUES + (%(rc)s, %(k_datu)s, %(stav)s, %(kod_pojistovny)s, %(nazev_pojistovny)s, %(pojisteni_kod)s, %(stav_vyrizeni)s, %(response_xml)s) + """ + + conn = pymysql.connect(**MYSQL_CFG) + try: + with conn.cursor() as cur: + cur.execute(sql, payload) + conn.commit() + finally: + conn.close() + + print("Inserted 1 row into vzp_stav_pojisteni.") + return 1 + +# ========== MAIN ========== +if __name__ == "__main__": + rc_norm = normalize_rc(RC) + body = f""" + + {rc_norm} + {K_DATU} +""".strip() + + xml = post_soap(EP_STAV, body) + parsed = parse_stav(xml) + print(parsed) # keep your quick print + + # Save to MySQL and print exactly what is saved + save_stav_pojisteni(RC, K_DATU, parsed, xml) diff --git a/02 testík.py b/02 testík.py new file mode 100644 index 0000000..700b29a --- /dev/null +++ b/02 testík.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), +parse the response, upsert rows into medevio.vzp_registrace, +and print what is being saved. +""" + +# pip install requests_pkcs12 pymysql + +from requests_pkcs12 import Pkcs12Adapter +import requests +import xml.etree.ElementTree as ET +from datetime import date +import pymysql +from pymysql.cursors import DictCursor +from pprint import pprint +from functions import get_medicus_connection +from functions import get_mysql_connection +import time, random,socket + +# ------------------- CONFIG ------------------- +ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive +PFX_PATH = r"mbcert.pfx" # <-- your .pfx path +PFX_PASS = "Vlado7309208104++" # <-- your export password +VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" + +# Patient + query +RC = "7309208104" # rodné číslo without slash +# RC = "280616/091" # rodné číslo without slash +K_DATU = date.today().isoformat() # YYYY-MM-DD +ODBORNOSTI = ["001"] # VPL (adult GP) + +# MySQL +if socket.gethostname().strip() == "NTBVBHP470G10": + MYSQL_CFG = dict( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) +elif socket.gethostname().strip() == "SESTRA": + MYSQL_CFG = dict( + host="127.0.0.1", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) + + +# Namespaces (from your response/WSDL) +NS = { + "soap": "http://schemas.xmlsoap.org/soap/envelope/", + "rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1", +} + + +# ------------------- HELPERS ------------------- +def normalize_rc(rc: str) -> str: + return rc.replace("/", "").strip() + +def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str: + odb_xml = "".join([f"{kod}" for kod in odb_list]) + inner = f""" + + {rc} + {k_datu} + + {odb_xml} + +""".strip() + + return f""" + + + {inner} + +""" + +def parse_registrace(xml_text: str): + """ + Return (rows, stav_vyrizeni) where rows is a list of dicts + for each item in the response. + """ + root = ET.fromstring(xml_text) + items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS) + out = [] + for it in items: + def g(tag, ctx=it): + el = ctx.find(f"rp:{tag}", NS) + return el.text.strip() if el is not None and el.text else None + + poj = it.find("rp:zdravotniPojistovna", NS) + poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None + poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None + + spec = it.find("rp:odbornost", NS) + odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None + odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None + + out.append(dict( + icz=g("ICZ"), + icp=g("ICP"), + nazev_icp=g("nazevICP"), + nazev_szz=g("nazevSZZ"), + poj_kod=poj_kod, + poj_zkratka=poj_zkr, + odbornost_kod=odb_kod, + odbornost_nazev=odb_naz, + datum_registrace=g("datumRegistrace"), + datum_zahajeni=g("datumZahajeni"), + datum_ukonceni=g("datumUkonceni"), + )) + + st = root.find(".//rp:stavVyrizeniPozadavku", NS) + stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None + return out, stav_vyrizeni + +def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int: + """ + Insert or update rows into medevio.vzp_registrace and print each payload. + Assumes the table already exists (as per your DDL). + """ + if not rows: + print("No rows found; nothing to save.") + return 0 + + sql = """ +INSERT INTO vzp_registrace + (rc, query_date, odbornost_kod, odbornost_nazev, + icz, icp, nazev_icp, nazev_szz, + poj_kod, poj_zkratka, + datum_registrace, datum_zahajeni, datum_ukonceni, + stav_vyrizeni, response_xml) +VALUES + (%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s, + %(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s, + %(poj_kod)s, %(poj_zkratka)s, + %(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s, + %(stav_vyrizeni)s, %(response_xml)s) +ON DUPLICATE KEY UPDATE + odbornost_nazev = VALUES(odbornost_nazev), + nazev_icp = VALUES(nazev_icp), + nazev_szz = VALUES(nazev_szz), + poj_kod = VALUES(poj_kod), + poj_zkratka = VALUES(poj_zkratka), + datum_registrace= VALUES(datum_registrace), + datum_zahajeni = VALUES(datum_zahajeni), + datum_ukonceni = VALUES(datum_ukonceni), + stav_vyrizeni = VALUES(stav_vyrizeni), + response_xml = VALUES(response_xml), + updated_at = CURRENT_TIMESTAMP +""" + rc_norm = normalize_rc(rc) + qd = query_date or date.today().isoformat() + payloads = [] + for r in rows: + payload = { + "rc": rc_norm, + "query_date": qd, + **r, + "stav_vyrizeni": stav_vyrizeni, + "response_xml": xml_text, + } + payloads.append(payload) + + # Print what we're going to save + print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===") + for p in payloads: + pprint(p) + + conn = pymysql.connect(**MYSQL_CFG) + try: + with conn.cursor() as cur: + cur.executemany(sql, payloads) + conn.commit() + finally: + conn.close() + + print(f"\nUpserted rows: {len(payloads)}") + return len(payloads) + +def prepare_processed_rcs(): + consql=get_mysql_connection() + cursql=consql.cursor() + + sql=""" + WITH ranked AS ( + SELECT + vreg.*, + ROW_NUMBER() OVER ( + PARTITION BY rc + ORDER BY query_date DESC + ) AS rn + FROM vzp_registrace AS vreg + ) + SELECT rc + FROM ranked + WHERE rn = 1 + """ + + cursql.execute(sql) + rows=cursql.fetchall() + print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") + rc_set_vzp = {row["rc"] for row in rows} + return (rc_set_vzp) + +# ------------------- MAIN FLOW ------------------- +def main(): + + con = get_medicus_connection() + cur = con.cursor() + cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '9'") + # cur.execute("select first 2 rodcis, prijmeni, jmeno from kar where rodcis starting with '0'") + + # Vytvor seznam rodnych cisel, která už máme + rc_set_vzp = prepare_processed_rcs() + + rows = cur.fetchall() + print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}") + + for row in rows: + if row[0] in rc_set_vzp: + continue + else: + print(row[0], row[1], row[2]) + K_DATU = date.today().isoformat() # YYYY-MM-DD + ODBORNOSTI = ["001"] + RC=row[0] + + # Build SOAP envelope + envelope = build_envelope(RC, K_DATU, ODBORNOSTI) + + # mTLS session + session = requests.Session() + session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) + + headers = { + "Content-Type": "text/xml; charset=utf-8", + "SOAPAction": "process", # Oracle composite usually expects this + } + + # Call service + resp = session.post(ENDPOINT, data=envelope.encode("utf-8"), + headers=headers, timeout=30, verify=VERIFY) + print("HTTP:", resp.status_code) + + # (Optional) Uncomment to see raw XML + # print(resp.text) + + # Parse and save + rows, stav = parse_registrace(resp.text) + upsert_rows(RC, K_DATU, rows, stav, resp.text) + + time.sleep(random.uniform(1, 5)) + + + + + + + + +if __name__ == "__main__": + main() diff --git a/02.1 Test moje rodne cislo.py b/02.1 Test moje rodne cislo.py new file mode 100644 index 0000000..4d6269d --- /dev/null +++ b/02.1 Test moje rodne cislo.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), +parse the response, upsert rows into medevio.vzp_registrace, +and print what is being saved. +""" + +# pip install requests_pkcs12 pymysql + +from requests_pkcs12 import Pkcs12Adapter +import requests +import xml.etree.ElementTree as ET +from datetime import date +import pymysql +from pymysql.cursors import DictCursor +from pprint import pprint +from functions import get_medicus_connection +from functions import get_mysql_connection +import time, random,socket + +# ------------------- CONFIG ------------------- +ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive +PFX_PATH = r"mbcert.pfx" # <-- your .pfx path +PFX_PASS = "Vlado7309208104++" # <-- your export password +VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" + +# Patient + query +RC = "7309208104" # rodné číslo without slash +# RC = "280616/091" # rodné číslo without slash +K_DATU = date.today().isoformat() # YYYY-MM-DD +ODBORNOSTI = ["001"] # VPL (adult GP) + +# MySQL +if socket.gethostname().strip() == "NTBVBHP470G10": + MYSQL_CFG = dict( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) +elif socket.gethostname().strip() == "SESTRA": + MYSQL_CFG = dict( + host="127.0.0.1", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) + + +# Namespaces (from your response/WSDL) +NS = { + "soap": "http://schemas.xmlsoap.org/soap/envelope/", + "rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1", +} + + +# ------------------- HELPERS ------------------- +def normalize_rc(rc: str) -> str: + return rc.replace("/", "").strip() + + +# Ukázka XML – Požadavek: +# +# 801212855 +# +# 002 +# +# +def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str: + odb_xml = "".join([f"{kod}" for kod in odb_list]) + inner = f""" + + {rc} + {k_datu} + + {odb_xml} + +""".strip() + + return f""" + + + {inner} + +""" + +def parse_registrace(xml_text: str): + """ + Return (rows, stav_vyrizeni) where rows is a list of dicts + for each item in the response. + """ + root = ET.fromstring(xml_text) + items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS) + out = [] + for it in items: + def g(tag, ctx=it): + el = ctx.find(f"rp:{tag}", NS) + return el.text.strip() if el is not None and el.text else None + + poj = it.find("rp:zdravotniPojistovna", NS) + poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None + poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None + + spec = it.find("rp:odbornost", NS) + odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None + odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None + + out.append(dict( + icz=g("ICZ"), + icp=g("ICP"), + nazev_icp=g("nazevICP"), + nazev_szz=g("nazevSZZ"), + poj_kod=poj_kod, + poj_zkratka=poj_zkr, + odbornost_kod=odb_kod, + odbornost_nazev=odb_naz, + datum_registrace=g("datumRegistrace"), + datum_zahajeni=g("datumZahajeni"), + datum_ukonceni=g("datumUkonceni"), + )) + + st = root.find(".//rp:stavVyrizeniPozadavku", NS) + stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None + return out, stav_vyrizeni + +def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int: + """ + Insert or update rows into medevio.vzp_registrace and print each payload. + Assumes the table already exists (as per your DDL). + """ + if not rows: + print("No rows found; nothing to save.") + return 0 + + sql = """ +INSERT INTO vzp_registrace + (rc, query_date, odbornost_kod, odbornost_nazev, + icz, icp, nazev_icp, nazev_szz, + poj_kod, poj_zkratka, + datum_registrace, datum_zahajeni, datum_ukonceni, + stav_vyrizeni, response_xml) +VALUES + (%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s, + %(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s, + %(poj_kod)s, %(poj_zkratka)s, + %(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s, + %(stav_vyrizeni)s, %(response_xml)s) +ON DUPLICATE KEY UPDATE + odbornost_nazev = VALUES(odbornost_nazev), + nazev_icp = VALUES(nazev_icp), + nazev_szz = VALUES(nazev_szz), + poj_kod = VALUES(poj_kod), + poj_zkratka = VALUES(poj_zkratka), + datum_registrace= VALUES(datum_registrace), + datum_zahajeni = VALUES(datum_zahajeni), + datum_ukonceni = VALUES(datum_ukonceni), + stav_vyrizeni = VALUES(stav_vyrizeni), + response_xml = VALUES(response_xml), + updated_at = CURRENT_TIMESTAMP +""" + rc_norm = normalize_rc(rc) + qd = query_date or date.today().isoformat() + payloads = [] + for r in rows: + payload = { + "rc": rc_norm, + "query_date": qd, + **r, + "stav_vyrizeni": stav_vyrizeni, + "response_xml": xml_text, + } + payloads.append(payload) + + # Print what we're going to save + print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===") + for p in payloads: + pprint(p) + + conn = pymysql.connect(**MYSQL_CFG) + try: + with conn.cursor() as cur: + cur.executemany(sql, payloads) + conn.commit() + finally: + conn.close() + + print(f"\nUpserted rows: {len(payloads)}") + return len(payloads) + +def prepare_processed_rcs(): + consql=get_mysql_connection() + cursql=consql.cursor() + + sql=""" + WITH ranked AS ( + SELECT + vreg.*, + ROW_NUMBER() OVER ( + PARTITION BY rc + ORDER BY query_date DESC + ) AS rn + FROM vzp_registrace AS vreg + ) + SELECT rc + FROM ranked + WHERE rn = 1 + """ + + cursql.execute(sql) + rows=cursql.fetchall() + print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") + rc_set_vzp = {row["rc"] for row in rows} + return (rc_set_vzp) + +# ------------------- MAIN FLOW ------------------- +def main(): + K_DATU = date.today().isoformat() # YYYY-MM-DD + ODBORNOSTI = ["001"] + RC='7309208104' + + # Build SOAP envelope + envelope = build_envelope(RC, K_DATU, ODBORNOSTI) + + # mTLS session + session = requests.Session() + session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) + + headers = { + "Content-Type": "text/xml; charset=utf-8", + "SOAPAction": "process", # Oracle composite usually expects this + } + + # Call service + resp = session.post(ENDPOINT, data=envelope.encode("utf-8"), + headers=headers, timeout=30, verify=VERIFY) + print("HTTP:", resp.status_code) + + # (Optional) Uncomment to see raw XML + print(resp.text) + + # Parse and save + rows, stav = parse_registrace(resp.text) + print(rows,stav) + + time.sleep(random.uniform(1, 5)) + + + + + + + + +if __name__ == "__main__": + main() diff --git a/02.2 Testík.py b/02.2 Testík.py new file mode 100644 index 0000000..51ed961 --- /dev/null +++ b/02.2 Testík.py @@ -0,0 +1,308 @@ +#kód bude vkládat i řádky pro pacienty bez registrovaného lékař v oboru 001#!/usr/bin/env python3 -*- coding: utf-8 -*- """ Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), parse the response, upsert rows into medevio.vzp_registrace, and print what is being saved. """ # pip install requests_pkcs12 pymysql from requests_pkcs12 import Pkcs12Adapter import requests import xml.etree.ElementTree as ET from datetime import date import pymysql from pymysql.cursors import DictCursor from pprint import pprint from functions import get_medicus_connection from functions import get_mysql_connection import time, random,socket # ------------------- CONFIG ------------------- ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive PFX_PATH = r"mbcert.pfx" # <-- your .pfx path PFX_PASS = "Vlado7309208104++" # <-- your export password VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" Patient + query RC = "7309208104" # rodné číslo without slash RC = "280616/091" # rodné +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), +parse the response, upsert rows into medevio.vzp_registrace, +and print what is being saved. +""" + +# pip install requests_pkcs12 pymysql + +from requests_pkcs12 import Pkcs12Adapter +import requests +import xml.etree.ElementTree as ET +from datetime import date +import pymysql +from pymysql.cursors import DictCursor +from pprint import pprint +from functions import get_medicus_connection +from functions import get_mysql_connection +import time, random,socket + +# ------------------- CONFIG ------------------- +ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive +PFX_PATH = r"mbcert.pfx" # <-- your .pfx path +PFX_PASS = "Vlado7309208104++" # <-- your export password +VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" + +# Patient + query +RC = "7309208104" # rodné číslo without slash +# RC = "280616/091" # rodné číslo without slash +K_DATU = date.today().isoformat() # YYYY-MM-DD +ODBORNOSTI = ["001"] # VPL (adult GP) + +# MySQL +if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"): + MYSQL_CFG = dict( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) +elif socket.gethostname().strip() == "SESTRA": + MYSQL_CFG = dict( + host="127.0.0.1", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) + + +# Namespaces (from your response/WSDL) +NS = { + "soap": "http://schemas.xmlsoap.org/soap/envelope/", + "rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1", +} + + +# ------------------- HELPERS ------------------- +def normalize_rc(rc: str) -> str: + return rc.replace("/", "").strip() + +def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str: + odb_xml = "".join([f"{kod}" for kod in odb_list]) + inner = f""" + + {rc} + {k_datu} + + {odb_xml} + +""".strip() + + return f""" + + + {inner} + +""" + +def parse_registrace(xml_text: str): + """ + Return (rows, stav_vyrizeni) where rows is a list of dicts + for each item in the response. + """ + root = ET.fromstring(xml_text) + items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS) + out = [] + for it in items: + def g(tag, ctx=it): + el = ctx.find(f"rp:{tag}", NS) + return el.text.strip() if el is not None and el.text else None + + poj = it.find("rp:zdravotniPojistovna", NS) + poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None + poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None + + spec = it.find("rp:odbornost", NS) + odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None + odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None + + out.append(dict( + icz=g("ICZ"), + icp=g("ICP"), + nazev_icp=g("nazevICP"), + nazev_szz=g("nazevSZZ"), + poj_kod=poj_kod, + poj_zkratka=poj_zkr, + odbornost_kod=odb_kod, + odbornost_nazev=odb_naz, + datum_registrace=g("datumRegistrace"), + datum_zahajeni=g("datumZahajeni"), + datum_ukonceni=g("datumUkonceni"), + )) + + st = root.find(".//rp:stavVyrizeniPozadavku", NS) + stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None + return out, stav_vyrizeni + +def upsert_rows( + rc: str, + query_date: str, + rows: list[dict], + stav_vyrizeni: str, + xml_text: str, + requested_odb_list: list[str] | None = None, +) -> int: + """ + Insert/update medevio.vzp_registrace. + If no items are returned, insert placeholder row(s) + for each requested specialty (e.g., "001") so the negative result is visible. + """ + sql = """ +INSERT INTO vzp_registrace + (rc, query_date, odbornost_kod, odbornost_nazev, + icz, icp, nazev_icp, nazev_szz, + poj_kod, poj_zkratka, + datum_registrace, datum_zahajeni, datum_ukonceni, + stav_vyrizeni, response_xml) +VALUES + (%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s, + %(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s, + %(poj_kod)s, %(poj_zkratka)s, + %(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s, + %(stav_vyrizeni)s, %(response_xml)s) +ON DUPLICATE KEY UPDATE + odbornost_nazev = VALUES(odbornost_nazev), + nazev_icp = VALUES(nazev_icp), + nazev_szz = VALUES(nazev_szz), + poj_kod = VALUES(poj_kod), + poj_zkratka = VALUES(poj_zkratka), + datum_registrace= VALUES(datum_registrace), + datum_zahajeni = VALUES(datum_zahajeni), + datum_ukonceni = VALUES(datum_ukonceni), + stav_vyrizeni = VALUES(stav_vyrizeni), + response_xml = VALUES(response_xml), + updated_at = CURRENT_TIMESTAMP +""" + rc_norm = normalize_rc(rc) + qd = query_date or date.today().isoformat() + + payloads: list[dict] = [] + + if rows: + # Positive path: save what came from the API + for r in rows: + payloads.append({ + "rc": rc_norm, + "query_date": qd, + **r, + "stav_vyrizeni": stav_vyrizeni, + "response_xml": xml_text, + }) + else: + # Negative path: no registration items -> create placeholders + if not requested_odb_list: + requested_odb_list = ["001"] + for kod in requested_odb_list: + payloads.append({ + "rc": rc_norm, + "query_date": qd, + "odbornost_kod": kod, + "odbornost_nazev": None, + "icz": None, + "icp": None, + "nazev_icp": None, + "nazev_szz": None, + "poj_kod": None, + "poj_zkratka": None, + "datum_registrace": None, + "datum_zahajeni": None, + "datum_ukonceni": None, + # Keep what VZP said (e.g., 'X'), and raw XML for audit + "stav_vyrizeni": stav_vyrizeni, + "response_xml": xml_text, + }) + + # Print what we're going to save + print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===") + for p in payloads: + pprint(p) + + if not payloads: + print("No payloads prepared (this should not happen).") + return 0 + + connsql = get_mysql_connection() + try: + with connsql.cursor() as cur: + cur.executemany(sql, payloads) + connsql.commit() + finally: + connsql.close() + + print(f"\nUpserted rows: {len(payloads)}") + return len(payloads) + + +def prepare_processed_rcs(): + consql=get_mysql_connection() + cursql=consql.cursor() + + sql=""" + WITH ranked AS ( + SELECT + vreg.*, + ROW_NUMBER() OVER ( + PARTITION BY rc + ORDER BY query_date DESC + ) AS rn + FROM vzp_registrace AS vreg + ) + SELECT rc + FROM ranked + WHERE rn = 1 + """ + + cursql.execute(sql) + rows=cursql.fetchall() + print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") + rc_set_vzp = {row["rc"] for row in rows} + return (rc_set_vzp) + +# ------------------- MAIN FLOW ------------------- +def main(): + + con = get_medicus_connection() + cur = con.cursor() + cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '1'") + # cur.execute("select first 2 rodcis, prijmeni, jmeno from kar where rodcis starting with '0'") + + # Vytvor seznam rodnych cisel, která už máme + rc_set_vzp = prepare_processed_rcs() + + rows = cur.fetchall() + print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}") + + for row in rows: + if row[0] in rc_set_vzp: + continue + else: + print(row[0], row[1], row[2]) + K_DATU = date.today().isoformat() # YYYY-MM-DD + ODBORNOSTI = ["001"] + RC=row[0] + + # Build SOAP envelope + envelope = build_envelope(RC, K_DATU, ODBORNOSTI) + + # mTLS session + session = requests.Session() + session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) + + headers = { + "Content-Type": "text/xml; charset=utf-8", + "SOAPAction": "process", # Oracle composite usually expects this + } + print(envelope) + # Call service + resp = session.post(ENDPOINT, data=envelope.encode("utf-8"), + headers=headers, timeout=30, verify=VERIFY) + print("HTTP:", resp.status_code) + + # (Optional) Uncomment to see raw XML + print(resp.text) + + # Parse and save + rows, stav = parse_registrace(resp.text) + print(rows,stav) + upsert_rows(RC, K_DATU, rows, stav, resp.text, requested_odb_list=ODBORNOSTI) + + time.sleep(random.uniform(1, 5)) + + + + + + + + +if __name__ == "__main__": + main() diff --git a/03 VyberKontrolaStavuPojisteni.py b/03 VyberKontrolaStavuPojisteni.py new file mode 100644 index 0000000..41b6558 --- /dev/null +++ b/03 VyberKontrolaStavuPojisteni.py @@ -0,0 +1,46 @@ +from functions import get_medicus_connection +from functions import get_mysql_connection +from functions import check_insurance +import time, random + +def prepare_processed_rcs(): + consql=get_mysql_connection() + cursql=consql.cursor() + + sql=""" + WITH ranked AS ( + SELECT + vr.*, + ROW_NUMBER() OVER ( + PARTITION BY rc + ORDER BY k_datu DESC, queried_at DESC + ) AS rn + FROM vzp_stav_pojisteni AS vr + ) + SELECT rc + FROM ranked + WHERE rn = 1 + """ + + cursql.execute(sql) + rows=cursql.fetchall() + print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") + rc_set_vzp = {row["rc"] for row in rows} + return (rc_set_vzp) + +con=get_medicus_connection() +cur=con.cursor() +cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '3'") + +rc_set_vzp=prepare_processed_rcs() + +rows=cur.fetchall() +print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}") + +for row in rows: + if row[0] in rc_set_vzp: + continue + else: + print(row[0], row[1],row[2]) + print(check_insurance(row[0])) + time.sleep(random.uniform(1, 5)) diff --git a/04 testik.py b/04 testik.py new file mode 100644 index 0000000..b2d20d6 --- /dev/null +++ b/04 testik.py @@ -0,0 +1,7 @@ +from functions import check_insurance + +if __name__ == "__main__": + rc = "7309208104" + res = check_insurance(rc) + print("=== RESULT ===") + print(res) \ No newline at end of file diff --git a/05 testik.py b/05 testik.py new file mode 100644 index 0000000..48cda59 --- /dev/null +++ b/05 testik.py @@ -0,0 +1,4 @@ +import socket + +computer_name = socket.gethostname() +print(computer_name) \ No newline at end of file diff --git a/10 Tests/MBcert.pfx b/10 Tests/MBcert.pfx new file mode 100644 index 0000000..6fbd274 Binary files /dev/null and b/10 Tests/MBcert.pfx differ diff --git a/10 Tests/RozdilnalezenimeziFDBaMySQL.py b/10 Tests/RozdilnalezenimeziFDBaMySQL.py new file mode 100644 index 0000000..018df59 --- /dev/null +++ b/10 Tests/RozdilnalezenimeziFDBaMySQL.py @@ -0,0 +1,67 @@ +import pandas as pd +import fdb +import pymysql + +# --------------------------------- +# FIREBIRD CONNECTION +# --------------------------------- +fb = fdb.connect( + host="192.168.1.4", + database=r"z:\Medicus 3\data\MEDICUS.FDB", + user="SYSDBA", + password="masterkey", + charset="WIN1250" +) +cur = fb.cursor() + +sql_fb = """ +SELECT kar.rodcis +FROM registr +JOIN kar ON registr.idpac = kar.idpac +WHERE registr.datum_zruseni IS NULL + AND registr.priznak IN ('A','D','V') +""" + +cur.execute(sql_fb) +rows_fb = cur.fetchall() + +df_fb = pd.DataFrame(rows_fb, columns=["rc"]) +print("FB count:", len(df_fb)) + +# --------------------------------- +# MYSQL CONNECTION +# --------------------------------- +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4" +) + +sql_mysql = """ +SELECT rc +FROM vzp_stav_pojisteni AS v +WHERE v.k_datu = CURDATE() + AND v.id = ( + SELECT MAX(id) + FROM vzp_stav_pojisteni + WHERE rc = v.rc + AND k_datu = CURDATE() + ); +""" + +df_mysql = pd.read_sql(sql_mysql, mysql) +print("MySQL count:", len(df_mysql)) + +# --------------------------------- +# FIND MISSING RC +# --------------------------------- +df_missing = df_fb[~df_fb["rc"].isin(df_mysql["rc"])] + +print("\nMissing patients:") +print(df_missing) + +fb.close() +mysql.close() diff --git a/10 Tests/medicus_db.py b/10 Tests/medicus_db.py new file mode 100644 index 0000000..b8de965 --- /dev/null +++ b/10 Tests/medicus_db.py @@ -0,0 +1,42 @@ +import fdb + + +class MedicusDB: + + def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"): + self.conn = fdb.connect( + host=host, + database=db_path, + user=user, + password=password, + charset=charset + ) + self.cur = self.conn.cursor() + + def query(self, sql, params=None): + self.cur.execute(sql, params or ()) + return self.cur.fetchall() + + def query_dict(self, sql, params=None): + self.cur.execute(sql, params or ()) + cols = [d[0].strip().lower() for d in self.cur.description] + return [dict(zip(cols, row)) for row in self.cur.fetchall()] + + def get_active_registered_patients(self): + sql = """ + SELECT + kar.rodcis, + kar.prijmeni, + kar.jmeno, + kar.poj + FROM registr + JOIN kar ON registr.idpac = kar.idpac + WHERE registr.datum_zruseni IS NULL + AND registr.priznak IN ('A','D','V') + AND kar.rodcis IS NOT NULL + AND kar.rodcis <> '' + """ + return self.query(sql) # or self.query_dict(sql) + + def close(self): + self.conn.close() diff --git a/10 Tests/rozdíl.py b/10 Tests/rozdíl.py new file mode 100644 index 0000000..0db0af4 --- /dev/null +++ b/10 Tests/rozdíl.py @@ -0,0 +1,79 @@ +import pandas as pd +import pymysql +from medicus_db import MedicusDB +import fdb + + +# FULL OUTPUT SETTINGS +pd.set_option("display.max_rows", None) +pd.set_option("display.max_columns", None) +pd.set_option("display.width", 0) +pd.set_option("display.max_colwidth", None) + + +# =========== +# +# =========================== +# FIREBIRD → načtení registrovaných pacientů +# ====================================== +db = MedicusDB("192.168.1.4", r"z:\Medicus 3\data\MEDICUS.FDB") +rows_fb = db.get_active_registered_patients() # vrací rc, prijmeni, jmeno, poj +db.close() + +df_fb = pd.DataFrame(rows_fb, columns=["rc", "prijmeni", "jmeno", "poj_medicus"]) +df_fb["poj_medicus"] = df_fb["poj_medicus"].astype(str).str.strip() + +print("FB count:", len(df_fb)) + + +# ====================================== +# MYSQL → načtení dnešních výsledků +# ====================================== +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4" +) + +sql_mysql = """ +SELECT rc, + kod_pojistovny AS poj_mysql, + nazev_pojistovny, + stav, + stav_vyrizeni +FROM vzp_stav_pojisteni AS v +WHERE v.k_datu = CURDATE() + AND v.id = ( + SELECT MAX(id) + FROM vzp_stav_pojisteni + WHERE rc = v.rc + AND k_datu = CURDATE() + ); +""" + +df_mysql = pd.read_sql(sql_mysql, mysql) +df_mysql["poj_mysql"] = df_mysql["poj_mysql"].astype(str).str.strip() + +print("MySQL count:", len(df_mysql)) + + +# ====================================== +# LEFT JOIN: Medicus ↔ MySQL podle RC +# ====================================== +df_merge = df_fb.merge(df_mysql, on="rc", how="left") + + +# ====================================== +# Najít rozdíly pojišťovny +# ====================================== +df_diff = df_merge[df_merge["poj_medicus"] != df_merge["poj_mysql"]] + +print("\nPacienti s rozdílnou pojišťovnou:") +print(df_diff[["rc", "prijmeni", "jmeno", "poj_medicus", "poj_mysql", "nazev_pojistovny"]]) + +# Pokud chceš uložit do Excelu: +# df_diff.to_excel("rozdil_pojistoven.xlsx", index=False) + diff --git a/10 Tests/test kontrola registrovaných.py b/10 Tests/test kontrola registrovaných.py new file mode 100644 index 0000000..ff7382e --- /dev/null +++ b/10 Tests/test kontrola registrovaných.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import logging +from medicus_db import MedicusDB +from vzpb2b_client import VZPB2BClient +import pymysql +from datetime import date + +# ========================================== +# LOGGING SETUP +# ========================================== +logging.basicConfig( + filename="insurance_check.log", + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + encoding="utf-8" +) + +console = logging.getLogger("console") +console.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter("%(message)s")) +console.addHandler(handler) + +def log_info(msg): + logging.info(msg) + console.info(msg) + +def log_error(msg): + logging.error(msg) + console.error(msg) + +# ========================================== +# MYSQL CONNECTION +# ========================================== +mysql = pymysql.connect( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + charset="utf8mb4", + autocommit=True +) + +def save_insurance_status(mysql_conn, rc, k_datu, result, xml_text): + sql = """ + INSERT INTO vzp_stav_pojisteni + (rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, + pojisteni_kod, stav_vyrizeni, response_xml) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """ + try: + with mysql_conn.cursor() as cur: + cur.execute(sql, ( + rc, + k_datu, + result["stav"], + result["kodPojistovny"], + result["nazevPojistovny"], + result["pojisteniKod"], + result["stavVyrizeni"], + xml_text + )) + except Exception as e: + log_error(f"❌ MYSQL ERROR for RC {rc}: {e}") + log_error(f"---- RAW XML ----\n{xml_text}\n-----------------") + raise + + +# ========================================== +# CONFIGURATION +# ========================================== +HOST = "192.168.1.4" +DB_PATH = r"z:\Medicus 3\data\MEDICUS.FDB" + +PFX_PATH = r"MBcert.pfx" +PFX_PASSWORD = "Vlado7309208104++" + +ENV = "prod" +ICZ = "00000000" +DIC = "00000000" + +# ========================================== +# INIT CONNECTIONS +# ========================================== +db = MedicusDB(HOST, DB_PATH) +vzp = VZPB2BClient(ENV, PFX_PATH, PFX_PASSWORD, icz=ICZ, dic=DIC) + +# ========================================== +# FETCH REGISTERED PATIENTS +# ========================================== +patients = db.get_active_registered_patients() +log_info(f"Checking {len(patients)} registered patients...\n") + +k_datu = date.today().isoformat() + +# ========================================== +# LOOP ONE PATIENT PER SECOND +# ========================================== +for rodcis, prijmeni, jmeno, poj in patients: + + log_info(f"=== Checking {prijmeni} {jmeno} ({rodcis}) ===") + + xml = vzp.stav_pojisteni(rc=rodcis, k_datu=k_datu) + + # 1) Check if response looks like XML + if not xml.strip().startswith("<"): + log_error(f"❌ INVALID XML for RC {rodcis}") + log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------") + time.sleep(2) + continue + + # 2) Try parsing XML + try: + result = vzp.parse_stav_pojisteni(xml) + except Exception as e: + log_error(f"❌ XML PARSE ERROR for RC {rodcis}: {e}") + log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------") + time.sleep(2) + continue + + log_info(f"Result: {result}") + + # 3) Save into MySQL (with logging) + try: + save_insurance_status(mysql, rodcis, k_datu, result, xml) + except Exception: + log_error(f"❌ FAILURE inserting to MySQL for {rodcis}") + continue + + time.sleep(2) + +db.close() +log_info("\nDONE.") diff --git a/10 Tests/test stavpojisteni.py b/10 Tests/test stavpojisteni.py new file mode 100644 index 0000000..9d3a323 --- /dev/null +++ b/10 Tests/test stavpojisteni.py @@ -0,0 +1,19 @@ +cur.execute("select rodcis,prijmeni,jmeno from kar where datum_zruseni is null and kar.vyrazen!='A' and kar.rodcis is not null and idicp!=0 order by ockzaz.datum desc") + + + + + +from vzpb2b_client import VZPB2BClient + +client = VZPB2BClient( + env="production", + pfx_path="mbcert.pfx", + pfx_password="Vlado7309208104++", + icz="00000000", + dic="00000000" +) + +response = client.stav_pojisteni("0308020152") +# print(response) +print(client.parse_stav_pojisteni(response)) \ No newline at end of file diff --git a/10 Tests/test.py b/10 Tests/test.py new file mode 100644 index 0000000..70f2d92 --- /dev/null +++ b/10 Tests/test.py @@ -0,0 +1,12 @@ +from vzpb2b_client import VZPB2BClient + +client = VZPB2BClient( + env="simu", # or "prod" + pfx_path="mbcert.pfx", + pfx_password="Vlado7309208104++", + icz="00000000", + dic="00000000" +) + +result_xml = client.over_prukaz_pojistence("80203111194350000001") +print(result_xml) diff --git a/10 Tests/vzpb2b_client.py b/10 Tests/vzpb2b_client.py new file mode 100644 index 0000000..a8ea67f --- /dev/null +++ b/10 Tests/vzpb2b_client.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from requests_pkcs12 import Pkcs12Adapter +import requests +import uuid +from datetime import date + + + +class VZPB2BClient: + def __init__(self, env: str, pfx_path: str, pfx_password: str, + icz: str = "00000000", dic: str = "00000000"): + + # Normalize environment name + env = env.lower().strip() + + if env in ("prod", "production", "live", "real"): + self.env = "prod" + elif env in ("simu", "simulace", "test", "testing"): + self.env = "simu" + else: + raise ValueError(f"Unknown environment '{env}'. Use 'simu' or 'prod'.") + + self.pfx_path = pfx_path + self.pfx_password = pfx_password + self.icz = icz + self.dic = dic + + # Prepare mTLS session + session = requests.Session() + session.mount( + "https://", + Pkcs12Adapter(pkcs12_filename=pfx_path, pkcs12_password=pfx_password) + ) + self.session = session + + # -------------------------------------------------------------- + # URL BUILDER + # -------------------------------------------------------------- + def _build_endpoint(self, service_name: str) -> str: + """ + SIMU: + https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMU?sluzba=SIMU + PROD: + https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/ + """ + + if self.env == "simu": + simu_service = f"SIMU{service_name}" + return ( + f"https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/" + f"{simu_service}?sluzba={simu_service}" + ) + + # Production + return ( + f"https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/{service_name}" + ) + + # -------------------------------------------------------------- + # SOAP HEADER BUILDER + # -------------------------------------------------------------- + def _header(self) -> str: + idZpravy = uuid.uuid4().hex[:12] # must be alphanumeric, max 12 chars + return f""" + {idZpravy} + + {self.icz} + {self.dic} + + """ + + # -------------------------------------------------------------- + # OVERPRUKAZ — EHIC CHECK + # -------------------------------------------------------------- + def over_prukaz_pojistence(self, cislo_prukazu: str, k_datu: str = None) -> str: + """ + Calls OverPrukazPojistenceB2B (SIMU or PROD depending on env). + Returns raw XML string. + """ + + service = "OverPrukazPojistenceB2B" + endpoint = self._build_endpoint(service) + + if not k_datu: + k_datu = date.today().isoformat() + + soap = f""" + + + + {self._header()} + + + + + {cislo_prukazu} + {k_datu} + + + + +""" + + headers = {"Content-Type": "text/xml; charset=utf-8"} + + print(f"Calling: {endpoint}") + response = self.session.post( + endpoint, + data=soap.encode("utf-8"), + headers=headers, + timeout=30 + ) + print("HTTP:", response.status_code) + return response.text + + def stav_pojisteni(self, rc: str, k_datu: str = None, prijmeni: str = None): + """ + Calls stavPojisteniB2B (SIMU or PROD). + """ + service = "stavPojisteniB2B" + endpoint = self._build_endpoint(service) + + if not k_datu: + k_datu = date.today().isoformat() + + prijmeni_xml = f"{prijmeni}" if prijmeni else "" + + soap = f""" + + + + {self._header()} + + + + + {rc} + {prijmeni_xml} + {k_datu} + + + + +""" + + headers = { + "Content-Type": "text/xml; charset=utf-8", + "SOAPAction": "process" + } + + print(f"Calling: {endpoint}") + resp = self.session.post(endpoint, data=soap.encode("utf-8"), + headers=headers, timeout=30) + print("HTTP:", resp.status_code) + return resp.text + + def parse_stav_pojisteni(self, xml_text: str): + """ + Parses stavPojisteniB2B SOAP response into a Python dict. + + Returned structure: + { + "stavVyrizeni": int, + "stav": str | None, + "kodPojistovny": str | None, + "nazevPojistovny": str | None, + "pojisteniKod": str | None + } + """ + + import xml.etree.ElementTree as ET + + NS = { + "soap": "http://schemas.xmlsoap.org/soap/envelope/", + "vzp": "http://xmlns.gemsystem.cz/stavPojisteniB2B" + } + + root = ET.fromstring(xml_text) + + # ---- Extract status ---- + stav_vyr = root.find(".//vzp:stavVyrizeniPozadavku", NS) + stav_vyr = int(stav_vyr.text.strip()) if stav_vyr is not None else None + + # ---- If no stavPojisteni element present (e.g. 0 or some errors) ---- + node_stav = root.find(".//vzp:stavPojisteni", NS) + if node_stav is None: + return { + "stavVyrizeni": stav_vyr, + "stav": None, + "kodPojistovny": None, + "nazevPojistovny": None, + "pojisteniKod": None, + } + + def get(tag): + el = node_stav.find(f"vzp:{tag}", NS) + return el.text.strip() if el is not None and el.text else None + + return { + "stavVyrizeni": stav_vyr, + "stav": get("stav"), + "kodPojistovny": get("kodPojistovny"), + "nazevPojistovny": get("nazevPojistovny"), + "pojisteniKod": get("pojisteniKod"), + } diff --git a/File3.py b/File3.py new file mode 100644 index 0000000..5b772f2 --- /dev/null +++ b/File3.py @@ -0,0 +1,24 @@ +from functions import check_insurance +import time +import fdb + + +# MEDICUS local CFG (table already created as per previous DDL) +MEDICUS_CFG = dict( + dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb", + user="SYSDBA", + password="masterkey", + charset="win1250" +) +conn=fdb.connect(**MEDICUS_CFG) + +cur=conn.cursor() +cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '8'") + +rows=cur.fetchall() +print(len(rows)) + +for row in rows: + print(row[0], row[1],row[2]) + print(check_insurance(row[0])) + time.sleep(.25) \ No newline at end of file diff --git a/MBcert.pfx b/MBcert.pfx new file mode 100644 index 0000000..6fbd274 Binary files /dev/null and b/MBcert.pfx differ diff --git a/functions.py b/functions.py new file mode 100644 index 0000000..6d72756 --- /dev/null +++ b/functions.py @@ -0,0 +1,194 @@ +# functions.py +# pip install requests_pkcs12 pymysql +from __future__ import annotations + +import os +from datetime import date +from pprint import pprint +import xml.etree.ElementTree as ET + +import requests +from requests_pkcs12 import Pkcs12Adapter +import pymysql +from pymysql.cursors import DictCursor +import fdb + + +import socket +def get_medicus_connection(): + """ + Attempt to create a Firebird connection to the Medicus database. + Returns: + fdb.Connection object on success + None on failure + """ + if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"): + MEDICUS_CFG = dict( + dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb", + user="SYSDBA", + password="masterkey", + charset="win1250", + ) + elif socket.gethostname().strip()=="SESTRA": + MEDICUS_CFG = dict( + dsn=r"192.168.1.10:m:\medicus\data\medicus.fdb", + user="SYSDBA", + password="masterkey", + charset="win1250", + ) + + try: + return fdb.connect(**MEDICUS_CFG) + except fdb.fbcore.DatabaseError as e: + print(f"Medicus DB connection failed: {e}") + return None + + + + +def get_mysql_connection(): + """ + Return a PyMySQL connection or None if the connection fails. + """ + if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"): + MYSQL_CFG = dict( + host="192.168.1.76", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) + elif socket.gethostname().strip() == "SESTRA": + MYSQL_CFG = dict( + host="127.0.0.1", + port=3307, + user="root", + password="Vlado9674+", + database="medevio", + cursorclass=DictCursor, + autocommit=True, # or False if you prefer manual commit + ) + try: + return pymysql.connect(**MYSQL_CFG) + except pymysql.MySQLError as e: + print(f"MySQL connection failed: {e}") + return None + +# ======== CONFIG (env overrides allowed) ======== +PFX_PATH = os.getenv("VZP_PFX_PATH", r"mbcert.pfx") +PFX_PASS = os.getenv("VZP_PFX_PASS", "Vlado7309208104++") +VERIFY = os.getenv("VZP_CA_VERIFY", "true").lower() != "false" # set to path or "false" to disable + +EP_STAV = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/stavPojisteniB2B" +SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/" +NS_STAV = "http://xmlns.gemsystem.cz/stavPojisteniB2B" + + + +# ======== HELPERS ======== +def normalize_rc(rc: str) -> str: + """Return rodné číslo without slash/spaces.""" + return rc.replace("/", "").replace(" ", "").strip() + +def to_int_or_none(val): + if val is None: + return None + s = str(val).strip() + return int(s) if s.isdigit() else None + +def _session(): + s = requests.Session() + s.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) + return s + +def post_soap(endpoint: str, inner_xml: str) -> str: + """Wrap body in SOAP 1.1 envelope and POST with mTLS.""" + envelope = f''' + + {inner_xml} +''' + r = _session().post( + endpoint, + data=envelope.encode("utf-8"), + headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": "process"}, + timeout=30, + verify=VERIFY, + ) + # You may log r.status_code if needed. + return r.text + +def parse_stav(xml_text: str) -> dict: + """Parse StavPojisteniB2B response → dict with keys: + 'stav','kodPojistovny','nazevPojistovny','pojisteniKod','stavVyrizeniPozadavku'.""" + NS = {"soap": SOAP_NS, "s": NS_STAV} + root = ET.fromstring(xml_text) + out = {"stav": None, "kodPojistovny": None, "nazevPojistovny": None, + "pojisteniKod": None, "stavVyrizeniPozadavku": None} + node = root.find(".//s:stavPojisteni", NS) + if node is not None: + for tag in ("stav", "kodPojistovny", "nazevPojistovny", "pojisteniKod"): + el = node.find(f"s:{tag}", NS) + out[tag] = el.text.strip() if (el is not None and el.text) else None + st = root.find(".//s:stavVyrizeniPozadavku", NS) + out["stavVyrizeniPozadavku"] = st.text.strip() if (st is not None and st.text) else None + return out + +def save_stav_pojisteni(rc: str, k_datu: str, parsed: dict, response_xml: str) -> None: + """Insert one log row into medevio.vzp_stav_pojisteni and print what is being saved.""" + payload = { + "rc": normalize_rc(rc), + "k_datu": k_datu, + "stav": to_int_or_none(parsed.get("stav")), # 'X' -> None + "kod_pojistovny": parsed.get("kodPojistovny"), + "nazev_pojistovny": parsed.get("nazevPojistovny"), + "pojisteni_kod": parsed.get("pojisteniKod"), + "stav_vyrizeni": to_int_or_none(parsed.get("stavVyrizeniPozadavku")), + "response_xml": response_xml, + } + + print("\n=== vzp_stav_pojisteni: will insert ===") + pprint(payload) + + sql = """ + INSERT INTO vzp_stav_pojisteni + (rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, pojisteni_kod, stav_vyrizeni, response_xml) + VALUES + (%(rc)s, %(k_datu)s, %(stav)s, %(kod_pojistovny)s, %(nazev_pojistovny)s, %(pojisteni_kod)s, %(stav_vyrizeni)s, %(response_xml)s) + """ + conn = get_mysql_connection() + try: + with conn.cursor() as cur: + cur.execute(sql, payload) + conn.commit() + finally: + conn.close() + print("Inserted 1 row into vzp_stav_pojisteni.") + +# ======== PUBLIC API ======== +def check_insurance(rc: str, mode: str | None = None, k_datu: str | None = None): + """ + Call VZP 'StavPojisteniB2B' for given RC and date. + Returns (is_active, info_dict). + - is_active: True iff info_dict['stav'] == '1' + - info_dict: {'stav','kodPojistovny','nazevPojistovny','pojisteniKod','stavVyrizeniPozadavku'} + If mode == 'nodb' (case-insensitive), DOES NOT write to DB. Otherwise logs into medevio.vzp_stav_pojisteni. + """ + rc_norm = normalize_rc(rc) + kd = k_datu or date.today().isoformat() + + body = f""" + + {rc_norm} + {kd} +""".strip() + + xml = post_soap(EP_STAV, body) + info = parse_stav(xml) + is_active = (info.get("stav") == "1") + + if not (mode and mode.lower() == "nodb"): + save_stav_pojisteni(rc_norm, kd, info, xml) + + return is_active, info diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..282c772 Binary files /dev/null and b/requirements.txt differ