Files
ordinaceprojekt/Insurance/SeznamPojistencu/Tests/zadej_davku.py
T
Vladimir Buzalka 371eed9971 notebookvb
2026-05-03 07:02:22 +02:00

168 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys as _sys
_sys.stdout.reconfigure(encoding="utf-8", errors="replace")
_sys.stderr.reconfigure(encoding="utf-8", errors="replace")
"""
zadej_davku.py
==============
Odešle asynchronní požadavek na VZP B2B službu SeznamRegPojistencuB2B
(seznam zakapitovaných/registrovaných pojištěnců za dané období).
Použití:
python zadej_davku.py [mesic] [rok]
python zadej_davku.py 2 2025 # únor 2025
python zadej_davku.py # předchozí měsíc (výchozí)
Výstup:
- korelační ID zprávy (pro pozdější spárování asynchronní odpovědi)
- stavVyrizeniPozadavku=2 znamená "přijato, VZP zpracovává"
- finální odpověď přijde asynchronně na AS2 endpoint partnera
"""
import sys
import uuid
import argparse
from pathlib import Path
from datetime import date, timedelta
from requests_pkcs12 import Pkcs12Adapter
import requests
import xml.etree.ElementTree as ET
# ── KONFIGURACE ───────────────────────────────────────────────────────────────
PFX_PATH = Path(__file__).resolve().parent.parent / "Certificates" / "picka.pfx"
PFX_PASSWORD = "Vlado7309208104+"
ICZ = "09305000"
ENV = "prod"
NS_VZP = "http://xmlns.gemsystem.cz/SeznamRegPojistencuB2B"
NS_COMMON = "http://xmlns.gemsystem.cz/CommonB2B"
NS_SOAP = "http://schemas.xmlsoap.org/soap/envelope/"
ENDPOINT_SIMU = "https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMUSeznamRegPojistencuB2B?sluzba=SIMUSeznamRegPojistencuB2B"
ENDPOINT_PROD = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/SeznamRegPojistencuB2B"
# ── ARGUMENTY ────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Požadavek na seznam registrovaných pojištěnců VZP")
parser.add_argument("mesic", nargs="?", type=int, help="Měsíc (1-12)")
parser.add_argument("rok", nargs="?", type=int, help="Rok (např. 2025)")
parser.add_argument("--simu", action="store_true", help="Použít simulační prostředí")
parser.add_argument("--pdf", action="store_true", help="Výstup jako PDF (výchozí: text/plain)")
args = parser.parse_args()
# Výchozí: předchozí měsíc
if args.mesic and args.rok:
mesic, rok = args.mesic, args.rok
else:
prvni_tohoto = date.today().replace(day=1)
predchozi = prvni_tohoto - timedelta(days=1)
mesic, rok = predchozi.month, predchozi.year
format_vystupu = "application/pdf" if args.pdf else "text/plain"
endpoint = ENDPOINT_SIMU if args.simu else ENDPOINT_PROD
# ── KONTROLY ─────────────────────────────────────────────────────────────────
if not PFX_PATH.exists():
print(f"CHYBA: certifikát nenalezen: {PFX_PATH}")
sys.exit(1)
if not (1 <= mesic <= 12):
print(f"CHYBA: neplatný měsíc: {mesic}")
sys.exit(1)
# ── SESTAVENÍ POŽADAVKU ───────────────────────────────────────────────────────
id_zpravy = uuid.uuid4().hex[:12].upper()
soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="{NS_SOAP}"
xmlns:vzp="{NS_VZP}"
xmlns:com="{NS_COMMON}">
<soap:Header>
<com:idZpravy>{id_zpravy}</com:idZpravy>
<com:idSubjektu>
<com:icz>{ICZ}</com:icz>
</com:idSubjektu>
</soap:Header>
<soap:Body>
<vzp:seznamRegistrovanychPojistencuB2BPozadavek>
<vzp:idZpravy>{id_zpravy}</vzp:idZpravy>
<vzp:idSubjektu>{ICZ}</vzp:idSubjektu>
<vzp:typSubjektu>zp</vzp:typSubjektu>
<vzp:seznam>
<vzp:obdobiDavky>
<vzp:mesic>{mesic}</vzp:mesic>
<vzp:rok>{rok}</vzp:rok>
</vzp:obdobiDavky>
<vzp:formatVystupu>{format_vystupu}</vzp:formatVystupu>
</vzp:seznam>
</vzp:seznamRegistrovanychPojistencuB2BPozadavek>
</soap:Body>
</soap:Envelope>"""
# ── ODESLÁNÍ ─────────────────────────────────────────────────────────────────
print(f"Období: {mesic:02d}/{rok}")
print(f"Formát: {format_vystupu}")
print(f"Prostředí: {'SIMU' if args.simu else 'PROD'}")
print(f"IČZ: {ICZ}")
print(f"ID zprávy: {id_zpravy}")
print(f"Endpoint: {endpoint}")
print()
session = requests.Session()
session.mount("https://", Pkcs12Adapter(
pkcs12_filename=str(PFX_PATH),
pkcs12_password=PFX_PASSWORD
))
try:
resp = session.post(
endpoint,
data=soap.encode("utf-8"),
headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": "process"},
timeout=30
)
except requests.RequestException as e:
print(f"CHYBA při spojení: {e}")
sys.exit(1)
print(f"HTTP status: {resp.status_code}")
print()
# ── PARSOVÁNÍ ODPOVĚDI ────────────────────────────────────────────────────────
if not resp.content:
print("→ Požadavek přijat (prázdná odpověď = asynchronní služba).")
print(f"→ VZP zpracuje a odešle výsledek na AS2 endpoint.")
print(f"→ ID zprávy pro spárování: {id_zpravy}")
else:
try:
root = ET.fromstring(resp.text)
NS = {"soap": NS_SOAP, "vzp": NS_VZP}
korelace = root.find(".//vzp:korelaceZpravy", NS)
text_odp = root.find(".//vzp:textOdpovedi", NS)
stav = root.find(".//vzp:stavVyrizeniPozadavku", NS)
print(f"Korelace zprávy: {korelace.text if korelace is not None else ''}")
print(f"Text odpovědi: {text_odp.text if text_odp is not None else ''}")
print(f"Stav vyřízení: {stav.text if stav is not None else ''}")
if stav is not None and stav.text == "2":
print()
print("→ VZP zpracovává — finální odpověď přijde asynchronně na AS2 endpoint.")
print(f"→ ID zprávy pro spárování: {id_zpravy}")
except ET.ParseError:
print("Nepodařilo se parsovat XML odpověď:")
print(resp.text)