449 lines
17 KiB
Python
449 lines
17 KiB
Python
"""
Submit requests for the list of registered ZPŠ insurees — pure Python, no browser.
(The submission itself goes through plain HTTP requests; only the download step
drives Chrome through Playwright.)

What it does:
1. Logs in to the ZPŠ portal with a certificate (saves cookies for Playwright)
2. Downloads new files from the listing mailbox (schranka-vypis-pojistencu-v-kapitaci)
3. Submits a request for the 1 following month

State file: stav.json next to this script.
{"mesic": 2, "rok": 2025} — the last successfully submitted month

Submission log: log_podani.json — a list of { datum, ref_cislo, podano_kdy }
"""
|
|
|
|
import calendar
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from datetime import date, datetime
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12
|
|
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")))
|
|
from Knihovny.najdi_dropbox import get_dropbox_root
|
|
|
|
# PKCS#12 certificate used both for the portal login and for signing requests.
PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "Certificates", "MBQualifiedCert.pfx"))
# SECURITY: a certificate password does not belong in source control.  It is
# taken from the ZPS_PFX_PASSWORD environment variable when that is set; the
# literal is kept only as a backward-compatible fallback.
# TODO(review): rotate this password and remove the fallback literal.
PFX_PASSWORD = os.environ.get("ZPS_PFX_PASSWORD", "Vlado7309208104++").encode("utf-8")

# Portal root and the endpoint the signed request form is posted to.
BASE_URL = "https://portal.zpskoda.cz"
SUBMIT_URL = f"{BASE_URL}/json-api/formular-schranky/29-vypis-registrov-pojistencu/ulozit-formular"
# Value sent as the "icz" filter item of every request.
ICZ = "25520"

# State of the last submitted month and the submission log, next to this script.
STATE_FILE = os.path.join(os.path.dirname(__file__), "stav.json")
LOG_FILE = os.path.join(os.path.dirname(__file__), "log_podani.json")

# Files shared with the other ZPŠ scripts
STAHUJ_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "StahováníZpráv", "209 ZPŠ"))
# Cookies written after login and loaded again by the Playwright download step.
COOKIES_FILE = os.path.join(STAHUJ_DIR, "zps_cookies.json")
# User-data directory of the persistent Chrome context used for downloading.
CHROME_PROFILE = os.path.join(STAHUJ_DIR, "chrome_profile")
# Target directory for the downloaded listing files.
DOWNLOAD_DIR = os.path.join(get_dropbox_root(), "Ordinace", "Dokumentace_ke_zpracování", "Zúčtovací zprávy", "SeznamyPojištěnců")

# Paged listing mailbox and the attachment / protocol download endpoints.
VYPIS_URL = f"{BASE_URL}/app/schranka-vypis-pojistencu-v-kapitaci"
DOWNLOAD_URL = f"{BASE_URL}/html/prehled-zprav-ve-schrankach/zobrazit-prilohu"
# NOTE(review): PROTOKOL_URL is not referenced anywhere in the visible code.
PROTOKOL_URL = f"{BASE_URL}/html/prehled-zprav-ve-schrankach/zobrazit-protokol"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Přihlášení
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def prihlaseni() -> requests.Session:
    """Log in to the portal with the client certificate and return the session.

    Steps: open the login page, set the ``pzp_sign=CERT`` cookie, fetch the
    challenge message, sign it as a detached PKCS#7 (SHA-256, PEM) with the
    key from ``PFX_PATH`` and post the signature back.  After a successful
    login the session cookies are written to ``COOKIES_FILE`` as a list of
    dicts, which the download step later passes to Playwright's
    ``add_cookies``.

    Returns:
        The authenticated ``requests.Session``.

    Raises:
        requests.HTTPError: if any of the HTTP calls returns an error status.
        requests.Timeout: if the portal does not answer within the timeout.
        RuntimeError: if the portal answers but does not report ``prihlasen``.
    """
    challenge_url = f"{BASE_URL}/json-api/prihlaseni/prihlasovaci-zprava"
    certlogin_url = f"{BASE_URL}/json-api/prihlaseni/prihlaseni-certifikatem"
    json_headers = {"Content-Type": "application/json; charset=UTF-8"}
    # (connect, read) seconds.  requests has no default timeout, so without
    # this an unresponsive portal would block the script forever.
    timeout = (10, 60)

    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Origin": BASE_URL,
        "Referer": BASE_URL + "/",
    })

    # Visiting the login page first lets the server set its initial cookies.
    r = session.get(f"{BASE_URL}/app/prihlaseni", timeout=timeout)
    r.raise_for_status()
    session.cookies.set("pzp_sign", "CERT", domain="portal.zpskoda.cz", path="/")

    r = session.post(challenge_url, json={"login_sign": "CERT"},
                     headers=json_headers, timeout=timeout)
    r.raise_for_status()
    zprava = r.json()["data"]["zprava"]
    print(f"Challenge: {zprava[:60]}...")

    with open(PFX_PATH, "rb") as f:
        private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)

    podpis = (
        pkcs7.PKCS7SignatureBuilder()
        .set_data(zprava.encode("utf-8"))
        .add_signer(cert, private_key, hashes.SHA256())
        .sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature])
        .decode("ascii").strip()
    )

    r = session.post(certlogin_url, json={"zprava": zprava, "podpis": podpis},
                     headers=json_headers, timeout=timeout)
    r.raise_for_status()
    odpoved = r.json()  # parsed once; used for both the data and the error text
    data = odpoved["data"]

    if not data.get("prihlasen"):
        raise RuntimeError(f"Přihlášení selhalo: {odpoved.get('errMsg', '')}")

    print("Přihlášení úspěšné!")

    cookies = [
        {
            "name": c.name,
            "value": c.value,
            # The stored domain always carries a leading dot.
            "domain": c.domain if c.domain.startswith(".") else "." + c.domain,
            "path": c.path or "/",
            # -1 is written for cookies that have no expiry.
            "expires": int(c.expires) if c.expires else -1,
            "secure": bool(c.secure),
            "httpOnly": False,
            "sameSite": "Lax",
        }
        for c in session.cookies
    ]
    # Do not lose a successful login just because the shared folder is missing.
    os.makedirs(os.path.dirname(COOKIES_FILE), exist_ok=True)
    with open(COOKIES_FILE, "w", encoding="utf-8") as f:
        json.dump(cookies, f, indent=2, ensure_ascii=False)
    print(f"Cookies uloženy: {len(cookies)} → {COOKIES_FILE}")

    return session
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stahování z výpisové schránky
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Maps every character in \ / : * ? " < > | to an underscore.
_FORBIDDEN_TO_UNDERSCORE = str.maketrans('\\/:*?"<>|', "_" * 9)


def safe_filename(name: str) -> str:
    """Return *name* with the characters ``\\ / : * ? " < > |`` replaced by ``_``.

    Leading and trailing whitespace is stripped after the replacement.
    """
    cleaned = name.translate(_FORBIDDEN_TO_UNDERSCORE)
    return cleaned.strip()
|
|
|
|
|
|
def parse_date(date_str: str) -> str:
    """Convert a ``dd.mm.YYYY[ HH:MM:SS]`` string to ``YYYY-MM-DD``.

    The full timestamp (first 19 characters) is tried first, then the bare
    date (first 10 characters).  Any failure yields the placeholder
    ``"0000-00-00"`` instead of raising.
    """
    attempts = (
        (19, "%d.%m.%Y %H:%M:%S"),
        (10, "%d.%m.%Y"),
    )
    for width, fmt in attempts:
        try:
            return datetime.strptime(date_str.strip()[:width], fmt).strftime("%Y-%m-%d")
        except Exception:
            # Deliberately broad: any problem just moves on to the next format.
            continue
    return "0000-00-00"
|
|
|
|
|
|
def parse_row(cells: list) -> dict:
    """Turn the text cells of one mailbox table row into file metadata.

    Cell 1 is read as the date, cell 2 as a multi-line description and cell 3
    as the attachment name; missing cells count as empty strings.

    Returns:
        A dict with the keys ``date`` (ISO date from ``parse_date``), ``desc``
        (description, at most 80 characters), ``original`` (attachment name
        without a trailing ``(dd.mm.YYYY)``) and ``filename`` (the name used
        for the local copy).
    """

    def cell(index: int) -> str:
        # Stripped text of the cell, or "" when the row is too short.
        return cells[index].strip() if len(cells) > index else ""

    date_raw = cell(1)
    desc_raw = cell(2)
    fname_raw = cell(3)

    # Of the non-empty description lines, use the third if present, otherwise
    # the last one available.
    stripped = (line.strip() for line in desc_raw.split("\n"))
    desc_lines = [line for line in stripped if line]
    if desc_lines:
        description = desc_lines[min(len(desc_lines), 3) - 1]
    else:
        description = ""
    description = description[:80]

    # Drop a trailing "(dd.mm.YYYY)"; failing that, cut at the first "(".
    fname_match = re.match(r'^(.+?)\s*\(\d{2}\.\d{2}\.\d{4}\)\s*$', fname_raw)
    if fname_match:
        original = fname_match.group(1).strip()
    else:
        original = fname_raw.partition("(")[0].strip()

    orig_path = Path(original)
    stem = orig_path.stem or "zprava"
    ext = orig_path.suffix

    date_iso = parse_date(date_raw)
    safe_stem = safe_filename(stem)
    name = f"{date_iso} {safe_filename(description)} ({safe_stem}){ext}"
    if len(name) > 240:
        # Too long: leave the description out of the file name.
        name = f"{date_iso} ({safe_stem}){ext}"

    return {"date": date_iso, "desc": description, "original": original, "filename": name}
|
|
|
|
|
|
# Runs in the page: collects every table row that has at least four cells and
# a "SchrPolOpenFile" link, returning the cell texts together with the first
# number found in the download link's and the protocol link's onclick.
_ROWS_JS = """() => {
    const rows = [];
    for (const tr of document.querySelectorAll('table tr')) {
        const cells = Array.from(tr.querySelectorAll('td')).map(td => td.innerText.trim());
        if (cells.length < 4) continue;
        const dlLink = tr.querySelector('a[onclick*="SchrPolOpenFile"]');
        if (!dlLink) continue;
        const mFile = dlLink.getAttribute('onclick').match(/\\d+/);
        const protLink = tr.querySelector('a[onclick*="SchrPolDBProtokol"]');
        const mProt = protLink ? protLink.getAttribute('onclick').match(/\\d+/) : null;
        rows.push({
            cells,
            fileId: mFile ? mFile[0] : null,
            protokolId: mProt ? mProt[0] : null,
        });
    }
    return rows;
}"""


def _stahni_prilohu(context, file_id: str, filename: str, target: str) -> bool:
    """Fetch one attachment and store it at *target*; return True if it was saved.

    Only bodies whose first nine bytes are ``H09305001`` are written.  Every
    failure is reported on stdout and turned into ``False`` so that one bad
    attachment does not stop the remaining downloads.
    """
    dl_url = f"{DOWNLOAD_URL}?zprava_id={file_id}"
    try:
        r = context.request.get(dl_url, headers={"Referer": VYPIS_URL}, timeout=30_000)
        if not r.ok:
            print(f" HTTP {r.status} příloha (id={file_id})")
            return False
        body = r.body()
        if not body[:9].decode("ascii", errors="ignore").startswith("H09305001"):
            print(f" přeskočeno (není výpis pojištěnců): {filename}")
            return False
        with open(target, "wb") as fh:
            fh.write(body)
        print(f" OK: {filename}")
        return True
    except Exception as e:
        # Best effort per attachment: report the error and carry on.
        print(f" Chyba příloha (id={file_id}): {e}")
        return False


def stahni_nove_vypisy() -> int:
    """Download new ``.001`` files from the listing mailbox.

    Opens a persistent Chrome context through Playwright, loads the cookies
    from ``COOKIES_FILE`` and walks the mailbox pages (``stranka-1``,
    ``stranka-2``, …).  Paging ends at an empty page, at a page that repeats
    already seen file ids, at a failed navigation, or at the first ``.001``
    row whose target file already exists in ``DOWNLOAD_DIR``.

    Returns:
        The number of files written.  0 is also returned when Playwright is
        not installed or the browser lands on a login page.
    """
    try:
        from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
        from playwright.sync_api import sync_playwright
    except ImportError:
        print("Chybí playwright: pip install playwright && playwright install chrome")
        return 0

    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    with open(COOKIES_FILE, encoding="utf-8") as f:
        cookies = json.load(f)

    downloaded = 0

    with sync_playwright() as p:
        context = p.chromium.launch_persistent_context(
            user_data_dir=CHROME_PROFILE,
            channel="chrome",
            headless=False,
            slow_mo=100,
            # NOTE(review): this disables TLS certificate validation for the
            # portal — confirm that it is really required.
            ignore_https_errors=True,
        )
        try:
            context.add_cookies(cookies)
            page = context.new_page()

            page.goto(f"{VYPIS_URL}/", wait_until="domcontentloaded", timeout=30_000)
            if "prihlaseni" in page.url or "login" in page.url.lower():
                print("Session v prohlížeči expirovala — stahování přeskočeno")
                return 0
            print("Prohlížeč přihlášen OK\n")

            already = set(os.listdir(DOWNLOAD_DIR))
            print(f"V archivu: {len(already)} souborů.\n")

            page_num = 1
            seen_ids: set = set()

            while True:
                url = f"{VYPIS_URL}/stranka-{page_num}"
                print(f" Stránka {page_num}: {url}")
                try:
                    page.goto(url, wait_until="domcontentloaded", timeout=30_000)
                except Exception as e:
                    print(f" Navigace selhala: {e}")
                    break
                try:
                    page.wait_for_load_state("networkidle", timeout=15_000)
                except PlaywrightTimeoutError as e:
                    # The DOM is already loaded; a page that never becomes
                    # network-idle must not abort the whole run.
                    print(f" Čekání na networkidle vypršelo, pokračuji: {e}")

                data = page.evaluate(_ROWS_JS)
                rows = [r for r in data if r["fileId"]]

                if not rows:
                    print(f" Stránka {page_num} — žádné řádky, konec schránky.")
                    break

                # Seeing a known id again is treated as the end of the mailbox.
                current_ids = {r["fileId"] for r in rows}
                if current_ids & seen_ids:
                    print(f" Stránka {page_num} — opakující se obsah, konec schránky.")
                    break
                seen_ids.update(current_ids)
                print(f" Nalezeno {len(rows)} zpráv.")

                stop = False
                for row in rows:
                    info = parse_row(row["cells"])

                    # Only .001 files are of interest.
                    if Path(info["original"]).suffix.lower() != ".001":
                        continue

                    target = os.path.join(DOWNLOAD_DIR, info["filename"])

                    # The first already-downloaded file ends the whole scan.
                    if info["filename"] in already or os.path.exists(target):
                        print(f" [stop] Nalezena již stažená zpráva: {info['filename']}")
                        stop = True
                        break

                    if _stahni_prilohu(context, row["fileId"], info["filename"], target):
                        already.add(info["filename"])
                        downloaded += 1
                    # Pause between attachment requests.
                    time.sleep(1.0)

                if stop:
                    break

                page_num += 1

        finally:
            context.close()

    return downloaded
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sestavení XML a podpis
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_xml(datum: date) -> str:
|
|
datum_str = datum.strftime("%d.%m.%Y")
|
|
return (
|
|
f'<SchrankaZadost NazevSchranky="VypisPojKap" NazevFiltru="ZZ_VYP_REG">\r\n'
|
|
f'<PolozkaFiltru Nazev="icz">{ICZ}</PolozkaFiltru>\r\n'
|
|
f'<PolozkaFiltru Nazev="datum">{datum_str}</PolozkaFiltru>\r\n'
|
|
f'<PolozkaFiltru Nazev="razeni">jmeno</PolozkaFiltru>\r\n'
|
|
f'<PolozkaFiltru Nazev="typ">soubor</PolozkaFiltru>\r\n'
|
|
f'</SchrankaZadost>'
|
|
)
|
|
|
|
|
|
def sign_xml(xml: str) -> str:
    """Sign *xml* with the certificate and return the PKCS#7 signature as PEM.

    The signature is detached and carries no certificates; its line endings
    are normalised to ``\\r\\n`` (per the original author's note, the same
    format NMSigner produces).
    """
    pfx_bytes = Path(PFX_PATH).read_bytes()
    private_key, cert, _ = pkcs12.load_key_and_certificates(pfx_bytes, PFX_PASSWORD)

    options = [pkcs7.PKCS7Options.DetachedSignature, pkcs7.PKCS7Options.NoCerts]
    builder = pkcs7.PKCS7SignatureBuilder().set_data(xml.encode("utf-8"))
    builder = builder.add_signer(cert, private_key, hashes.SHA256())
    pem = builder.sign(serialization.Encoding.PEM, options).decode("ascii")

    # Collapse any existing CRLF first so every line break ends up as one CRLF.
    return "\r\n".join(pem.replace("\r\n", "\n").split("\n"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Odeslání žádosti
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def odeslat_zadost(session: requests.Session, datum: date) -> str | None:
    """Send the signed request for *datum* to the portal.

    Args:
        session: Authenticated session, as returned by ``prihlaseni``.
        datum: Date the listing is requested for.

    Returns:
        The reference number found in the response (nine digits starting with
        17 or 18), the string ``"OK"`` when the server reports no error but no
        such number is present, or ``None`` when the response is not JSON, is
        not a JSON object, or carries ``errMsg`` / ``error``.

    Raises:
        requests.HTTPError: if the portal answers with an error status.
        requests.Timeout: if the portal does not answer within the timeout.
    """
    xml = build_xml(datum)
    podpis = sign_xml(xml)

    payload = {"schrXml": xml, "schrSign": podpis, "schrFiles": []}

    r = session.post(
        SUBMIT_URL,
        json=payload,
        headers={
            "Content-Type": "application/json; charset=UTF-8",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": BASE_URL + "/",
        },
        # (connect, read) seconds; requests would otherwise wait forever.
        timeout=(10, 120),
    )
    r.raise_for_status()

    try:
        resp = r.json()
    except ValueError:
        # The JSON decode errors raised by requests are ValueError subclasses.
        print(f" Odpověď není JSON: {r.text[:300]}")
        return None

    resp_str = json.dumps(resp, ensure_ascii=False)

    if not isinstance(resp, dict):
        # A list or scalar has no .get(); report it instead of crashing.
        print(f" Neočekávaný tvar odpovědi: {resp_str[:300]}")
        return None

    chyba = resp.get("errMsg") or resp.get("error")
    if chyba:
        print(f" Chyba od serveru: {chyba}")
        return None

    # The reference number is searched for anywhere in the serialised response.
    m = re.search(r'\b(17\d{7}|18\d{7})\b', resp_str)
    ref = m.group(1) if m else None

    if ref:
        print(f" OK — ref. číslo: {ref}")
    else:
        print(f" Odpověď (bez ref. čísla): {resp_str[:300]}")

    # raise_for_status() has already passed, so an error-free response counts
    # as a success even without a reference number.
    return ref or "OK"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stav a log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def dalsi_mesic(mesic: int, rok: int) -> tuple[int, int]:
    """Return ``(month, year)`` of the month following *mesic*/*rok*.

    Going past December wraps to January of the next year.
    """
    nasledujici = mesic + 1
    if nasledujici > 12:
        return 1, rok + 1
    return nasledujici, rok
|
|
|
|
|
|
def posledni_den(mesic: int, rok: int) -> date:
    """Return the date of the last day of month *mesic* in year *rok*."""
    days_in_month = calendar.monthrange(rok, mesic)[1]
    return date(rok, mesic, days_in_month)
|
|
|
|
|
|
def nacti_stav() -> tuple[int, int] | None:
    """Read the last submitted month from ``STATE_FILE``.

    Returns:
        ``(mesic, rok)`` as stored in the file, or ``None`` when the file does
        not exist.  The values that were read are also printed.
    """
    if not os.path.exists(STATE_FILE):
        return None

    with open(STATE_FILE, encoding="utf-8") as fh:
        stav = json.load(fh)

    mesic, rok = stav["mesic"], stav["rok"]
    print(f"Stav: poslední podaný {mesic:02d}/{rok}")
    return mesic, rok
|
|
|
|
|
|
def uloz_stav(mesic: int, rok: int) -> None:
    """Store *mesic*/*rok* in ``STATE_FILE`` as the last submitted month.

    The JSON is first written to a temporary file in the same directory and
    then moved over the state file with ``os.replace``, so an interrupted run
    cannot leave a truncated state file that the next run fails to parse.
    """
    tmp_path = STATE_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({"mesic": mesic, "rok": rok}, f, ensure_ascii=False)
    os.replace(tmp_path, STATE_FILE)
|
|
|
|
|
|
def uloz_log(mesic: int, rok: int, ref_cislo: str) -> None:
    """Append one submission record to ``LOG_FILE``.

    The record holds the requested date (last day of *mesic*/*rok*), the
    reference number and the current local time.  The file is read in full if
    it exists, extended by the record and written back.
    """
    if os.path.exists(LOG_FILE):
        with open(LOG_FILE, encoding="utf-8") as src:
            zaznamy = json.load(src)
    else:
        zaznamy = []

    zaznam = {
        "datum": posledni_den(mesic, rok).strftime("%d.%m.%Y"),
        "ref_cislo": ref_cislo,
        "podano_kdy": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    zaznamy.append(zaznam)

    with open(LOG_FILE, "w", encoding="utf-8") as dst:
        json.dump(zaznamy, dst, indent=2, ensure_ascii=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hlavní funkce
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hlavni() -> None:
    """Run the whole job: log in, download new listings, submit the next month.

    Exits with an explanatory message when the state file is missing.  The
    state file and the log are updated only when the submission returns a
    truthy result.
    """
    posledni = nacti_stav()
    if posledni is None:
        zprava = (
            "Nelze zjistit poslední podaný měsíc.\n"
            f"Vytvoř ručně soubor {STATE_FILE} ve tvaru:\n"
            ' {"mesic": 2, "rok": 2025}'
        )
        sys.exit(zprava)

    # 1. Login — stores the cookies Playwright needs
    prihlaseni()

    # 2. Download new files from the listing mailbox
    print("\n=== Stahování nových výpisů ===")
    stazeno = stahni_nove_vypisy()
    print(f"Staženo: {stazeno} souborů.\n")

    # 3. Log in again — Playwright may have invalidated the earlier session
    print("=== Znovu přihlašuji před podáním ===")
    session = prihlaseni()

    # 4. Submit the request for the month after the last submitted one
    mesic, rok = dalsi_mesic(posledni[0], posledni[1])
    datum = posledni_den(mesic, rok)
    datum_text = datum.strftime('%d.%m.%Y')
    print(f"=== Podávám žádost pro: {datum_text} ===")

    ref = odeslat_zadost(session, datum)
    if not ref:
        print(f"\nPodání selhalo — stav nebyl aktualizován, příště se zkusí znovu {datum_text}")
        return

    uloz_stav(mesic, rok)
    uloz_log(mesic, rok, ref)
    print(f"\nHotovo — stav uložen: {mesic:02d}/{rok}, ref: {ref}")


if __name__ == "__main__":
    hlavni()
|