Files
torrents/02.1 Test moje rodne cislo.py
2025-10-03 11:36:33 +02:00

263 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL),
parse the response, upsert rows into medevio.vzp_registrace,
and print what is being saved.
"""
# pip install requests_pkcs12 pymysql
from requests_pkcs12 import Pkcs12Adapter
import requests
import xml.etree.ElementTree as ET
from datetime import date
import pymysql
from pymysql.cursors import DictCursor
from pprint import pprint
from functions import get_medicus_connection
from functions import get_mysql_connection
import time, random,socket
# ------------------- CONFIG -------------------
ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive
PFX_PATH = r"mbcert.pfx" # <-- your .pfx path
PFX_PASS = "Vlado7309208104++" # <-- your export password
VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem"
# Patient + query
RC = "7309208104" # rodné číslo without slash
# RC = "280616/091" # rodné číslo without slash
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"] # VPL (adult GP)
# MySQL
if socket.gethostname().strip() == "NTBVBHP470G10":
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
elif socket.gethostname().strip() == "SESTRA":
MYSQL_CFG = dict(
host="127.0.0.1",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=True, # or False if you prefer manual commit
)
# Namespaces (from your response/WSDL)
NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1",
}
# ------------------- HELPERS -------------------
def normalize_rc(rc: str) -> str:
return rc.replace("/", "").strip()
# Ukázka XML Požadavek:
# <registracePojistencePZSB2B xmlns="http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1">
# <cisloPojistence>801212855</cisloPojistence>
# <seznamOdbornosti>
# <kodOdbornosti>002</kodOdbornosti>
# </seznamOdbornosti>
# </registracePojistencePZSB2B>
def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str:
odb_xml = "".join([f"<ns1:kodOdbornosti>{kod}</ns1:kodOdbornosti>" for kod in odb_list])
inner = f"""
<ns1:registracePojistencePZSB2B xmlns:ns1="{NS['rp']}">
<ns1:cisloPojistence>{rc}</ns1:cisloPojistence>
<ns1:kDatu>{k_datu}</ns1:kDatu>
<ns1:seznamOdbornosti>
{odb_xml}
</ns1:seznamOdbornosti>
</ns1:registracePojistencePZSB2B>""".strip()
return f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="{NS['soap']}">
<soap:Body>
{inner}
</soap:Body>
</soap:Envelope>"""
def parse_registrace(xml_text: str):
"""
Return (rows, stav_vyrizeni) where rows is a list of dicts
for each <odbornost> item in the response.
"""
root = ET.fromstring(xml_text)
items = root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS)
out = []
for it in items:
def g(tag, ctx=it):
el = ctx.find(f"rp:{tag}", NS)
return el.text.strip() if el is not None and el.text else None
poj = it.find("rp:zdravotniPojistovna", NS)
poj_kod = poj.find("rp:kod", NS).text.strip() if (poj is not None and poj.find("rp:kod", NS) is not None) else None
poj_zkr = poj.find("rp:zkratka", NS).text.strip() if (poj is not None and poj.find("rp:zkratka", NS) is not None) else None
spec = it.find("rp:odbornost", NS)
odb_kod = spec.find("rp:kod", NS).text.strip() if (spec is not None and spec.find("rp:kod", NS) is not None) else None
odb_naz = spec.find("rp:nazev", NS).text.strip() if (spec is not None and spec.find("rp:nazev", NS) is not None) else None
out.append(dict(
icz=g("ICZ"),
icp=g("ICP"),
nazev_icp=g("nazevICP"),
nazev_szz=g("nazevSZZ"),
poj_kod=poj_kod,
poj_zkratka=poj_zkr,
odbornost_kod=odb_kod,
odbornost_nazev=odb_naz,
datum_registrace=g("datumRegistrace"),
datum_zahajeni=g("datumZahajeni"),
datum_ukonceni=g("datumUkonceni"),
))
st = root.find(".//rp:stavVyrizeniPozadavku", NS)
stav_vyrizeni = st.text.strip() if (st is not None and st.text) else None
return out, stav_vyrizeni
def upsert_rows(rc: str, query_date: str, rows: list[dict], stav_vyrizeni: str, xml_text: str) -> int:
"""
Insert or update rows into medevio.vzp_registrace and print each payload.
Assumes the table already exists (as per your DDL).
"""
if not rows:
print("No <odbornost> rows found; nothing to save.")
return 0
sql = """
INSERT INTO vzp_registrace
(rc, query_date, odbornost_kod, odbornost_nazev,
icz, icp, nazev_icp, nazev_szz,
poj_kod, poj_zkratka,
datum_registrace, datum_zahajeni, datum_ukonceni,
stav_vyrizeni, response_xml)
VALUES
(%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s,
%(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s,
%(poj_kod)s, %(poj_zkratka)s,
%(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s,
%(stav_vyrizeni)s, %(response_xml)s)
ON DUPLICATE KEY UPDATE
odbornost_nazev = VALUES(odbornost_nazev),
nazev_icp = VALUES(nazev_icp),
nazev_szz = VALUES(nazev_szz),
poj_kod = VALUES(poj_kod),
poj_zkratka = VALUES(poj_zkratka),
datum_registrace= VALUES(datum_registrace),
datum_zahajeni = VALUES(datum_zahajeni),
datum_ukonceni = VALUES(datum_ukonceni),
stav_vyrizeni = VALUES(stav_vyrizeni),
response_xml = VALUES(response_xml),
updated_at = CURRENT_TIMESTAMP
"""
rc_norm = normalize_rc(rc)
qd = query_date or date.today().isoformat()
payloads = []
for r in rows:
payload = {
"rc": rc_norm,
"query_date": qd,
**r,
"stav_vyrizeni": stav_vyrizeni,
"response_xml": xml_text,
}
payloads.append(payload)
# Print what we're going to save
print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===")
for p in payloads:
pprint(p)
conn = pymysql.connect(**MYSQL_CFG)
try:
with conn.cursor() as cur:
cur.executemany(sql, payloads)
conn.commit()
finally:
conn.close()
print(f"\nUpserted rows: {len(payloads)}")
return len(payloads)
def prepare_processed_rcs():
consql=get_mysql_connection()
cursql=consql.cursor()
sql="""
WITH ranked AS (
SELECT
vreg.*,
ROW_NUMBER() OVER (
PARTITION BY rc
ORDER BY query_date DESC
) AS rn
FROM vzp_registrace AS vreg
)
SELECT rc
FROM ranked
WHERE rn = 1
"""
cursql.execute(sql)
rows=cursql.fetchall()
print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}")
rc_set_vzp = {row["rc"] for row in rows}
return (rc_set_vzp)
# ------------------- MAIN FLOW -------------------
def main():
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"]
RC='7309208104'
# Build SOAP envelope
envelope = build_envelope(RC, K_DATU, ODBORNOSTI)
# mTLS session
session = requests.Session()
session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
headers = {
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process", # Oracle composite usually expects this
}
# Call service
resp = session.post(ENDPOINT, data=envelope.encode("utf-8"),
headers=headers, timeout=30, verify=VERIFY)
print("HTTP:", resp.status_code)
# (Optional) Uncomment to see raw XML
print(resp.text)
# Parse and save
rows, stav = parse_registrace(resp.text)
print(rows,stav)
time.sleep(random.uniform(1, 5))
if __name__ == "__main__":
main()