diff --git a/01 testik.py b/01 testik.py deleted file mode 100644 index 8c333e1..0000000 --- a/01 testik.py +++ /dev/null @@ -1,132 +0,0 @@ -# pip install requests_pkcs12 pymysql -from requests_pkcs12 import Pkcs12Adapter -import requests -import xml.etree.ElementTree as ET -from datetime import date -import pymysql -from pymysql.cursors import DictCursor -from pprint import pprint - -# ========== CONFIG ========== -PFX_PATH = r"mbcert.pfx" -PFX_PASS = "Vlado7309208104++" -VERIFY = True - -EP_STAV = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/stavPojisteniB2B" -SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/" -NS_STAV = "http://xmlns.gemsystem.cz/stavPojisteniB2B" - -# RC provided by you (can contain slash; we normalize before sending/saving) -RC = "7309208104" -# RC = "280616/091" -K_DATU = date.today().isoformat() - -# MySQL (table already created as per previous DDL) -MYSQL_CFG = dict( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=False, -) - -# ========== HELPERS ========== -def to_int_or_none(val): - if val is None: - return None - s = str(val).strip() - return int(s) if s.isdigit() else None - -def normalize_rc(rc: str) -> str: - return rc.replace("/", "").strip() - -def post_soap(endpoint: str, body_xml: str) -> str: - env = f''' - - {body_xml} -''' - s = requests.Session() - s.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) - r = s.post( - endpoint, - data=env.encode("utf-8"), - headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": "process"}, - timeout=30, - verify=VERIFY, - ) - print("HTTP:", r.status_code) - return r.text - -def parse_stav(xml_text: str): - NS = {"soap": SOAP_NS, "s": NS_STAV} - root = ET.fromstring(xml_text) - out = { - "stav": None, - "kodPojistovny": None, - "nazevPojistovny": None, - "pojisteniKod": None, - "stavVyrizeniPozadavku": None, - } - node = root.find(".//s:stavPojisteni", NS) - if node is not None: - for tag in ("stav", "kodPojistovny", "nazevPojistovny", "pojisteniKod"): - el = node.find(f"s:{tag}", NS) - out[tag] = el.text.strip() if el is not None and el.text else None - st = root.find(".//s:stavVyrizeniPozadavku", NS) - out["stavVyrizeniPozadavku"] = st.text.strip() if st is not None and st.text else None - return out - -def save_stav_pojisteni(rc: str, k_datu: str, parsed: dict, response_xml: str) -> int: - """ - Insert one log row into medevio.vzp_stav_pojisteni and print what is being saved. - Assumes the table already exists (no DDL/verification here). - """ - payload = { - "rc": normalize_rc(rc), - "k_datu": k_datu, - "stav": to_int_or_none(parsed.get("stav")), # handles 'X' -> None - "kod_pojistovny": parsed.get("kodPojistovny"), - "nazev_pojistovny": parsed.get("nazevPojistovny"), - "pojisteni_kod": parsed.get("pojisteniKod"), - "stav_vyrizeni": to_int_or_none(parsed.get("stavVyrizeniPozadavku")), - "response_xml": response_xml, - } - - print("\n=== vzp_stav_pojisteni: will insert ===") - pprint(payload) - - sql = """ - INSERT INTO vzp_stav_pojisteni - (rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, pojisteni_kod, stav_vyrizeni, response_xml) - VALUES - (%(rc)s, %(k_datu)s, %(stav)s, %(kod_pojistovny)s, %(nazev_pojistovny)s, %(pojisteni_kod)s, %(stav_vyrizeni)s, %(response_xml)s) - """ - - conn = pymysql.connect(**MYSQL_CFG) - try: - with conn.cursor() as cur: - cur.execute(sql, payload) - conn.commit() - finally: - conn.close() - - print("Inserted 1 row into vzp_stav_pojisteni.") - return 1 - -# ========== MAIN ========== -if __name__ == "__main__": - rc_norm = normalize_rc(RC) - body = f""" - - {rc_norm} - {K_DATU} -""".strip() - - xml = post_soap(EP_STAV, body) - parsed = parse_stav(xml) - print(parsed) # keep your quick print - - # Save to MySQL and print exactly what is saved - save_stav_pojisteni(RC, K_DATU, parsed, xml) diff --git a/02 testík.py b/02 testík.py deleted file mode 100644 index 700b29a..0000000 --- a/02 testík.py +++ /dev/null @@ -1,271 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), -parse the response, upsert rows into medevio.vzp_registrace, -and print what is being saved. -""" - -# pip install requests_pkcs12 pymysql - -from requests_pkcs12 import Pkcs12Adapter -import requests -import xml.etree.ElementTree as ET -from datetime import date -import pymysql -from pymysql.cursors import DictCursor -from pprint import pprint -from functions import get_medicus_connection -from functions import get_mysql_connection -import time, random,socket - -# ------------------- CONFIG ------------------- -ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive -PFX_PATH = r"mbcert.pfx" # <-- your .pfx path -PFX_PASS = "Vlado7309208104++" # <-- your export password -VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" - -# Patient + query -RC = "7309208104" # rodné číslo without slash -# RC = "280616/091" # rodné číslo without slash -K_DATU = date.today().isoformat() # YYYY-MM-DD -ODBORNOSTI = ["001"] # VPL (adult GP) - -# MySQL -if socket.gethostname().strip() == "NTBVBHP470G10": - MYSQL_CFG = dict( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) -elif socket.gethostname().strip() == "SESTRA": - MYSQL_CFG = dict( - host="127.0.0.1", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) - - -# Namespaces (from your response/WSDL) -NS = { - "soap": "http://schemas.xmlsoap.org/soap/envelope/", - "rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1", -} - - -# ------------------- HELPERS ------------------- -def normalize_rc(rc: str) -> str: - return rc.replace("/", "").strip() - -def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str: - odb_xml = "".join([f"{kod}" for kod in odb_list]) - inner = f""" - - {rc} - {k_datu} - - {odb_xml} - -""".strip() - - return f""" - - - {inner} - -""" - -def parse_registrace(xml_text: str): - """ - Return (rows, stav_vyrizeni) where rows is a list of dicts - for each item in the response. - """ - root = ET.fromstring(xml_text) - items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS) - out = [] - for it in items: - def g(tag, ctx=it): - el = ctx.find(f"rp:{tag}", NS) - return el.text.strip() if el is not None and el.text else None - - poj = it.find("rp:zdravotniPojistovna", NS) - poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None - poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None - - spec = it.find("rp:odbornost", NS) - odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None - odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None - - out.append(dict( - icz=g("ICZ"), - icp=g("ICP"), - nazev_icp=g("nazevICP"), - nazev_szz=g("nazevSZZ"), - poj_kod=poj_kod, - poj_zkratka=poj_zkr, - odbornost_kod=odb_kod, - odbornost_nazev=odb_naz, - datum_registrace=g("datumRegistrace"), - datum_zahajeni=g("datumZahajeni"), - datum_ukonceni=g("datumUkonceni"), - )) - - st = root.find(".//rp:stavVyrizeniPozadavku", NS) - stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None - return out, stav_vyrizeni - -def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int: - """ - Insert or update rows into medevio.vzp_registrace and print each payload. - Assumes the table already exists (as per your DDL). - """ - if not rows: - print("No rows found; nothing to save.") - return 0 - - sql = """ -INSERT INTO vzp_registrace - (rc, query_date, odbornost_kod, odbornost_nazev, - icz, icp, nazev_icp, nazev_szz, - poj_kod, poj_zkratka, - datum_registrace, datum_zahajeni, datum_ukonceni, - stav_vyrizeni, response_xml) -VALUES - (%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s, - %(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s, - %(poj_kod)s, %(poj_zkratka)s, - %(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s, - %(stav_vyrizeni)s, %(response_xml)s) -ON DUPLICATE KEY UPDATE - odbornost_nazev = VALUES(odbornost_nazev), - nazev_icp = VALUES(nazev_icp), - nazev_szz = VALUES(nazev_szz), - poj_kod = VALUES(poj_kod), - poj_zkratka = VALUES(poj_zkratka), - datum_registrace= VALUES(datum_registrace), - datum_zahajeni = VALUES(datum_zahajeni), - datum_ukonceni = VALUES(datum_ukonceni), - stav_vyrizeni = VALUES(stav_vyrizeni), - response_xml = VALUES(response_xml), - updated_at = CURRENT_TIMESTAMP -""" - rc_norm = normalize_rc(rc) - qd = query_date or date.today().isoformat() - payloads = [] - for r in rows: - payload = { - "rc": rc_norm, - "query_date": qd, - **r, - "stav_vyrizeni": stav_vyrizeni, - "response_xml": xml_text, - } - payloads.append(payload) - - # Print what we're going to save - print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===") - for p in payloads: - pprint(p) - - conn = pymysql.connect(**MYSQL_CFG) - try: - with conn.cursor() as cur: - cur.executemany(sql, payloads) - conn.commit() - finally: - conn.close() - - print(f"\nUpserted rows: {len(payloads)}") - return len(payloads) - -def prepare_processed_rcs(): - consql=get_mysql_connection() - cursql=consql.cursor() - - sql=""" - WITH ranked AS ( - SELECT - vreg.*, - ROW_NUMBER() OVER ( - PARTITION BY rc - ORDER BY query_date DESC - ) AS rn - FROM vzp_registrace AS vreg - ) - SELECT rc - FROM ranked - WHERE rn = 1 - """ - - cursql.execute(sql) - rows=cursql.fetchall() - print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") - rc_set_vzp = {row["rc"] for row in rows} - return (rc_set_vzp) - -# ------------------- MAIN FLOW ------------------- -def main(): - - con = get_medicus_connection() - cur = con.cursor() - cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '9'") - # cur.execute("select first 2 rodcis, prijmeni, jmeno from kar where rodcis starting with '0'") - - # Vytvor seznam rodnych cisel, která už máme - rc_set_vzp = prepare_processed_rcs() - - rows = cur.fetchall() - print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}") - - for row in rows: - if row[0] in rc_set_vzp: - continue - else: - print(row[0], row[1], row[2]) - K_DATU = date.today().isoformat() # YYYY-MM-DD - ODBORNOSTI = ["001"] - RC=row[0] - - # Build SOAP envelope - envelope = build_envelope(RC, K_DATU, ODBORNOSTI) - - # mTLS session - session = requests.Session() - session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) - - headers = { - "Content-Type": "text/xml; charset=utf-8", - "SOAPAction": "process", # Oracle composite usually expects this - } - - # Call service - resp = session.post(ENDPOINT, data=envelope.encode("utf-8"), - headers=headers, timeout=30, verify=VERIFY) - print("HTTP:", resp.status_code) - - # (Optional) Uncomment to see raw XML - # print(resp.text) - - # Parse and save - rows, stav = parse_registrace(resp.text) - upsert_rows(RC, K_DATU, rows, stav, resp.text) - - time.sleep(random.uniform(1, 5)) - - - - - - - - -if __name__ == "__main__": - main() diff --git a/02.1 Test moje rodne cislo.py b/02.1 Test moje rodne cislo.py deleted file mode 100644 index 4d6269d..0000000 --- a/02.1 Test moje rodne cislo.py +++ /dev/null @@ -1,262 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), -parse the response, upsert rows into medevio.vzp_registrace, -and print what is being saved. -""" - -# pip install requests_pkcs12 pymysql - -from requests_pkcs12 import Pkcs12Adapter -import requests -import xml.etree.ElementTree as ET -from datetime import date -import pymysql -from pymysql.cursors import DictCursor -from pprint import pprint -from functions import get_medicus_connection -from functions import get_mysql_connection -import time, random,socket - -# ------------------- CONFIG ------------------- -ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive -PFX_PATH = r"mbcert.pfx" # <-- your .pfx path -PFX_PASS = "Vlado7309208104++" # <-- your export password -VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" - -# Patient + query -RC = "7309208104" # rodné číslo without slash -# RC = "280616/091" # rodné číslo without slash -K_DATU = date.today().isoformat() # YYYY-MM-DD -ODBORNOSTI = ["001"] # VPL (adult GP) - -# MySQL -if socket.gethostname().strip() == "NTBVBHP470G10": - MYSQL_CFG = dict( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) -elif socket.gethostname().strip() == "SESTRA": - MYSQL_CFG = dict( - host="127.0.0.1", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) - - -# Namespaces (from your response/WSDL) -NS = { - "soap": "http://schemas.xmlsoap.org/soap/envelope/", - "rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1", -} - - -# ------------------- HELPERS ------------------- -def normalize_rc(rc: str) -> str: - return rc.replace("/", "").strip() - - -# Ukázka XML – Požadavek: -# -# 801212855 -# -# 002 -# -# -def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str: - odb_xml = "".join([f"{kod}" for kod in odb_list]) - inner = f""" - - {rc} - {k_datu} - - {odb_xml} - -""".strip() - - return f""" - - - {inner} - -""" - -def parse_registrace(xml_text: str): - """ - Return (rows, stav_vyrizeni) where rows is a list of dicts - for each item in the response. - """ - root = ET.fromstring(xml_text) - items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS) - out = [] - for it in items: - def g(tag, ctx=it): - el = ctx.find(f"rp:{tag}", NS) - return el.text.strip() if el is not None and el.text else None - - poj = it.find("rp:zdravotniPojistovna", NS) - poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None - poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None - - spec = it.find("rp:odbornost", NS) - odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None - odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None - - out.append(dict( - icz=g("ICZ"), - icp=g("ICP"), - nazev_icp=g("nazevICP"), - nazev_szz=g("nazevSZZ"), - poj_kod=poj_kod, - poj_zkratka=poj_zkr, - odbornost_kod=odb_kod, - odbornost_nazev=odb_naz, - datum_registrace=g("datumRegistrace"), - datum_zahajeni=g("datumZahajeni"), - datum_ukonceni=g("datumUkonceni"), - )) - - st = root.find(".//rp:stavVyrizeniPozadavku", NS) - stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None - return out, stav_vyrizeni - -def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int: - """ - Insert or update rows into medevio.vzp_registrace and print each payload. - Assumes the table already exists (as per your DDL). - """ - if not rows: - print("No rows found; nothing to save.") - return 0 - - sql = """ -INSERT INTO vzp_registrace - (rc, query_date, odbornost_kod, odbornost_nazev, - icz, icp, nazev_icp, nazev_szz, - poj_kod, poj_zkratka, - datum_registrace, datum_zahajeni, datum_ukonceni, - stav_vyrizeni, response_xml) -VALUES - (%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s, - %(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s, - %(poj_kod)s, %(poj_zkratka)s, - %(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s, - %(stav_vyrizeni)s, %(response_xml)s) -ON DUPLICATE KEY UPDATE - odbornost_nazev = VALUES(odbornost_nazev), - nazev_icp = VALUES(nazev_icp), - nazev_szz = VALUES(nazev_szz), - poj_kod = VALUES(poj_kod), - poj_zkratka = VALUES(poj_zkratka), - datum_registrace= VALUES(datum_registrace), - datum_zahajeni = VALUES(datum_zahajeni), - datum_ukonceni = VALUES(datum_ukonceni), - stav_vyrizeni = VALUES(stav_vyrizeni), - response_xml = VALUES(response_xml), - updated_at = CURRENT_TIMESTAMP -""" - rc_norm = normalize_rc(rc) - qd = query_date or date.today().isoformat() - payloads = [] - for r in rows: - payload = { - "rc": rc_norm, - "query_date": qd, - **r, - "stav_vyrizeni": stav_vyrizeni, - "response_xml": xml_text, - } - payloads.append(payload) - - # Print what we're going to save - print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===") - for p in payloads: - pprint(p) - - conn = pymysql.connect(**MYSQL_CFG) - try: - with conn.cursor() as cur: - cur.executemany(sql, payloads) - conn.commit() - finally: - conn.close() - - print(f"\nUpserted rows: {len(payloads)}") - return len(payloads) - -def prepare_processed_rcs(): - consql=get_mysql_connection() - cursql=consql.cursor() - - sql=""" - WITH ranked AS ( - SELECT - vreg.*, - ROW_NUMBER() OVER ( - PARTITION BY rc - ORDER BY query_date DESC - ) AS rn - FROM vzp_registrace AS vreg - ) - SELECT rc - FROM ranked - WHERE rn = 1 - """ - - cursql.execute(sql) - rows=cursql.fetchall() - print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") - rc_set_vzp = {row["rc"] for row in rows} - return (rc_set_vzp) - -# ------------------- MAIN FLOW ------------------- -def main(): - K_DATU = date.today().isoformat() # YYYY-MM-DD - ODBORNOSTI = ["001"] - RC='7309208104' - - # Build SOAP envelope - envelope = build_envelope(RC, K_DATU, ODBORNOSTI) - - # mTLS session - session = requests.Session() - session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) - - headers = { - "Content-Type": "text/xml; charset=utf-8", - "SOAPAction": "process", # Oracle composite usually expects this - } - - # Call service - resp = session.post(ENDPOINT, data=envelope.encode("utf-8"), - headers=headers, timeout=30, verify=VERIFY) - print("HTTP:", resp.status_code) - - # (Optional) Uncomment to see raw XML - print(resp.text) - - # Parse and save - rows, stav = parse_registrace(resp.text) - print(rows,stav) - - time.sleep(random.uniform(1, 5)) - - - - - - - - -if __name__ == "__main__": - main() diff --git a/02.2 Testík.py b/02.2 Testík.py deleted file mode 100644 index 51ed961..0000000 --- a/02.2 Testík.py +++ /dev/null @@ -1,308 +0,0 @@ -#kód bude vkládat i řádky pro pacienty bez registrovaného lékař v oboru 001#!/usr/bin/env python3 -*- coding: utf-8 -*- """ Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), parse the response, upsert rows into medevio.vzp_registrace, and print what is being saved. """ # pip install requests_pkcs12 pymysql from requests_pkcs12 import Pkcs12Adapter import requests import xml.etree.ElementTree as ET from datetime import date import pymysql from pymysql.cursors import DictCursor from pprint import pprint from functions import get_medicus_connection from functions import get_mysql_connection import time, random,socket # ------------------- CONFIG ------------------- ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive PFX_PATH = r"mbcert.pfx" # <-- your .pfx path PFX_PASS = "Vlado7309208104++" # <-- your export password VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" Patient + query RC = "7309208104" # rodné číslo without slash RC = "280616/091" # rodné -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), -parse the response, upsert rows into medevio.vzp_registrace, -and print what is being saved. -""" - -# pip install requests_pkcs12 pymysql - -from requests_pkcs12 import Pkcs12Adapter -import requests -import xml.etree.ElementTree as ET -from datetime import date -import pymysql -from pymysql.cursors import DictCursor -from pprint import pprint -from functions import get_medicus_connection -from functions import get_mysql_connection -import time, random,socket - -# ------------------- CONFIG ------------------- -ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive -PFX_PATH = r"mbcert.pfx" # <-- your .pfx path -PFX_PASS = "Vlado7309208104++" # <-- your export password -VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" - -# Patient + query -RC = "7309208104" # rodné číslo without slash -# RC = "280616/091" # rodné číslo without slash -K_DATU = date.today().isoformat() # YYYY-MM-DD -ODBORNOSTI = ["001"] # VPL (adult GP) - -# MySQL -if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"): - MYSQL_CFG = dict( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) -elif socket.gethostname().strip() == "SESTRA": - MYSQL_CFG = dict( - host="127.0.0.1", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) - - -# Namespaces (from your response/WSDL) -NS = { - "soap": "http://schemas.xmlsoap.org/soap/envelope/", - "rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1", -} - - -# ------------------- HELPERS ------------------- -def normalize_rc(rc: str) -> str: - return rc.replace("/", "").strip() - -def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str: - odb_xml = "".join([f"{kod}" for kod in odb_list]) - inner = f""" - - {rc} - {k_datu} - - {odb_xml} - -""".strip() - - return f""" - - - {inner} - -""" - -def parse_registrace(xml_text: str): - """ - Return (rows, stav_vyrizeni) where rows is a list of dicts - for each item in the response. - """ - root = ET.fromstring(xml_text) - items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS) - out = [] - for it in items: - def g(tag, ctx=it): - el = ctx.find(f"rp:{tag}", NS) - return el.text.strip() if el is not None and el.text else None - - poj = it.find("rp:zdravotniPojistovna", NS) - poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None - poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None - - spec = it.find("rp:odbornost", NS) - odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None - odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None - - out.append(dict( - icz=g("ICZ"), - icp=g("ICP"), - nazev_icp=g("nazevICP"), - nazev_szz=g("nazevSZZ"), - poj_kod=poj_kod, - poj_zkratka=poj_zkr, - odbornost_kod=odb_kod, - odbornost_nazev=odb_naz, - datum_registrace=g("datumRegistrace"), - datum_zahajeni=g("datumZahajeni"), - datum_ukonceni=g("datumUkonceni"), - )) - - st = root.find(".//rp:stavVyrizeniPozadavku", NS) - stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None - return out, stav_vyrizeni - -def upsert_rows( - rc: str, - query_date: str, - rows: list[dict], - stav_vyrizeni: str, - xml_text: str, - requested_odb_list: list[str] | None = None, -) -> int: - """ - Insert/update medevio.vzp_registrace. - If no items are returned, insert placeholder row(s) - for each requested specialty (e.g., "001") so the negative result is visible. - """ - sql = """ -INSERT INTO vzp_registrace - (rc, query_date, odbornost_kod, odbornost_nazev, - icz, icp, nazev_icp, nazev_szz, - poj_kod, poj_zkratka, - datum_registrace, datum_zahajeni, datum_ukonceni, - stav_vyrizeni, response_xml) -VALUES - (%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s, - %(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s, - %(poj_kod)s, %(poj_zkratka)s, - %(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s, - %(stav_vyrizeni)s, %(response_xml)s) -ON DUPLICATE KEY UPDATE - odbornost_nazev = VALUES(odbornost_nazev), - nazev_icp = VALUES(nazev_icp), - nazev_szz = VALUES(nazev_szz), - poj_kod = VALUES(poj_kod), - poj_zkratka = VALUES(poj_zkratka), - datum_registrace= VALUES(datum_registrace), - datum_zahajeni = VALUES(datum_zahajeni), - datum_ukonceni = VALUES(datum_ukonceni), - stav_vyrizeni = VALUES(stav_vyrizeni), - response_xml = VALUES(response_xml), - updated_at = CURRENT_TIMESTAMP -""" - rc_norm = normalize_rc(rc) - qd = query_date or date.today().isoformat() - - payloads: list[dict] = [] - - if rows: - # Positive path: save what came from the API - for r in rows: - payloads.append({ - "rc": rc_norm, - "query_date": qd, - **r, - "stav_vyrizeni": stav_vyrizeni, - "response_xml": xml_text, - }) - else: - # Negative path: no registration items -> create placeholders - if not requested_odb_list: - requested_odb_list = ["001"] - for kod in requested_odb_list: - payloads.append({ - "rc": rc_norm, - "query_date": qd, - "odbornost_kod": kod, - "odbornost_nazev": None, - "icz": None, - "icp": None, - "nazev_icp": None, - "nazev_szz": None, - "poj_kod": None, - "poj_zkratka": None, - "datum_registrace": None, - "datum_zahajeni": None, - "datum_ukonceni": None, - # Keep what VZP said (e.g., 'X'), and raw XML for audit - "stav_vyrizeni": stav_vyrizeni, - "response_xml": xml_text, - }) - - # Print what we're going to save - print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===") - for p in payloads: - pprint(p) - - if not payloads: - print("No payloads prepared (this should not happen).") - return 0 - - connsql = get_mysql_connection() - try: - with connsql.cursor() as cur: - cur.executemany(sql, payloads) - connsql.commit() - finally: - connsql.close() - - print(f"\nUpserted rows: {len(payloads)}") - return len(payloads) - - -def prepare_processed_rcs(): - consql=get_mysql_connection() - cursql=consql.cursor() - - sql=""" - WITH ranked AS ( - SELECT - vreg.*, - ROW_NUMBER() OVER ( - PARTITION BY rc - ORDER BY query_date DESC - ) AS rn - FROM vzp_registrace AS vreg - ) - SELECT rc - FROM ranked - WHERE rn = 1 - """ - - cursql.execute(sql) - rows=cursql.fetchall() - print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") - rc_set_vzp = {row["rc"] for row in rows} - return (rc_set_vzp) - -# ------------------- MAIN FLOW ------------------- -def main(): - - con = get_medicus_connection() - cur = con.cursor() - cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '1'") - # cur.execute("select first 2 rodcis, prijmeni, jmeno from kar where rodcis starting with '0'") - - # Vytvor seznam rodnych cisel, která už máme - rc_set_vzp = prepare_processed_rcs() - - rows = cur.fetchall() - print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}") - - for row in rows: - if row[0] in rc_set_vzp: - continue - else: - print(row[0], row[1], row[2]) - K_DATU = date.today().isoformat() # YYYY-MM-DD - ODBORNOSTI = ["001"] - RC=row[0] - - # Build SOAP envelope - envelope = build_envelope(RC, K_DATU, ODBORNOSTI) - - # mTLS session - session = requests.Session() - session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) - - headers = { - "Content-Type": "text/xml; charset=utf-8", - "SOAPAction": "process", # Oracle composite usually expects this - } - print(envelope) - # Call service - resp = session.post(ENDPOINT, data=envelope.encode("utf-8"), - headers=headers, timeout=30, verify=VERIFY) - print("HTTP:", resp.status_code) - - # (Optional) Uncomment to see raw XML - print(resp.text) - - # Parse and save - rows, stav = parse_registrace(resp.text) - print(rows,stav) - upsert_rows(RC, K_DATU, rows, stav, resp.text, requested_odb_list=ODBORNOSTI) - - time.sleep(random.uniform(1, 5)) - - - - - - - - -if __name__ == "__main__": - main() diff --git a/03 VyberKontrolaStavuPojisteni.py b/03 VyberKontrolaStavuPojisteni.py deleted file mode 100644 index 41b6558..0000000 --- a/03 VyberKontrolaStavuPojisteni.py +++ /dev/null @@ -1,46 +0,0 @@ -from functions import get_medicus_connection -from functions import get_mysql_connection -from functions import check_insurance -import time, random - -def prepare_processed_rcs(): - consql=get_mysql_connection() - cursql=consql.cursor() - - sql=""" - WITH ranked AS ( - SELECT - vr.*, - ROW_NUMBER() OVER ( - PARTITION BY rc - ORDER BY k_datu DESC, queried_at DESC - ) AS rn - FROM vzp_stav_pojisteni AS vr - ) - SELECT rc - FROM ranked - WHERE rn = 1 - """ - - cursql.execute(sql) - rows=cursql.fetchall() - print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}") - rc_set_vzp = {row["rc"] for row in rows} - return (rc_set_vzp) - -con=get_medicus_connection() -cur=con.cursor() -cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '3'") - -rc_set_vzp=prepare_processed_rcs() - -rows=cur.fetchall() -print(f"Pocet vybranych radku z tabulky KAR je: {len(rows)}") - -for row in rows: - if row[0] in rc_set_vzp: - continue - else: - print(row[0], row[1],row[2]) - print(check_insurance(row[0])) - time.sleep(random.uniform(1, 5)) diff --git a/04 testik.py b/04 testik.py deleted file mode 100644 index b2d20d6..0000000 --- a/04 testik.py +++ /dev/null @@ -1,7 +0,0 @@ -from functions import check_insurance - -if __name__ == "__main__": - rc = "7309208104" - res = check_insurance(rc) - print("=== RESULT ===") - print(res) \ No newline at end of file diff --git a/05 testik.py b/05 testik.py deleted file mode 100644 index 48cda59..0000000 --- a/05 testik.py +++ /dev/null @@ -1,4 +0,0 @@ -import socket - -computer_name = socket.gethostname() -print(computer_name) \ No newline at end of file diff --git a/10 Tests/MBcert.pfx b/10 Tests/MBcert.pfx deleted file mode 100644 index 6fbd274..0000000 Binary files a/10 Tests/MBcert.pfx and /dev/null differ diff --git a/10 Tests/RozdilnalezenimeziFDBaMySQL.py b/10 Tests/RozdilnalezenimeziFDBaMySQL.py deleted file mode 100644 index 018df59..0000000 --- a/10 Tests/RozdilnalezenimeziFDBaMySQL.py +++ /dev/null @@ -1,67 +0,0 @@ -import pandas as pd -import fdb -import pymysql - -# --------------------------------- -# FIREBIRD CONNECTION -# --------------------------------- -fb = fdb.connect( - host="192.168.1.4", - database=r"z:\Medicus 3\data\MEDICUS.FDB", - user="SYSDBA", - password="masterkey", - charset="WIN1250" -) -cur = fb.cursor() - -sql_fb = """ -SELECT kar.rodcis -FROM registr -JOIN kar ON registr.idpac = kar.idpac -WHERE registr.datum_zruseni IS NULL - AND registr.priznak IN ('A','D','V') -""" - -cur.execute(sql_fb) -rows_fb = cur.fetchall() - -df_fb = pd.DataFrame(rows_fb, columns=["rc"]) -print("FB count:", len(df_fb)) - -# --------------------------------- -# MYSQL CONNECTION -# --------------------------------- -mysql = pymysql.connect( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - charset="utf8mb4" -) - -sql_mysql = """ -SELECT rc -FROM vzp_stav_pojisteni AS v -WHERE v.k_datu = CURDATE() - AND v.id = ( - SELECT MAX(id) - FROM vzp_stav_pojisteni - WHERE rc = v.rc - AND k_datu = CURDATE() - ); -""" - -df_mysql = pd.read_sql(sql_mysql, mysql) -print("MySQL count:", len(df_mysql)) - -# --------------------------------- -# FIND MISSING RC -# --------------------------------- -df_missing = df_fb[~df_fb["rc"].isin(df_mysql["rc"])] - -print("\nMissing patients:") -print(df_missing) - -fb.close() -mysql.close() diff --git a/10 Tests/medicus_db.py b/10 Tests/medicus_db.py deleted file mode 100644 index b8de965..0000000 --- a/10 Tests/medicus_db.py +++ /dev/null @@ -1,42 +0,0 @@ -import fdb - - -class MedicusDB: - - def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"): - self.conn = fdb.connect( - host=host, - database=db_path, - user=user, - password=password, - charset=charset - ) - self.cur = self.conn.cursor() - - def query(self, sql, params=None): - self.cur.execute(sql, params or ()) - return self.cur.fetchall() - - def query_dict(self, sql, params=None): - self.cur.execute(sql, params or ()) - cols = [d[0].strip().lower() for d in self.cur.description] - return [dict(zip(cols, row)) for row in self.cur.fetchall()] - - def get_active_registered_patients(self): - sql = """ - SELECT - kar.rodcis, - kar.prijmeni, - kar.jmeno, - kar.poj - FROM registr - JOIN kar ON registr.idpac = kar.idpac - WHERE registr.datum_zruseni IS NULL - AND registr.priznak IN ('A','D','V') - AND kar.rodcis IS NOT NULL - AND kar.rodcis <> '' - """ - return self.query(sql) # or self.query_dict(sql) - - def close(self): - self.conn.close() diff --git a/10 Tests/rozdíl.py b/10 Tests/rozdíl.py deleted file mode 100644 index 0db0af4..0000000 --- a/10 Tests/rozdíl.py +++ /dev/null @@ -1,79 +0,0 @@ -import pandas as pd -import pymysql -from medicus_db import MedicusDB -import fdb - - -# FULL OUTPUT SETTINGS -pd.set_option("display.max_rows", None) -pd.set_option("display.max_columns", None) -pd.set_option("display.width", 0) -pd.set_option("display.max_colwidth", None) - - -# =========== -# -# =========================== -# FIREBIRD → načtení registrovaných pacientů -# ====================================== -db = MedicusDB("192.168.1.4", r"z:\Medicus 3\data\MEDICUS.FDB") -rows_fb = db.get_active_registered_patients() # vrací rc, prijmeni, jmeno, poj -db.close() - -df_fb = pd.DataFrame(rows_fb, columns=["rc", "prijmeni", "jmeno", "poj_medicus"]) -df_fb["poj_medicus"] = df_fb["poj_medicus"].astype(str).str.strip() - -print("FB count:", len(df_fb)) - - -# ====================================== -# MYSQL → načtení dnešních výsledků -# ====================================== -mysql = pymysql.connect( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - charset="utf8mb4" -) - -sql_mysql = """ -SELECT rc, - kod_pojistovny AS poj_mysql, - nazev_pojistovny, - stav, - stav_vyrizeni -FROM vzp_stav_pojisteni AS v -WHERE v.k_datu = CURDATE() - AND v.id = ( - SELECT MAX(id) - FROM vzp_stav_pojisteni - WHERE rc = v.rc - AND k_datu = CURDATE() - ); -""" - -df_mysql = pd.read_sql(sql_mysql, mysql) -df_mysql["poj_mysql"] = df_mysql["poj_mysql"].astype(str).str.strip() - -print("MySQL count:", len(df_mysql)) - - -# ====================================== -# LEFT JOIN: Medicus ↔ MySQL podle RC -# ====================================== -df_merge = df_fb.merge(df_mysql, on="rc", how="left") - - -# ====================================== -# Najít rozdíly pojišťovny -# ====================================== -df_diff = df_merge[df_merge["poj_medicus"] != df_merge["poj_mysql"]] - -print("\nPacienti s rozdílnou pojišťovnou:") -print(df_diff[["rc", "prijmeni", "jmeno", "poj_medicus", "poj_mysql", "nazev_pojistovny"]]) - -# Pokud chceš uložit do Excelu: -# df_diff.to_excel("rozdil_pojistoven.xlsx", index=False) - diff --git a/10 Tests/test kontrola registrovaných.py b/10 Tests/test kontrola registrovaných.py deleted file mode 100644 index ff7382e..0000000 --- a/10 Tests/test kontrola registrovaných.py +++ /dev/null @@ -1,137 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import time -import logging -from medicus_db import MedicusDB -from vzpb2b_client import VZPB2BClient -import pymysql -from datetime import date - -# ========================================== -# LOGGING SETUP -# ========================================== -logging.basicConfig( - filename="insurance_check.log", - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - encoding="utf-8" -) - -console = logging.getLogger("console") -console.setLevel(logging.INFO) -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter("%(message)s")) -console.addHandler(handler) - -def log_info(msg): - logging.info(msg) - console.info(msg) - -def log_error(msg): - logging.error(msg) - console.error(msg) - -# ========================================== -# MYSQL CONNECTION -# ========================================== -mysql = pymysql.connect( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - charset="utf8mb4", - autocommit=True -) - -def save_insurance_status(mysql_conn, rc, k_datu, result, xml_text): - sql = """ - INSERT INTO vzp_stav_pojisteni - (rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, - pojisteni_kod, stav_vyrizeni, response_xml) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s) - """ - try: - with mysql_conn.cursor() as cur: - cur.execute(sql, ( - rc, - k_datu, - result["stav"], - result["kodPojistovny"], - result["nazevPojistovny"], - result["pojisteniKod"], - result["stavVyrizeni"], - xml_text - )) - except Exception as e: - log_error(f"❌ MYSQL ERROR for RC {rc}: {e}") - log_error(f"---- RAW XML ----\n{xml_text}\n-----------------") - raise - - -# ========================================== -# CONFIGURATION -# ========================================== -HOST = "192.168.1.4" -DB_PATH = r"z:\Medicus 3\data\MEDICUS.FDB" - -PFX_PATH = r"MBcert.pfx" -PFX_PASSWORD = "Vlado7309208104++" - -ENV = "prod" -ICZ = "00000000" -DIC = "00000000" - -# ========================================== -# INIT CONNECTIONS -# ========================================== -db = MedicusDB(HOST, DB_PATH) -vzp = VZPB2BClient(ENV, PFX_PATH, PFX_PASSWORD, icz=ICZ, dic=DIC) - -# ========================================== -# FETCH REGISTERED PATIENTS -# ========================================== -patients = db.get_active_registered_patients() -log_info(f"Checking {len(patients)} registered patients...\n") - -k_datu = date.today().isoformat() - -# ========================================== -# LOOP ONE PATIENT PER SECOND -# ========================================== -for rodcis, prijmeni, jmeno, poj in patients: - - log_info(f"=== Checking {prijmeni} {jmeno} ({rodcis}) ===") - - xml = vzp.stav_pojisteni(rc=rodcis, k_datu=k_datu) - - # 1) Check if response looks like XML - if not xml.strip().startswith("<"): - log_error(f"❌ INVALID XML for RC {rodcis}") - log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------") - time.sleep(2) - continue - - # 2) Try parsing XML - try: - result = vzp.parse_stav_pojisteni(xml) - except Exception as e: - log_error(f"❌ XML PARSE ERROR for RC {rodcis}: {e}") - log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------") - time.sleep(2) - continue - - log_info(f"Result: {result}") - - # 3) Save into MySQL (with logging) - try: - save_insurance_status(mysql, rodcis, k_datu, result, xml) - except Exception: - log_error(f"❌ FAILURE inserting to MySQL for {rodcis}") - continue - - time.sleep(2) - -db.close() -log_info("\nDONE.") diff --git a/10 Tests/test stavpojisteni.py b/10 Tests/test stavpojisteni.py deleted file mode 100644 index 9d3a323..0000000 --- a/10 Tests/test stavpojisteni.py +++ /dev/null @@ -1,19 +0,0 @@ -cur.execute("select rodcis,prijmeni,jmeno from kar where datum_zruseni is null and kar.vyrazen!='A' and kar.rodcis is not null and idicp!=0 order by ockzaz.datum desc") - - - - - -from vzpb2b_client import VZPB2BClient - -client = VZPB2BClient( - env="production", - pfx_path="mbcert.pfx", - pfx_password="Vlado7309208104++", - icz="00000000", - dic="00000000" -) - -response = client.stav_pojisteni("0308020152") -# print(response) -print(client.parse_stav_pojisteni(response)) \ No newline at end of file diff --git a/10 Tests/test.py b/10 Tests/test.py deleted file mode 100644 index 70f2d92..0000000 --- a/10 Tests/test.py +++ /dev/null @@ -1,12 +0,0 @@ -from vzpb2b_client import VZPB2BClient - -client = VZPB2BClient( - env="simu", # or "prod" - pfx_path="mbcert.pfx", - pfx_password="Vlado7309208104++", - icz="00000000", - dic="00000000" -) - -result_xml = client.over_prukaz_pojistence("80203111194350000001") -print(result_xml) diff --git a/10 Tests/vzpb2b_client.py b/10 Tests/vzpb2b_client.py deleted file mode 100644 index a8ea67f..0000000 --- a/10 Tests/vzpb2b_client.py +++ /dev/null @@ -1,213 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -from requests_pkcs12 import Pkcs12Adapter -import requests -import uuid -from datetime import date - - - -class VZPB2BClient: - def __init__(self, env: str, pfx_path: str, pfx_password: str, - icz: str = "00000000", dic: str = "00000000"): - - # Normalize environment name - env = env.lower().strip() - - if env in ("prod", "production", "live", "real"): - self.env = "prod" - elif env in ("simu", "simulace", "test", "testing"): - self.env = "simu" - else: - raise ValueError(f"Unknown environment '{env}'. Use 'simu' or 'prod'.") - - self.pfx_path = pfx_path - self.pfx_password = pfx_password - self.icz = icz - self.dic = dic - - # Prepare mTLS session - session = requests.Session() - session.mount( - "https://", - Pkcs12Adapter(pkcs12_filename=pfx_path, pkcs12_password=pfx_password) - ) - self.session = session - - # -------------------------------------------------------------- - # URL BUILDER - # -------------------------------------------------------------- - def _build_endpoint(self, service_name: str) -> str: - """ - SIMU: - https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMU?sluzba=SIMU - PROD: - https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/ - """ - - if self.env == "simu": - simu_service = f"SIMU{service_name}" - return ( - f"https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/" - f"{simu_service}?sluzba={simu_service}" - ) - - # Production - return ( - f"https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/{service_name}" - ) - - # -------------------------------------------------------------- - # SOAP HEADER BUILDER - # -------------------------------------------------------------- - def _header(self) -> str: - idZpravy = uuid.uuid4().hex[:12] # must be alphanumeric, max 12 chars - return f""" - {idZpravy} - - {self.icz} - {self.dic} - - """ - - # -------------------------------------------------------------- - # OVERPRUKAZ — EHIC CHECK - # -------------------------------------------------------------- - def over_prukaz_pojistence(self, cislo_prukazu: str, k_datu: str = None) -> str: - """ - Calls OverPrukazPojistenceB2B (SIMU or PROD depending on env). - Returns raw XML string. - """ - - service = "OverPrukazPojistenceB2B" - endpoint = self._build_endpoint(service) - - if not k_datu: - k_datu = date.today().isoformat() - - soap = f""" - - - - {self._header()} - - - - - {cislo_prukazu} - {k_datu} - - - - -""" - - headers = {"Content-Type": "text/xml; charset=utf-8"} - - print(f"Calling: {endpoint}") - response = self.session.post( - endpoint, - data=soap.encode("utf-8"), - headers=headers, - timeout=30 - ) - print("HTTP:", response.status_code) - return response.text - - def stav_pojisteni(self, rc: str, k_datu: str = None, prijmeni: str = None): - """ - Calls stavPojisteniB2B (SIMU or PROD). - """ - service = "stavPojisteniB2B" - endpoint = self._build_endpoint(service) - - if not k_datu: - k_datu = date.today().isoformat() - - prijmeni_xml = f"{prijmeni}" if prijmeni else "" - - soap = f""" - - - - {self._header()} - - - - - {rc} - {prijmeni_xml} - {k_datu} - - - - -""" - - headers = { - "Content-Type": "text/xml; charset=utf-8", - "SOAPAction": "process" - } - - print(f"Calling: {endpoint}") - resp = self.session.post(endpoint, data=soap.encode("utf-8"), - headers=headers, timeout=30) - print("HTTP:", resp.status_code) - return resp.text - - def parse_stav_pojisteni(self, xml_text: str): - """ - Parses stavPojisteniB2B SOAP response into a Python dict. - - Returned structure: - { - "stavVyrizeni": int, - "stav": str | None, - "kodPojistovny": str | None, - "nazevPojistovny": str | None, - "pojisteniKod": str | None - } - """ - - import xml.etree.ElementTree as ET - - NS = { - "soap": "http://schemas.xmlsoap.org/soap/envelope/", - "vzp": "http://xmlns.gemsystem.cz/stavPojisteniB2B" - } - - root = ET.fromstring(xml_text) - - # ---- Extract status ---- - stav_vyr = root.find(".//vzp:stavVyrizeniPozadavku", NS) - stav_vyr = int(stav_vyr.text.strip()) if stav_vyr is not None else None - - # ---- If no stavPojisteni element present (e.g. 0 or some errors) ---- - node_stav = root.find(".//vzp:stavPojisteni", NS) - if node_stav is None: - return { - "stavVyrizeni": stav_vyr, - "stav": None, - "kodPojistovny": None, - "nazevPojistovny": None, - "pojisteniKod": None, - } - - def get(tag): - el = node_stav.find(f"vzp:{tag}", NS) - return el.text.strip() if el is not None and el.text else None - - return { - "stavVyrizeni": stav_vyr, - "stav": get("stav"), - "kodPojistovny": get("kodPojistovny"), - "nazevPojistovny": get("nazevPojistovny"), - "pojisteniKod": get("pojisteniKod"), - } diff --git a/File3.py b/File3.py deleted file mode 100644 index 5b772f2..0000000 --- a/File3.py +++ /dev/null @@ -1,24 +0,0 @@ -from functions import check_insurance -import time -import fdb - - -# MEDICUS local CFG (table already created as per previous DDL) -MEDICUS_CFG = dict( - dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb", - user="SYSDBA", - password="masterkey", - charset="win1250" -) -conn=fdb.connect(**MEDICUS_CFG) - -cur=conn.cursor() -cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '8'") - -rows=cur.fetchall() -print(len(rows)) - -for row in rows: - print(row[0], row[1],row[2]) - print(check_insurance(row[0])) - time.sleep(.25) \ No newline at end of file diff --git a/MBcert.pfx b/MBcert.pfx deleted file mode 100644 index 6fbd274..0000000 Binary files a/MBcert.pfx and /dev/null differ diff --git a/functions.py b/functions.py deleted file mode 100644 index 6d72756..0000000 --- a/functions.py +++ /dev/null @@ -1,194 +0,0 @@ -# functions.py -# pip install requests_pkcs12 pymysql -from __future__ import annotations - -import os -from datetime import date -from pprint import pprint -import xml.etree.ElementTree as ET - -import requests -from requests_pkcs12 import Pkcs12Adapter -import pymysql -from pymysql.cursors import DictCursor -import fdb - - -import socket -def get_medicus_connection(): - """ - Attempt to create a Firebird connection to the Medicus database. - Returns: - fdb.Connection object on success - None on failure - """ - if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"): - MEDICUS_CFG = dict( - dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb", - user="SYSDBA", - password="masterkey", - charset="win1250", - ) - elif socket.gethostname().strip()=="SESTRA": - MEDICUS_CFG = dict( - dsn=r"192.168.1.10:m:\medicus\data\medicus.fdb", - user="SYSDBA", - password="masterkey", - charset="win1250", - ) - - try: - return fdb.connect(**MEDICUS_CFG) - except fdb.fbcore.DatabaseError as e: - print(f"Medicus DB connection failed: {e}") - return None - - - - -def get_mysql_connection(): - """ - Return a PyMySQL connection or None if the connection fails. - """ - if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"): - MYSQL_CFG = dict( - host="192.168.1.76", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) - elif socket.gethostname().strip() == "SESTRA": - MYSQL_CFG = dict( - host="127.0.0.1", - port=3307, - user="root", - password="Vlado9674+", - database="medevio", - cursorclass=DictCursor, - autocommit=True, # or False if you prefer manual commit - ) - try: - return pymysql.connect(**MYSQL_CFG) - except pymysql.MySQLError as e: - print(f"MySQL connection failed: {e}") - return None - -# ======== CONFIG (env overrides allowed) ======== -PFX_PATH = os.getenv("VZP_PFX_PATH", r"mbcert.pfx") -PFX_PASS = os.getenv("VZP_PFX_PASS", "Vlado7309208104++") -VERIFY = os.getenv("VZP_CA_VERIFY", "true").lower() != "false" # set to path or "false" to disable - -EP_STAV = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/stavPojisteniB2B" -SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/" -NS_STAV = "http://xmlns.gemsystem.cz/stavPojisteniB2B" - - - -# ======== HELPERS ======== -def normalize_rc(rc: str) -> str: - """Return rodné číslo without slash/spaces.""" - return rc.replace("/", "").replace(" ", "").strip() - -def to_int_or_none(val): - if val is None: - return None - s = str(val).strip() - return int(s) if s.isdigit() else None - -def _session(): - s = requests.Session() - s.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS)) - return s - -def post_soap(endpoint: str, inner_xml: str) -> str: - """Wrap body in SOAP 1.1 envelope and POST with mTLS.""" - envelope = f''' - - {inner_xml} -''' - r = _session().post( - endpoint, - data=envelope.encode("utf-8"), - headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": "process"}, - timeout=30, - verify=VERIFY, - ) - # You may log r.status_code if needed. - return r.text - -def parse_stav(xml_text: str) -> dict: - """Parse StavPojisteniB2B response → dict with keys: - 'stav','kodPojistovny','nazevPojistovny','pojisteniKod','stavVyrizeniPozadavku'.""" - NS = {"soap": SOAP_NS, "s": NS_STAV} - root = ET.fromstring(xml_text) - out = {"stav": None, "kodPojistovny": None, "nazevPojistovny": None, - "pojisteniKod": None, "stavVyrizeniPozadavku": None} - node = root.find(".//s:stavPojisteni", NS) - if node is not None: - for tag in ("stav", "kodPojistovny", "nazevPojistovny", "pojisteniKod"): - el = node.find(f"s:{tag}", NS) - out[tag] = el.text.strip() if (el is not None and el.text) else None - st = root.find(".//s:stavVyrizeniPozadavku", NS) - out["stavVyrizeniPozadavku"] = st.text.strip() if (st is not None and st.text) else None - return out - -def save_stav_pojisteni(rc: str, k_datu: str, parsed: dict, response_xml: str) -> None: - """Insert one log row into medevio.vzp_stav_pojisteni and print what is being saved.""" - payload = { - "rc": normalize_rc(rc), - "k_datu": k_datu, - "stav": to_int_or_none(parsed.get("stav")), # 'X' -> None - "kod_pojistovny": parsed.get("kodPojistovny"), - "nazev_pojistovny": parsed.get("nazevPojistovny"), - "pojisteni_kod": parsed.get("pojisteniKod"), - "stav_vyrizeni": to_int_or_none(parsed.get("stavVyrizeniPozadavku")), - "response_xml": response_xml, - } - - print("\n=== vzp_stav_pojisteni: will insert ===") - pprint(payload) - - sql = """ - INSERT INTO vzp_stav_pojisteni - (rc, k_datu, stav, kod_pojistovny, nazev_pojistovny, pojisteni_kod, stav_vyrizeni, response_xml) - VALUES - (%(rc)s, %(k_datu)s, %(stav)s, %(kod_pojistovny)s, %(nazev_pojistovny)s, %(pojisteni_kod)s, %(stav_vyrizeni)s, %(response_xml)s) - """ - conn = get_mysql_connection() - try: - with conn.cursor() as cur: - cur.execute(sql, payload) - conn.commit() - finally: - conn.close() - print("Inserted 1 row into vzp_stav_pojisteni.") - -# ======== PUBLIC API ======== -def check_insurance(rc: str, mode: str | None = None, k_datu: str | None = None): - """ - Call VZP 'StavPojisteniB2B' for given RC and date. - Returns (is_active, info_dict). - - is_active: True iff info_dict['stav'] == '1' - - info_dict: {'stav','kodPojistovny','nazevPojistovny','pojisteniKod','stavVyrizeniPozadavku'} - If mode == 'nodb' (case-insensitive), DOES NOT write to DB. Otherwise logs into medevio.vzp_stav_pojisteni. - """ - rc_norm = normalize_rc(rc) - kd = k_datu or date.today().isoformat() - - body = f""" - - {rc_norm} - {kd} -""".strip() - - xml = post_soap(EP_STAV, body) - info = parse_stav(xml) - is_active = (info.get("stav") == "1") - - if not (mode and mode.lower() == "nodb"): - save_stav_pojisteni(rc_norm, kd, info, xml) - - return is_active, info diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 282c772..0000000 Binary files a/requirements.txt and /dev/null differ