238 lines
8.3 KiB
Python
238 lines
8.3 KiB
Python
"""
|
|
Podávání žádostí o výpis registrovaných pojištěnců ZPŠ — čistý Python, bez prohlížeče.
|
|
|
|
Co dělá:
|
|
- Přihlásí se certifikátem na portál ZPŠ (requests + cryptography)
|
|
- Sestaví XML žádosti, podepíše certifikátem (PKCS7/SHA-256)
|
|
- Odešle POST na JSON API portálu
|
|
- Zjistí poslední podaný měsíc ze stavového souboru stav.json
|
|
- Podá žádost pro 1 následující měsíc (poslední den daného měsíce)
|
|
- Při úspěchu uloží nový stav a referenční číslo do logu
|
|
|
|
Stavový soubor: stav.json vedle tohoto skriptu.
|
|
{"mesic": 2, "rok": 2025} — poslední úspěšně podaný měsíc
|
|
|
|
Log podání: log_podani.json — seznam { datum, ref_cislo, podano_kdy }
|
|
"""
|
|
|
|
import calendar
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import date, datetime
|
|
|
|
import requests
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12
|
|
|
|
PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "Certificates", "MBQualifiedCert.pfx"))
|
|
PFX_PASSWORD = b"Vlado7309208104++"
|
|
|
|
BASE_URL = "https://portal.zpskoda.cz"
|
|
SUBMIT_URL = f"{BASE_URL}/json-api/formular-schranky/29-vypis-registrov-pojistencu/ulozit-formular"
|
|
ICZ = "25520"
|
|
|
|
STATE_FILE = os.path.join(os.path.dirname(__file__), "stav.json")
|
|
LOG_FILE = os.path.join(os.path.dirname(__file__), "log_podani.json")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Přihlášení
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def prihlaseni() -> requests.Session:
|
|
"""Přihlásí se certifikátem, vrátí autentizovanou session."""
|
|
challenge_url = f"{BASE_URL}/json-api/prihlaseni/prihlasovaci-zprava"
|
|
certlogin_url = f"{BASE_URL}/json-api/prihlaseni/prihlaseni-certifikatem"
|
|
|
|
session = requests.Session()
|
|
session.headers.update({
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
|
"X-Requested-With": "XMLHttpRequest",
|
|
"Origin": BASE_URL,
|
|
"Referer": BASE_URL + "/",
|
|
})
|
|
|
|
r = session.get(f"{BASE_URL}/app/prihlaseni")
|
|
r.raise_for_status()
|
|
session.cookies.set("pzp_sign", "CERT", domain="portal.zpskoda.cz", path="/")
|
|
|
|
r = session.post(challenge_url, json={"login_sign": "CERT"},
|
|
headers={"Content-Type": "application/json; charset=UTF-8"})
|
|
r.raise_for_status()
|
|
zprava = r.json()["data"]["zprava"]
|
|
print(f"Challenge: {zprava[:60]}...")
|
|
|
|
with open(PFX_PATH, "rb") as f:
|
|
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
|
|
|
|
podpis = (
|
|
pkcs7.PKCS7SignatureBuilder()
|
|
.set_data(zprava.encode("utf-8"))
|
|
.add_signer(cert, private_key, hashes.SHA256())
|
|
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature])
|
|
.decode("ascii").strip()
|
|
)
|
|
|
|
r = session.post(certlogin_url, json={"zprava": zprava, "podpis": podpis},
|
|
headers={"Content-Type": "application/json; charset=UTF-8"})
|
|
r.raise_for_status()
|
|
data = r.json()["data"]
|
|
|
|
if not data.get("prihlasen"):
|
|
raise RuntimeError(f"Přihlášení selhalo: {r.json().get('errMsg', '')}")
|
|
|
|
print("Přihlášení úspěšné!")
|
|
return session
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sestavení XML a podpis
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_xml(datum: date) -> str:
|
|
datum_str = datum.strftime("%d.%m.%Y")
|
|
return (
|
|
f'<SchrankaZadost NazevSchranky="VypisPojKap" NazevFiltru="ZZ_VYP_REG">\r\n'
|
|
f'<PolozkaFiltru Nazev="icz">{ICZ}</PolozkaFiltru>\r\n'
|
|
f'<PolozkaFiltru Nazev="datum">{datum_str}</PolozkaFiltru>\r\n'
|
|
f'<PolozkaFiltru Nazev="razeni">jmeno</PolozkaFiltru>\r\n'
|
|
f'<PolozkaFiltru Nazev="typ">soubor</PolozkaFiltru>\r\n'
|
|
f'</SchrankaZadost>'
|
|
)
|
|
|
|
|
|
def sign_xml(xml: str) -> str:
|
|
"""Podepíše XML certifikátem, vrátí PKCS7 PEM s \\r\\n (stejný formát jako NMSigner)."""
|
|
with open(PFX_PATH, "rb") as f:
|
|
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
|
|
|
|
pem = (
|
|
pkcs7.PKCS7SignatureBuilder()
|
|
.set_data(xml.encode("utf-8"))
|
|
.add_signer(cert, private_key, hashes.SHA256())
|
|
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature, pkcs7.PKCS7Options.NoCerts])
|
|
.decode("ascii")
|
|
)
|
|
# NMSigner normalizuje konce řádků na \r\n
|
|
return pem.replace("\r\n", "\n").replace("\n", "\r\n")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Odeslání žádosti
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def odeslat_zadost(session: requests.Session, datum: date) -> str | None:
|
|
"""Odešle podepsanou žádost na portál. Vrátí referenční číslo nebo None."""
|
|
xml = build_xml(datum)
|
|
podpis = sign_xml(xml)
|
|
|
|
payload = {"schrXml": xml, "schrSign": podpis, "schrFiles": []}
|
|
|
|
r = session.post(SUBMIT_URL, json=payload, headers={
|
|
"Content-Type": "application/json; charset=UTF-8",
|
|
"X-Requested-With": "XMLHttpRequest",
|
|
"Referer": BASE_URL + "/",
|
|
})
|
|
r.raise_for_status()
|
|
|
|
try:
|
|
resp = r.json()
|
|
except Exception:
|
|
print(f" Odpověď není JSON: {r.text[:300]}")
|
|
return None
|
|
|
|
# Hledáme referenční číslo v odpovědi
|
|
resp_str = json.dumps(resp, ensure_ascii=False)
|
|
m = re.search(r'\b(17\d{7}|18\d{7})\b', resp_str)
|
|
ref = m.group(1) if m else None
|
|
|
|
if resp.get("errMsg") or resp.get("error"):
|
|
print(f" Chyba od serveru: {resp.get('errMsg') or resp.get('error')}")
|
|
return None
|
|
|
|
if ref:
|
|
print(f" OK — ref. číslo: {ref}")
|
|
else:
|
|
print(f" Odpověď (bez ref. čísla): {resp_str[:300]}")
|
|
|
|
return ref or ("OK" if r.ok else None)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stav a log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def dalsi_mesic(mesic: int, rok: int) -> tuple[int, int]:
|
|
mesic += 1
|
|
if mesic > 12:
|
|
mesic = 1
|
|
rok += 1
|
|
return mesic, rok
|
|
|
|
|
|
def posledni_den(mesic: int, rok: int) -> date:
|
|
_, last = calendar.monthrange(rok, mesic)
|
|
return date(rok, mesic, last)
|
|
|
|
|
|
def nacti_stav() -> tuple[int, int] | None:
|
|
if os.path.exists(STATE_FILE):
|
|
with open(STATE_FILE, encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
print(f"Stav: poslední podaný {data['mesic']:02d}/{data['rok']}")
|
|
return data["mesic"], data["rok"]
|
|
return None
|
|
|
|
|
|
def uloz_stav(mesic: int, rok: int) -> None:
|
|
with open(STATE_FILE, "w", encoding="utf-8") as f:
|
|
json.dump({"mesic": mesic, "rok": rok}, f, ensure_ascii=False)
|
|
|
|
|
|
def uloz_log(mesic: int, rok: int, ref_cislo: str) -> None:
|
|
log = []
|
|
if os.path.exists(LOG_FILE):
|
|
with open(LOG_FILE, encoding="utf-8") as f:
|
|
log = json.load(f)
|
|
log.append({
|
|
"datum": posledni_den(mesic, rok).strftime("%d.%m.%Y"),
|
|
"ref_cislo": ref_cislo,
|
|
"podano_kdy": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
})
|
|
with open(LOG_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(log, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hlavní funkce
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hlavni() -> None:
|
|
posledni = nacti_stav()
|
|
if posledni is None:
|
|
raise SystemExit(
|
|
"Nelze zjistit poslední podaný měsíc.\n"
|
|
f"Vytvoř ručně soubor {STATE_FILE} ve tvaru:\n"
|
|
' {"mesic": 2, "rok": 2025}'
|
|
)
|
|
|
|
mesic, rok = dalsi_mesic(*posledni)
|
|
datum = posledni_den(mesic, rok)
|
|
print(f"Podávám žádost pro: {datum.strftime('%d.%m.%Y')}")
|
|
|
|
session = prihlaseni()
|
|
ref = odeslat_zadost(session, datum)
|
|
|
|
if ref:
|
|
uloz_stav(mesic, rok)
|
|
uloz_log(mesic, rok, ref)
|
|
print(f"\nHotovo — stav uložen: {mesic:02d}/{rok}, ref: {ref}")
|
|
else:
|
|
print(f"\nPodání selhalo — stav nebyl aktualizován, příště se zkusí znovu {datum.strftime('%d.%m.%Y')}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
hlavni()
|