Files
ordinaceprojekt/Insurance/StahováníSeznamuPojištěnců/205 ČPZP/StahniSeznamPojistencuCPZP.py
T
Vladimir Buzalka 5d55f80f9d notebookvb
2026-05-12 22:01:43 +02:00

259 lines
8.8 KiB
Python

"""
Stahování seznamu registrovaných pojištěnců ČPZP — samostatný skript.
Co dělá:
- Přihlásí se certifikátem na portál ČPZP (nevyžaduje 01_prihlaseni.py)
- Zjistí poslední stažený měsíc (ze stavového souboru 05_stav.json nebo z existujících souborů)
- Zkusí stáhnout právě 1 následující měsíc
- Při úspěchu (staženo nebo soubor už existoval) uloží nový stav
- Uloží jako: YYYY-MM-DD f205MMRR.123
Stavový soubor: 05_stav.json vedle tohoto skriptu.
{"mesic": 1, "rok": 2026} — poslední úspěšně zpracovaný měsíc
"""
import glob
import json
import os
import re
import sys
import time
from datetime import date
import requests
from bs4 import BeautifulSoup
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12
from playwright.sync_api import sync_playwright
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from Knihovny.najdi_dropbox import get_dropbox_root
PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "Certificates", "MBQualifiedCert.pfx"))
PFX_PASSWORD = b"Vlado7309208104++"
BASE_URL = "https://portal.cpzp.cz"
DEST_DIR = os.path.join(
get_dropbox_root(),
"Ordinace", "Dokumentace_ke_zpracování", "Zúčtovací zprávy", "SeznamyPojištěnců",
)
STATE_FILE = os.path.join(os.path.dirname(__file__), "05_stav.json")
def _parse_js_str(js_expr: str) -> str:
"""Převede JS string expression ('a' + String.fromCharCode(13,10) + 'b') na Python string."""
parts = re.findall(r"'([^']*)'|String\.fromCharCode\(([\d,\s]+)\)", js_expr)
result = []
for str_part, chars_part in parts:
if str_part is not None and (str_part or not chars_part):
result.append(str_part)
elif chars_part:
result.append("".join(chr(int(c.strip())) for c in chars_part.split(",")))
return "".join(result)
def prihlaseni() -> list[dict]:
"""Přihlásí se certifikátem a vrátí cookies ve formátu pro Playwright."""
login_url = f"{BASE_URL}/app/login/"
post_url = f"{BASE_URL}/app/"
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Origin": BASE_URL,
"Referer": login_url,
})
r = session.get(login_url)
r.raise_for_status()
soup = BeautifulSoup(r.content, "html.parser", from_encoding="utf-8")
form = soup.find("form", {"name": "frmPrihlasCert"})
if not form:
raise RuntimeError("Formulář frmPrihlasCert nenalezen — portál nedostupný nebo změnil strukturu")
csrf_cert = form.find("input", {"name": "csrfCert"})["value"]
html = r.content.decode("utf-8")
match = re.search(r"certificateLoginKey\s*:\s*(.+?)(?=,\s*\n|\n\s*maxHeight)", html, re.DOTALL)
if not match:
raise RuntimeError("certificateLoginKey nenalezen v HTML")
challenge = _parse_js_str(match.group(1))
print(f"Challenge: {challenge[:60]}...")
with open(PFX_PATH, "rb") as f:
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
pem_podpis = (
pkcs7.PKCS7SignatureBuilder()
.set_data(challenge.encode("utf-8"))
.add_signer(cert, private_key, hashes.SHA256())
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature])
)
r = session.post(post_url, data={
"csrfCert": csrf_cert,
"sign": pem_podpis.decode("ascii").strip(),
}, headers={
"Content-Type": "application/x-www-form-urlencoded",
"Referer": login_url,
})
r.raise_for_status()
if "frmPrihlasCert" in r.text:
raise RuntimeError("Přihlášení selhalo — server vrátil login stránku znovu")
print("Přihlášení úspěšné!")
return [
{
"name": c.name,
"value": c.value,
"domain": c.domain if c.domain.startswith(".") else "." + c.domain,
"path": c.path or "/",
"expires": int(c.expires) if c.expires else -1,
"secure": bool(c.secure),
"httpOnly": False,
"sameSite": "Lax",
}
for c in session.cookies
]
def dalsi_mesic(mesic: int, rok: int) -> tuple[int, int]:
mesic += 1
if mesic > 12:
mesic = 1
rok += 1
return mesic, rok
def uz_stazeno(mesic: int, rok: int) -> bool:
"""Vrátí True pokud soubor pro daný měsíc/rok už existuje v DEST_DIR."""
mm = f"{mesic:02d}"
rr = str(rok)[-2:]
pattern = os.path.join(DEST_DIR, f"* f205{mm}{rr}.*")
return bool(glob.glob(pattern))
def zjisti_posledni_ze_souboru() -> tuple[int, int] | None:
"""Prohledá DEST_DIR a zjistí nejnovější stažený měsíc/rok z názvů souborů (f205MMRR)."""
pattern = os.path.join(DEST_DIR, "* f205????.* ")
# Hledáme soubory tvaru '* f205MMRR.*'
soubory = glob.glob(os.path.join(DEST_DIR, "* f205????.* "))
if not soubory:
# zkus bez mezery na konci
soubory = glob.glob(os.path.join(DEST_DIR, "* f205????.*"))
kandidati = []
for s in soubory:
nazev = os.path.basename(s)
m = re.search(r"f205(\d{2})(\d{2})\.", nazev)
if m:
mm = int(m.group(1))
rr = int(m.group(2))
rok = 2000 + rr
if 1 <= mm <= 12:
kandidati.append((rok, mm))
if not kandidati:
return None
posledni_rok, posledni_mesic = max(kandidati)
return posledni_mesic, posledni_rok
def nacti_stav() -> tuple[int, int] | None:
"""Načte poslední zpracovaný měsíc/rok. Vrátí None pokud není žádná informace."""
if os.path.exists(STATE_FILE):
with open(STATE_FILE, encoding="utf-8") as f:
data = json.load(f)
print(f"Stav ze souboru: poslední zpracovaný {data['mesic']:02d}/{data['rok']}")
return data["mesic"], data["rok"]
ze_souboru = zjisti_posledni_ze_souboru()
if ze_souboru:
m, r = ze_souboru
print(f"Stav zjištěn ze souborů: poslední stažený {m:02d}/{r}")
return m, r
return None
def uloz_stav(mesic: int, rok: int) -> None:
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump({"mesic": mesic, "rok": rok}, f, ensure_ascii=False)
def stahni_mesic(page, mesic: int, rok: int) -> bool:
"""Stáhne soubor pro jeden měsíc. Vrátí True pokud staženo."""
today = date.today().strftime("%Y-%m-%d")
if uz_stazeno(mesic, rok):
print(f" [{mesic:02d}/{rok}] přeskočeno — soubor už existuje")
return True # považujeme za úspěch pro účely uložení stavu
# Vyplň formulář
inputs = page.query_selector_all("input[type=text]")
if len(inputs) < 2:
print(f" [{mesic:02d}/{rok}] CHYBA — inputy nenalezeny")
return False
inputs[0].fill(str(mesic))
inputs[1].fill(str(rok))
page.get_by_text("Hledat", exact=True).click()
page.wait_for_load_state("networkidle")
dl_selector = "a:has-text('Seznam registrovaných pojištěnců')"
if not page.query_selector(dl_selector):
print(f" [{mesic:02d}/{rok}] CHYBA — download odkaz nenalezen (soubor zřejmě ještě není k dispozici)")
return False
with page.expect_download() as dl_info:
page.click(dl_selector)
download = dl_info.value
original_name = download.suggested_filename
dest_path = os.path.join(DEST_DIR, f"{today} {original_name}")
download.save_as(dest_path)
print(f" [{mesic:02d}/{rok}] OK — {os.path.basename(dest_path)}")
return True
def hlavni() -> None:
os.makedirs(DEST_DIR, exist_ok=True)
cookies = prihlaseni()
posledni = nacti_stav()
if posledni is None:
raise SystemExit(
"Nelze zjistit poslední stažený měsíc.\n"
f"Vytvoř ručně soubor {STATE_FILE} ve tvaru:\n"
' {"mesic": 12, "rok": 2025}'
)
mesic, rok = dalsi_mesic(*posledni)
print(f"Zkouším stáhnout: {mesic:02d}/{rok}")
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
context = browser.new_context()
context.add_cookies(cookies)
page = context.new_page()
print("Otevírám stránku klientely...")
page.goto(f"{BASE_URL}/app/prohlizeni-klientely/")
page.wait_for_load_state("networkidle")
if "frmPrihlasCert" in page.content():
raise SystemExit("Portál vyžaduje přihlášení i po předání cookies — zkontroluj certifikát nebo session")
uspech = stahni_mesic(page, mesic, rok)
browser.close()
if uspech:
uloz_stav(mesic, rok)
print(f"\nHotovo — stav uložen: {mesic:02d}/{rok}")
else:
print(f"\nNestaženo — stav nebyl aktualizován, příště se zkusí znovu {mesic:02d}/{rok}")
if __name__ == "__main__":
hlavni()