notebookVB
This commit is contained in:
203
02 testík.py
Normal file
203
02 testík.py
Normal file
@@ -0,0 +1,203 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL),
|
||||
parse the response, upsert rows into medevio.vzp_registrace,
|
||||
and print what is being saved.
|
||||
"""
|
||||
|
||||
# pip install requests_pkcs12 pymysql
|
||||
|
||||
from requests_pkcs12 import Pkcs12Adapter
|
||||
import requests
|
||||
import xml.etree.ElementTree as ET
|
||||
from datetime import date
|
||||
import pymysql
|
||||
from pymysql.cursors import DictCursor
|
||||
from pprint import pprint
|
||||
|
||||
# ------------------- CONFIG -------------------
|
||||
ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive
|
||||
PFX_PATH = r"mbcert.pfx" # <-- your .pfx path
|
||||
PFX_PASS = "Vlado7309208104++" # <-- your export password
|
||||
VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem"
|
||||
|
||||
# Patient + query
|
||||
# RC = "7309208104" # rodné číslo without slash
|
||||
RC = "280616/091" # rodné číslo without slash
|
||||
K_DATU = date.today().isoformat() # YYYY-MM-DD
|
||||
ODBORNOSTI = ["001"] # VPL (adult GP)
|
||||
|
||||
# MySQL
|
||||
MYSQL_CFG = dict(
|
||||
host="192.168.1.76",
|
||||
port=3307,
|
||||
user="root",
|
||||
password="Vlado9674+",
|
||||
database="medevio",
|
||||
cursorclass=DictCursor,
|
||||
autocommit=False,
|
||||
)
|
||||
|
||||
# Namespaces (from your response/WSDL)
|
||||
NS = {
|
||||
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
|
||||
"rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1",
|
||||
}
|
||||
|
||||
|
||||
# ------------------- HELPERS -------------------
|
||||
def normalize_rc(rc: str) -> str:
|
||||
return rc.replace("/", "").strip()
|
||||
|
||||
def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str:
|
||||
odb_xml = "".join([f"<ns1:kodOdbornosti>{kod}</ns1:kodOdbornosti>" for kod in odb_list])
|
||||
inner = f"""
|
||||
<ns1:registracePojistencePZSB2B xmlns:ns1="{NS['rp']}">
|
||||
<ns1:cisloPojistence>{rc}</ns1:cisloPojistence>
|
||||
<ns1:kDatu>{k_datu}</ns1:kDatu>
|
||||
<ns1:seznamOdbornosti>
|
||||
{odb_xml}
|
||||
</ns1:seznamOdbornosti>
|
||||
</ns1:registracePojistencePZSB2B>""".strip()
|
||||
|
||||
return f"""<?xml version="1.0" encoding="utf-8"?>
|
||||
<soap:Envelope xmlns:soap="{NS['soap']}">
|
||||
<soap:Body>
|
||||
{inner}
|
||||
</soap:Body>
|
||||
</soap:Envelope>"""
|
||||
|
||||
def parse_registrace(xml_text: str):
|
||||
"""
|
||||
Return (rows, stav_vyrizeni) where rows is a list of dicts
|
||||
for each <odbornost> item in the response.
|
||||
"""
|
||||
root = ET.fromstring(xml_text)
|
||||
items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS)
|
||||
out = []
|
||||
for it in items:
|
||||
def g(tag, ctx=it):
|
||||
el = ctx.find(f"rp:{tag}", NS)
|
||||
return el.text.strip() if el is not None and el.text else None
|
||||
|
||||
poj = it.find("rp:zdravotniPojistovna", NS)
|
||||
poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None
|
||||
poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None
|
||||
|
||||
spec = it.find("rp:odbornost", NS)
|
||||
odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None
|
||||
odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None
|
||||
|
||||
out.append(dict(
|
||||
icz=g("ICZ"),
|
||||
icp=g("ICP"),
|
||||
nazev_icp=g("nazevICP"),
|
||||
nazev_szz=g("nazevSZZ"),
|
||||
poj_kod=poj_kod,
|
||||
poj_zkratka=poj_zkr,
|
||||
odbornost_kod=odb_kod,
|
||||
odbornost_nazev=odb_naz,
|
||||
datum_registrace=g("datumRegistrace"),
|
||||
datum_zahajeni=g("datumZahajeni"),
|
||||
datum_ukonceni=g("datumUkonceni"),
|
||||
))
|
||||
|
||||
st = root.find(".//rp:stavVyrizeniPozadavku", NS)
|
||||
stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None
|
||||
return out, stav_vyrizeni
|
||||
|
||||
def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int:
|
||||
"""
|
||||
Insert or update rows into medevio.vzp_registrace and print each payload.
|
||||
Assumes the table already exists (as per your DDL).
|
||||
"""
|
||||
if not rows:
|
||||
print("No <odbornost> rows found; nothing to save.")
|
||||
return 0
|
||||
|
||||
sql = """
|
||||
INSERT INTO vzp_registrace
|
||||
(rc, query_date, odbornost_kod, odbornost_nazev,
|
||||
icz, icp, nazev_icp, nazev_szz,
|
||||
poj_kod, poj_zkratka,
|
||||
datum_registrace, datum_zahajeni, datum_ukonceni,
|
||||
stav_vyrizeni, response_xml)
|
||||
VALUES
|
||||
(%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s,
|
||||
%(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s,
|
||||
%(poj_kod)s, %(poj_zkratka)s,
|
||||
%(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s,
|
||||
%(stav_vyrizeni)s, %(response_xml)s)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
odbornost_nazev = VALUES(odbornost_nazev),
|
||||
nazev_icp = VALUES(nazev_icp),
|
||||
nazev_szz = VALUES(nazev_szz),
|
||||
poj_kod = VALUES(poj_kod),
|
||||
poj_zkratka = VALUES(poj_zkratka),
|
||||
datum_registrace= VALUES(datum_registrace),
|
||||
datum_zahajeni = VALUES(datum_zahajeni),
|
||||
datum_ukonceni = VALUES(datum_ukonceni),
|
||||
stav_vyrizeni = VALUES(stav_vyrizeni),
|
||||
response_xml = VALUES(response_xml),
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
"""
|
||||
rc_norm = normalize_rc(rc)
|
||||
qd = query_date or date.today().isoformat()
|
||||
payloads = []
|
||||
for r in rows:
|
||||
payload = {
|
||||
"rc": rc_norm,
|
||||
"query_date": qd,
|
||||
**r,
|
||||
"stav_vyrizeni": stav_vyrizeni,
|
||||
"response_xml": xml_text,
|
||||
}
|
||||
payloads.append(payload)
|
||||
|
||||
# Print what we're going to save
|
||||
print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===")
|
||||
for p in payloads:
|
||||
pprint(p)
|
||||
|
||||
conn = pymysql.connect(**MYSQL_CFG)
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.executemany(sql, payloads)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
print(f"\nUpserted rows: {len(payloads)}")
|
||||
return len(payloads)
|
||||
|
||||
|
||||
# ------------------- MAIN FLOW -------------------
|
||||
def main():
|
||||
# Build SOAP envelope
|
||||
envelope = build_envelope(RC, K_DATU, ODBORNOSTI)
|
||||
|
||||
# mTLS session
|
||||
session = requests.Session()
|
||||
session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
|
||||
|
||||
headers = {
|
||||
"Content-Type": "text/xml; charset=utf-8",
|
||||
"SOAPAction": "process", # Oracle composite usually expects this
|
||||
}
|
||||
|
||||
# Call service
|
||||
resp = session.post(ENDPOINT, data=envelope.encode("utf-8"),
|
||||
headers=headers, timeout=30, verify=VERIFY)
|
||||
print("HTTP:", resp.status_code)
|
||||
|
||||
# (Optional) Uncomment to see raw XML
|
||||
# print(resp.text)
|
||||
|
||||
# Parse and save
|
||||
rows, stav = parse_registrace(resp.text)
|
||||
upsert_rows(RC, K_DATU, rows, stav, resp.text)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user