""" Podávání žádostí o výpis registrovaných pojištěnců ZPŠ — čistý Python, bez prohlížeče. Co dělá: 1. Přihlásí se certifikátem na portál ZPŠ (uloží cookies pro Playwright) 2. Stáhne nové soubory z výpisové schránky (schranka-vypis-pojistencu-v-kapitaci) 3. Podá žádost pro 1 následující měsíc Stavový soubor: stav.json vedle tohoto skriptu. {"mesic": 2, "rok": 2025} — poslední úspěšně podaný měsíc Log podání: log_podani.json — seznam { datum, ref_cislo, podano_kdy } """ import calendar import json import os import re import sys import time from datetime import date, datetime from pathlib import Path import requests from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))) from Knihovny.najdi_dropbox import get_dropbox_root PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "Certificates", "MBQualifiedCert.pfx")) PFX_PASSWORD = b"Vlado7309208104++" BASE_URL = "https://portal.zpskoda.cz" SUBMIT_URL = f"{BASE_URL}/json-api/formular-schranky/29-vypis-registrov-pojistencu/ulozit-formular" ICZ = "25520" STATE_FILE = os.path.join(os.path.dirname(__file__), "stav.json") LOG_FILE = os.path.join(os.path.dirname(__file__), "log_podani.json") # Sdílené soubory s ostatními ZPŠ skripty STAHUJ_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "StahováníZpráv", "209 ZPŠ")) COOKIES_FILE = os.path.join(STAHUJ_DIR, "zps_cookies.json") CHROME_PROFILE = os.path.join(STAHUJ_DIR, "chrome_profile") DOWNLOAD_DIR = os.path.join(get_dropbox_root(), "Ordinace", "Dokumentace_ke_zpracování", "Zúčtovací zprávy", "SeznamyPojištěnců") VYPIS_URL = f"{BASE_URL}/app/schranka-vypis-pojistencu-v-kapitaci" DOWNLOAD_URL = f"{BASE_URL}/html/prehled-zprav-ve-schrankach/zobrazit-prilohu" PROTOKOL_URL = f"{BASE_URL}/html/prehled-zprav-ve-schrankach/zobrazit-protokol" # --------------------------------------------------------------------------- # Přihlášení # --------------------------------------------------------------------------- def prihlaseni() -> requests.Session: """Přihlásí se certifikátem, vrátí autentizovanou session. Uloží cookies pro Playwright.""" challenge_url = f"{BASE_URL}/json-api/prihlaseni/prihlasovaci-zprava" certlogin_url = f"{BASE_URL}/json-api/prihlaseni/prihlaseni-certifikatem" session = requests.Session() session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "X-Requested-With": "XMLHttpRequest", "Origin": BASE_URL, "Referer": BASE_URL + "/", }) r = session.get(f"{BASE_URL}/app/prihlaseni") r.raise_for_status() session.cookies.set("pzp_sign", "CERT", domain="portal.zpskoda.cz", path="/") r = session.post(challenge_url, json={"login_sign": "CERT"}, headers={"Content-Type": "application/json; charset=UTF-8"}) r.raise_for_status() zprava = r.json()["data"]["zprava"] print(f"Challenge: {zprava[:60]}...") with open(PFX_PATH, "rb") as f: private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD) podpis = ( pkcs7.PKCS7SignatureBuilder() .set_data(zprava.encode("utf-8")) .add_signer(cert, private_key, hashes.SHA256()) .sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature]) .decode("ascii").strip() ) r = session.post(certlogin_url, json={"zprava": zprava, "podpis": podpis}, headers={"Content-Type": "application/json; charset=UTF-8"}) r.raise_for_status() data = r.json()["data"] if not data.get("prihlasen"): raise RuntimeError(f"Přihlášení selhalo: {r.json().get('errMsg', '')}") print("Přihlášení úspěšné!") cookies = [ { "name": c.name, "value": c.value, "domain": c.domain if c.domain.startswith(".") else "." + c.domain, "path": c.path or "/", "expires": int(c.expires) if c.expires else -1, "secure": bool(c.secure), "httpOnly": False, "sameSite": "Lax", } for c in session.cookies ] with open(COOKIES_FILE, "w", encoding="utf-8") as f: json.dump(cookies, f, indent=2, ensure_ascii=False) print(f"Cookies uloženy: {len(cookies)} → {COOKIES_FILE}") return session # --------------------------------------------------------------------------- # Stahování z výpisové schránky # --------------------------------------------------------------------------- def safe_filename(name: str) -> str: return re.sub(r'[\\/:*?"<>|]', "_", name).strip() def parse_date(date_str: str) -> str: try: return datetime.strptime(date_str.strip()[:19], "%d.%m.%Y %H:%M:%S").strftime("%Y-%m-%d") except Exception: try: return datetime.strptime(date_str.strip()[:10], "%d.%m.%Y").strftime("%Y-%m-%d") except Exception: return "0000-00-00" def parse_row(cells: list) -> dict: date_raw = cells[1].strip() if len(cells) > 1 else "" desc_raw = cells[2].strip() if len(cells) > 2 else "" fname_raw = cells[3].strip() if len(cells) > 3 else "" desc_lines = [l.strip() for l in desc_raw.split("\n") if l.strip()] if len(desc_lines) >= 3: description = desc_lines[2] elif len(desc_lines) >= 2: description = desc_lines[1] else: description = desc_lines[0] if desc_lines else "" description = description[:80] fname_match = re.match(r'^(.+?)\s*\(\d{2}\.\d{2}\.\d{4}\)\s*$', fname_raw) original = fname_match.group(1).strip() if fname_match else fname_raw.split("(")[0].strip() orig_path = Path(original) stem = orig_path.stem or "zprava" ext = orig_path.suffix or "" date_iso = parse_date(date_raw) name = f"{date_iso} {safe_filename(description)} ({safe_filename(stem)}){ext}" if len(name) > 240: name = f"{date_iso} ({safe_filename(stem)}){ext}" return {"date": date_iso, "desc": description, "original": original, "filename": name} def stahni_nove_vypisy() -> int: """Stáhne nové soubory z výpisové schránky. Vrátí počet stažených souborů.""" try: from playwright.sync_api import sync_playwright except ImportError: print("Chybí playwright: pip install playwright && playwright install chrome") return 0 os.makedirs(DOWNLOAD_DIR, exist_ok=True) with open(COOKIES_FILE, encoding="utf-8") as f: cookies = json.load(f) downloaded = 0 with sync_playwright() as p: context = p.chromium.launch_persistent_context( user_data_dir=CHROME_PROFILE, channel="chrome", headless=False, slow_mo=100, ignore_https_errors=True, ) try: context.add_cookies(cookies) page = context.new_page() page.goto(f"{VYPIS_URL}/", wait_until="domcontentloaded", timeout=30_000) if "prihlaseni" in page.url or "login" in page.url.lower(): print("Session v prohlížeči expirovala — stahování přeskočeno") return 0 print("Prohlížeč přihlášen OK\n") already = set(os.listdir(DOWNLOAD_DIR)) print(f"V archivu: {len(already)} souborů.\n") page_num = 1 seen_ids: set = set() while True: url = f"{VYPIS_URL}/stranka-{page_num}" print(f" Stránka {page_num}: {url}") try: page.goto(url, wait_until="domcontentloaded", timeout=30_000) except Exception as e: print(f" Navigace selhala: {e}") break page.wait_for_load_state("networkidle", timeout=15_000) data = page.evaluate("""() => { const rows = []; for (const tr of document.querySelectorAll('table tr')) { const cells = Array.from(tr.querySelectorAll('td')).map(td => td.innerText.trim()); if (cells.length < 4) continue; const dlLink = tr.querySelector('a[onclick*="SchrPolOpenFile"]'); if (!dlLink) continue; const mFile = dlLink.getAttribute('onclick').match(/\\d+/); const protLink = tr.querySelector('a[onclick*="SchrPolDBProtokol"]'); const mProt = protLink ? protLink.getAttribute('onclick').match(/\\d+/) : null; rows.push({ cells, fileId: mFile ? mFile[0] : null, protokolId: mProt ? mProt[0] : null, }); } return rows; }""") rows = [r for r in data if r["fileId"]] if not rows: print(f" Stránka {page_num} — žádné řádky, konec schránky.") break current_ids = {r["fileId"] for r in rows} if current_ids & seen_ids: print(f" Stránka {page_num} — opakující se obsah, konec schránky.") break seen_ids.update(current_ids) print(f" Nalezeno {len(rows)} zpráv.") stop = False for row in rows: info = parse_row(row["cells"]) # Zajímají nás pouze .001 soubory if Path(info["original"]).suffix.lower() != ".001": continue target = os.path.join(DOWNLOAD_DIR, info["filename"]) if info["filename"] in already or os.path.exists(target): print(f" [stop] Nalezena již stažená zpráva: {info['filename']}") stop = True break dl_url = f"{DOWNLOAD_URL}?zprava_id={row['fileId']}" try: r = context.request.get(dl_url, headers={"Referer": VYPIS_URL}, timeout=30_000) if not r.ok: print(f" HTTP {r.status} příloha (id={row['fileId']})") else: body = r.body() if not body[:9].decode("ascii", errors="ignore").startswith("H09305001"): print(f" přeskočeno (není výpis pojištěnců): {info['filename']}") else: with open(target, "wb") as fh: fh.write(body) print(f" OK: {info['filename']}") already.add(info["filename"]) downloaded += 1 except Exception as e: print(f" Chyba příloha (id={row['fileId']}): {e}") time.sleep(1.0) if stop: break page_num += 1 finally: context.close() return downloaded # --------------------------------------------------------------------------- # Sestavení XML a podpis # --------------------------------------------------------------------------- def build_xml(datum: date) -> str: datum_str = datum.strftime("%d.%m.%Y") return ( f'\r\n' f'{ICZ}\r\n' f'{datum_str}\r\n' f'jmeno\r\n' f'soubor\r\n' f'' ) def sign_xml(xml: str) -> str: """Podepíše XML certifikátem, vrátí PKCS7 PEM s \\r\\n (stejný formát jako NMSigner).""" with open(PFX_PATH, "rb") as f: private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD) pem = ( pkcs7.PKCS7SignatureBuilder() .set_data(xml.encode("utf-8")) .add_signer(cert, private_key, hashes.SHA256()) .sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature, pkcs7.PKCS7Options.NoCerts]) .decode("ascii") ) return pem.replace("\r\n", "\n").replace("\n", "\r\n") # --------------------------------------------------------------------------- # Odeslání žádosti # --------------------------------------------------------------------------- def odeslat_zadost(session: requests.Session, datum: date) -> str | None: """Odešle podepsanou žádost na portál. Vrátí referenční číslo nebo None.""" xml = build_xml(datum) podpis = sign_xml(xml) payload = {"schrXml": xml, "schrSign": podpis, "schrFiles": []} r = session.post(SUBMIT_URL, json=payload, headers={ "Content-Type": "application/json; charset=UTF-8", "X-Requested-With": "XMLHttpRequest", "Referer": BASE_URL + "/", }) r.raise_for_status() try: resp = r.json() except Exception: print(f" Odpověď není JSON: {r.text[:300]}") return None resp_str = json.dumps(resp, ensure_ascii=False) m = re.search(r'\b(17\d{7}|18\d{7})\b', resp_str) ref = m.group(1) if m else None if resp.get("errMsg") or resp.get("error"): print(f" Chyba od serveru: {resp.get('errMsg') or resp.get('error')}") return None if ref: print(f" OK — ref. číslo: {ref}") else: print(f" Odpověď (bez ref. čísla): {resp_str[:300]}") return ref or ("OK" if r.ok else None) # --------------------------------------------------------------------------- # Stav a log # --------------------------------------------------------------------------- def dalsi_mesic(mesic: int, rok: int) -> tuple[int, int]: mesic += 1 if mesic > 12: mesic = 1 rok += 1 return mesic, rok def posledni_den(mesic: int, rok: int) -> date: _, last = calendar.monthrange(rok, mesic) return date(rok, mesic, last) def nacti_stav() -> tuple[int, int] | None: if os.path.exists(STATE_FILE): with open(STATE_FILE, encoding="utf-8") as f: data = json.load(f) print(f"Stav: poslední podaný {data['mesic']:02d}/{data['rok']}") return data["mesic"], data["rok"] return None def uloz_stav(mesic: int, rok: int) -> None: with open(STATE_FILE, "w", encoding="utf-8") as f: json.dump({"mesic": mesic, "rok": rok}, f, ensure_ascii=False) def uloz_log(mesic: int, rok: int, ref_cislo: str) -> None: log = [] if os.path.exists(LOG_FILE): with open(LOG_FILE, encoding="utf-8") as f: log = json.load(f) log.append({ "datum": posledni_den(mesic, rok).strftime("%d.%m.%Y"), "ref_cislo": ref_cislo, "podano_kdy": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }) with open(LOG_FILE, "w", encoding="utf-8") as f: json.dump(log, f, indent=2, ensure_ascii=False) # --------------------------------------------------------------------------- # Hlavní funkce # --------------------------------------------------------------------------- def hlavni() -> None: posledni = nacti_stav() if posledni is None: raise SystemExit( "Nelze zjistit poslední podaný měsíc.\n" f"Vytvoř ručně soubor {STATE_FILE} ve tvaru:\n" ' {"mesic": 2, "rok": 2025}' ) # 1. Přihlášení — uloží cookies pro Playwright prihlaseni() # 2. Stažení nových výpisů z výpisové schránky print("\n=== Stahování nových výpisů ===") stazeno = stahni_nove_vypisy() print(f"Staženo: {stazeno} souborů.\n") # 3. Znovu přihlásit — Playwright mohl invalidovat předchozí session print("=== Znovu přihlašuji před podáním ===") session = prihlaseni() # 4. Podání žádosti pro následující měsíc mesic, rok = dalsi_mesic(*posledni) datum = posledni_den(mesic, rok) print(f"=== Podávám žádost pro: {datum.strftime('%d.%m.%Y')} ===") ref = odeslat_zadost(session, datum) if ref: uloz_stav(mesic, rok) uloz_log(mesic, rok, ref) print(f"\nHotovo — stav uložen: {mesic:02d}/{rok}, ref: {ref}") else: print(f"\nPodání selhalo — stav nebyl aktualizován, příště se zkusí znovu {datum.strftime('%d.%m.%Y')}") if __name__ == "__main__": hlavni()