Files
Vladimir Buzalka 71b8ed676a notebookvb
2026-05-13 07:43:33 +02:00

449 lines
17 KiB
Python

"""
Podávání žádostí o výpis registrovaných pojištěnců ZPŠ — čistý Python, bez prohlížeče.
Co dělá:
1. Přihlásí se certifikátem na portál ZPŠ (uloží cookies pro Playwright)
2. Stáhne nové soubory z výpisové schránky (schranka-vypis-pojistencu-v-kapitaci)
3. Podá žádost pro 1 následující měsíc
Stavový soubor: stav.json vedle tohoto skriptu.
{"mesic": 2, "rok": 2025} — poslední úspěšně podaný měsíc
Log podání: log_podani.json — seznam { datum, ref_cislo, podano_kdy }
"""
import calendar
import json
import os
import re
import sys
import time
from datetime import date, datetime
from pathlib import Path
import requests
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")))
from Knihovny.najdi_dropbox import get_dropbox_root
PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "Certificates", "MBQualifiedCert.pfx"))
PFX_PASSWORD = b"Vlado7309208104++"
BASE_URL = "https://portal.zpskoda.cz"
SUBMIT_URL = f"{BASE_URL}/json-api/formular-schranky/29-vypis-registrov-pojistencu/ulozit-formular"
ICZ = "25520"
STATE_FILE = os.path.join(os.path.dirname(__file__), "stav.json")
LOG_FILE = os.path.join(os.path.dirname(__file__), "log_podani.json")
# Sdílené soubory s ostatními ZPŠ skripty
STAHUJ_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "StahováníZpráv", "209 ZPŠ"))
COOKIES_FILE = os.path.join(STAHUJ_DIR, "zps_cookies.json")
CHROME_PROFILE = os.path.join(STAHUJ_DIR, "chrome_profile")
DOWNLOAD_DIR = os.path.join(get_dropbox_root(), "Ordinace", "Dokumentace_ke_zpracování", "Zúčtovací zprávy", "SeznamyPojištěnců")
VYPIS_URL = f"{BASE_URL}/app/schranka-vypis-pojistencu-v-kapitaci"
DOWNLOAD_URL = f"{BASE_URL}/html/prehled-zprav-ve-schrankach/zobrazit-prilohu"
PROTOKOL_URL = f"{BASE_URL}/html/prehled-zprav-ve-schrankach/zobrazit-protokol"
# ---------------------------------------------------------------------------
# Přihlášení
# ---------------------------------------------------------------------------
def prihlaseni() -> requests.Session:
"""Přihlásí se certifikátem, vrátí autentizovanou session. Uloží cookies pro Playwright."""
challenge_url = f"{BASE_URL}/json-api/prihlaseni/prihlasovaci-zprava"
certlogin_url = f"{BASE_URL}/json-api/prihlaseni/prihlaseni-certifikatem"
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"X-Requested-With": "XMLHttpRequest",
"Origin": BASE_URL,
"Referer": BASE_URL + "/",
})
r = session.get(f"{BASE_URL}/app/prihlaseni")
r.raise_for_status()
session.cookies.set("pzp_sign", "CERT", domain="portal.zpskoda.cz", path="/")
r = session.post(challenge_url, json={"login_sign": "CERT"},
headers={"Content-Type": "application/json; charset=UTF-8"})
r.raise_for_status()
zprava = r.json()["data"]["zprava"]
print(f"Challenge: {zprava[:60]}...")
with open(PFX_PATH, "rb") as f:
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
podpis = (
pkcs7.PKCS7SignatureBuilder()
.set_data(zprava.encode("utf-8"))
.add_signer(cert, private_key, hashes.SHA256())
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature])
.decode("ascii").strip()
)
r = session.post(certlogin_url, json={"zprava": zprava, "podpis": podpis},
headers={"Content-Type": "application/json; charset=UTF-8"})
r.raise_for_status()
data = r.json()["data"]
if not data.get("prihlasen"):
raise RuntimeError(f"Přihlášení selhalo: {r.json().get('errMsg', '')}")
print("Přihlášení úspěšné!")
cookies = [
{
"name": c.name,
"value": c.value,
"domain": c.domain if c.domain.startswith(".") else "." + c.domain,
"path": c.path or "/",
"expires": int(c.expires) if c.expires else -1,
"secure": bool(c.secure),
"httpOnly": False,
"sameSite": "Lax",
}
for c in session.cookies
]
with open(COOKIES_FILE, "w", encoding="utf-8") as f:
json.dump(cookies, f, indent=2, ensure_ascii=False)
print(f"Cookies uloženy: {len(cookies)}{COOKIES_FILE}")
return session
# ---------------------------------------------------------------------------
# Stahování z výpisové schránky
# ---------------------------------------------------------------------------
def safe_filename(name: str) -> str:
return re.sub(r'[\\/:*?"<>|]', "_", name).strip()
def parse_date(date_str: str) -> str:
try:
return datetime.strptime(date_str.strip()[:19], "%d.%m.%Y %H:%M:%S").strftime("%Y-%m-%d")
except Exception:
try:
return datetime.strptime(date_str.strip()[:10], "%d.%m.%Y").strftime("%Y-%m-%d")
except Exception:
return "0000-00-00"
def parse_row(cells: list) -> dict:
date_raw = cells[1].strip() if len(cells) > 1 else ""
desc_raw = cells[2].strip() if len(cells) > 2 else ""
fname_raw = cells[3].strip() if len(cells) > 3 else ""
desc_lines = [l.strip() for l in desc_raw.split("\n") if l.strip()]
if len(desc_lines) >= 3:
description = desc_lines[2]
elif len(desc_lines) >= 2:
description = desc_lines[1]
else:
description = desc_lines[0] if desc_lines else ""
description = description[:80]
fname_match = re.match(r'^(.+?)\s*\(\d{2}\.\d{2}\.\d{4}\)\s*$', fname_raw)
original = fname_match.group(1).strip() if fname_match else fname_raw.split("(")[0].strip()
orig_path = Path(original)
stem = orig_path.stem or "zprava"
ext = orig_path.suffix or ""
date_iso = parse_date(date_raw)
name = f"{date_iso} {safe_filename(description)} ({safe_filename(stem)}){ext}"
if len(name) > 240:
name = f"{date_iso} ({safe_filename(stem)}){ext}"
return {"date": date_iso, "desc": description, "original": original, "filename": name}
def stahni_nove_vypisy() -> int:
"""Stáhne nové soubory z výpisové schránky. Vrátí počet stažených souborů."""
try:
from playwright.sync_api import sync_playwright
except ImportError:
print("Chybí playwright: pip install playwright && playwright install chrome")
return 0
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
with open(COOKIES_FILE, encoding="utf-8") as f:
cookies = json.load(f)
downloaded = 0
with sync_playwright() as p:
context = p.chromium.launch_persistent_context(
user_data_dir=CHROME_PROFILE,
channel="chrome",
headless=False,
slow_mo=100,
ignore_https_errors=True,
)
try:
context.add_cookies(cookies)
page = context.new_page()
page.goto(f"{VYPIS_URL}/", wait_until="domcontentloaded", timeout=30_000)
if "prihlaseni" in page.url or "login" in page.url.lower():
print("Session v prohlížeči expirovala — stahování přeskočeno")
return 0
print("Prohlížeč přihlášen OK\n")
already = set(os.listdir(DOWNLOAD_DIR))
print(f"V archivu: {len(already)} souborů.\n")
page_num = 1
seen_ids: set = set()
while True:
url = f"{VYPIS_URL}/stranka-{page_num}"
print(f" Stránka {page_num}: {url}")
try:
page.goto(url, wait_until="domcontentloaded", timeout=30_000)
except Exception as e:
print(f" Navigace selhala: {e}")
break
page.wait_for_load_state("networkidle", timeout=15_000)
data = page.evaluate("""() => {
const rows = [];
for (const tr of document.querySelectorAll('table tr')) {
const cells = Array.from(tr.querySelectorAll('td')).map(td => td.innerText.trim());
if (cells.length < 4) continue;
const dlLink = tr.querySelector('a[onclick*="SchrPolOpenFile"]');
if (!dlLink) continue;
const mFile = dlLink.getAttribute('onclick').match(/\\d+/);
const protLink = tr.querySelector('a[onclick*="SchrPolDBProtokol"]');
const mProt = protLink ? protLink.getAttribute('onclick').match(/\\d+/) : null;
rows.push({
cells,
fileId: mFile ? mFile[0] : null,
protokolId: mProt ? mProt[0] : null,
});
}
return rows;
}""")
rows = [r for r in data if r["fileId"]]
if not rows:
print(f" Stránka {page_num} — žádné řádky, konec schránky.")
break
current_ids = {r["fileId"] for r in rows}
if current_ids & seen_ids:
print(f" Stránka {page_num} — opakující se obsah, konec schránky.")
break
seen_ids.update(current_ids)
print(f" Nalezeno {len(rows)} zpráv.")
stop = False
for row in rows:
info = parse_row(row["cells"])
# Zajímají nás pouze .001 soubory
if Path(info["original"]).suffix.lower() != ".001":
continue
target = os.path.join(DOWNLOAD_DIR, info["filename"])
if info["filename"] in already or os.path.exists(target):
print(f" [stop] Nalezena již stažená zpráva: {info['filename']}")
stop = True
break
dl_url = f"{DOWNLOAD_URL}?zprava_id={row['fileId']}"
try:
r = context.request.get(dl_url, headers={"Referer": VYPIS_URL}, timeout=30_000)
if not r.ok:
print(f" HTTP {r.status} příloha (id={row['fileId']})")
else:
body = r.body()
if not body[:9].decode("ascii", errors="ignore").startswith("H09305001"):
print(f" přeskočeno (není výpis pojištěnců): {info['filename']}")
else:
with open(target, "wb") as fh:
fh.write(body)
print(f" OK: {info['filename']}")
already.add(info["filename"])
downloaded += 1
except Exception as e:
print(f" Chyba příloha (id={row['fileId']}): {e}")
time.sleep(1.0)
if stop:
break
page_num += 1
finally:
context.close()
return downloaded
# ---------------------------------------------------------------------------
# Sestavení XML a podpis
# ---------------------------------------------------------------------------
def build_xml(datum: date) -> str:
datum_str = datum.strftime("%d.%m.%Y")
return (
f'<SchrankaZadost NazevSchranky="VypisPojKap" NazevFiltru="ZZ_VYP_REG">\r\n'
f'<PolozkaFiltru Nazev="icz">{ICZ}</PolozkaFiltru>\r\n'
f'<PolozkaFiltru Nazev="datum">{datum_str}</PolozkaFiltru>\r\n'
f'<PolozkaFiltru Nazev="razeni">jmeno</PolozkaFiltru>\r\n'
f'<PolozkaFiltru Nazev="typ">soubor</PolozkaFiltru>\r\n'
f'</SchrankaZadost>'
)
def sign_xml(xml: str) -> str:
"""Podepíše XML certifikátem, vrátí PKCS7 PEM s \\r\\n (stejný formát jako NMSigner)."""
with open(PFX_PATH, "rb") as f:
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
pem = (
pkcs7.PKCS7SignatureBuilder()
.set_data(xml.encode("utf-8"))
.add_signer(cert, private_key, hashes.SHA256())
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature, pkcs7.PKCS7Options.NoCerts])
.decode("ascii")
)
return pem.replace("\r\n", "\n").replace("\n", "\r\n")
# ---------------------------------------------------------------------------
# Odeslání žádosti
# ---------------------------------------------------------------------------
def odeslat_zadost(session: requests.Session, datum: date) -> str | None:
"""Odešle podepsanou žádost na portál. Vrátí referenční číslo nebo None."""
xml = build_xml(datum)
podpis = sign_xml(xml)
payload = {"schrXml": xml, "schrSign": podpis, "schrFiles": []}
r = session.post(SUBMIT_URL, json=payload, headers={
"Content-Type": "application/json; charset=UTF-8",
"X-Requested-With": "XMLHttpRequest",
"Referer": BASE_URL + "/",
})
r.raise_for_status()
try:
resp = r.json()
except Exception:
print(f" Odpověď není JSON: {r.text[:300]}")
return None
resp_str = json.dumps(resp, ensure_ascii=False)
m = re.search(r'\b(17\d{7}|18\d{7})\b', resp_str)
ref = m.group(1) if m else None
if resp.get("errMsg") or resp.get("error"):
print(f" Chyba od serveru: {resp.get('errMsg') or resp.get('error')}")
return None
if ref:
print(f" OK — ref. číslo: {ref}")
else:
print(f" Odpověď (bez ref. čísla): {resp_str[:300]}")
return ref or ("OK" if r.ok else None)
# ---------------------------------------------------------------------------
# Stav a log
# ---------------------------------------------------------------------------
def dalsi_mesic(mesic: int, rok: int) -> tuple[int, int]:
mesic += 1
if mesic > 12:
mesic = 1
rok += 1
return mesic, rok
def posledni_den(mesic: int, rok: int) -> date:
_, last = calendar.monthrange(rok, mesic)
return date(rok, mesic, last)
def nacti_stav() -> tuple[int, int] | None:
if os.path.exists(STATE_FILE):
with open(STATE_FILE, encoding="utf-8") as f:
data = json.load(f)
print(f"Stav: poslední podaný {data['mesic']:02d}/{data['rok']}")
return data["mesic"], data["rok"]
return None
def uloz_stav(mesic: int, rok: int) -> None:
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump({"mesic": mesic, "rok": rok}, f, ensure_ascii=False)
def uloz_log(mesic: int, rok: int, ref_cislo: str) -> None:
log = []
if os.path.exists(LOG_FILE):
with open(LOG_FILE, encoding="utf-8") as f:
log = json.load(f)
log.append({
"datum": posledni_den(mesic, rok).strftime("%d.%m.%Y"),
"ref_cislo": ref_cislo,
"podano_kdy": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
})
with open(LOG_FILE, "w", encoding="utf-8") as f:
json.dump(log, f, indent=2, ensure_ascii=False)
# ---------------------------------------------------------------------------
# Hlavní funkce
# ---------------------------------------------------------------------------
def hlavni() -> None:
posledni = nacti_stav()
if posledni is None:
raise SystemExit(
"Nelze zjistit poslední podaný měsíc.\n"
f"Vytvoř ručně soubor {STATE_FILE} ve tvaru:\n"
' {"mesic": 2, "rok": 2025}'
)
# 1. Přihlášení — uloží cookies pro Playwright
prihlaseni()
# 2. Stažení nových výpisů z výpisové schránky
print("\n=== Stahování nových výpisů ===")
stazeno = stahni_nove_vypisy()
print(f"Staženo: {stazeno} souborů.\n")
# 3. Znovu přihlásit — Playwright mohl invalidovat předchozí session
print("=== Znovu přihlašuji před podáním ===")
session = prihlaseni()
# 4. Podání žádosti pro následující měsíc
mesic, rok = dalsi_mesic(*posledni)
datum = posledni_den(mesic, rok)
print(f"=== Podávám žádost pro: {datum.strftime('%d.%m.%Y')} ===")
ref = odeslat_zadost(session, datum)
if ref:
uloz_stav(mesic, rok)
uloz_log(mesic, rok, ref)
print(f"\nHotovo — stav uložen: {mesic:02d}/{rok}, ref: {ref}")
else:
print(f"\nPodání selhalo — stav nebyl aktualizován, příště se zkusí znovu {datum.strftime('%d.%m.%Y')}")
if __name__ == "__main__":
hlavni()