259 lines
8.8 KiB
Python
259 lines
8.8 KiB
Python
"""
|
|
Stahování seznamu registrovaných pojištěnců ČPZP — samostatný skript.
|
|
|
|
Co dělá:
|
|
- Přihlásí se certifikátem na portál ČPZP (nevyžaduje 01_prihlaseni.py)
|
|
- Zjistí poslední stažený měsíc (ze stavového souboru 05_stav.json nebo z existujících souborů)
|
|
- Zkusí stáhnout právě 1 následující měsíc
|
|
- Při úspěchu (staženo nebo soubor už existoval) uloží nový stav
|
|
- Uloží jako: YYYY-MM-DD f205MMRR.123
|
|
|
|
Stavový soubor: 05_stav.json vedle tohoto skriptu.
|
|
{"mesic": 1, "rok": 2026} — poslední úspěšně zpracovaný měsíc
|
|
"""
|
|
|
|
import glob
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from datetime import date
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
|
from Knihovny.najdi_dropbox import get_dropbox_root
|
|
|
|
PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "Certificates", "MBQualifiedCert.pfx"))
|
|
PFX_PASSWORD = b"Vlado7309208104++"
|
|
|
|
BASE_URL = "https://portal.cpzp.cz"
|
|
DEST_DIR = os.path.join(
|
|
get_dropbox_root(),
|
|
"Ordinace", "Dokumentace_ke_zpracování", "Zúčtovací zprávy", "SeznamyPojištěnců",
|
|
)
|
|
STATE_FILE = os.path.join(os.path.dirname(__file__), "05_stav.json")
|
|
|
|
|
|
def _parse_js_str(js_expr: str) -> str:
|
|
"""Převede JS string expression ('a' + String.fromCharCode(13,10) + 'b') na Python string."""
|
|
parts = re.findall(r"'([^']*)'|String\.fromCharCode\(([\d,\s]+)\)", js_expr)
|
|
result = []
|
|
for str_part, chars_part in parts:
|
|
if str_part is not None and (str_part or not chars_part):
|
|
result.append(str_part)
|
|
elif chars_part:
|
|
result.append("".join(chr(int(c.strip())) for c in chars_part.split(",")))
|
|
return "".join(result)
|
|
|
|
|
|
def prihlaseni() -> list[dict]:
|
|
"""Přihlásí se certifikátem a vrátí cookies ve formátu pro Playwright."""
|
|
login_url = f"{BASE_URL}/app/login/"
|
|
post_url = f"{BASE_URL}/app/"
|
|
|
|
session = requests.Session()
|
|
session.headers.update({
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
|
"Origin": BASE_URL,
|
|
"Referer": login_url,
|
|
})
|
|
|
|
r = session.get(login_url)
|
|
r.raise_for_status()
|
|
|
|
soup = BeautifulSoup(r.content, "html.parser", from_encoding="utf-8")
|
|
form = soup.find("form", {"name": "frmPrihlasCert"})
|
|
if not form:
|
|
raise RuntimeError("Formulář frmPrihlasCert nenalezen — portál nedostupný nebo změnil strukturu")
|
|
csrf_cert = form.find("input", {"name": "csrfCert"})["value"]
|
|
|
|
html = r.content.decode("utf-8")
|
|
match = re.search(r"certificateLoginKey\s*:\s*(.+?)(?=,\s*\n|\n\s*maxHeight)", html, re.DOTALL)
|
|
if not match:
|
|
raise RuntimeError("certificateLoginKey nenalezen v HTML")
|
|
challenge = _parse_js_str(match.group(1))
|
|
print(f"Challenge: {challenge[:60]}...")
|
|
|
|
with open(PFX_PATH, "rb") as f:
|
|
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
|
|
|
|
pem_podpis = (
|
|
pkcs7.PKCS7SignatureBuilder()
|
|
.set_data(challenge.encode("utf-8"))
|
|
.add_signer(cert, private_key, hashes.SHA256())
|
|
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature])
|
|
)
|
|
|
|
r = session.post(post_url, data={
|
|
"csrfCert": csrf_cert,
|
|
"sign": pem_podpis.decode("ascii").strip(),
|
|
}, headers={
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
"Referer": login_url,
|
|
})
|
|
r.raise_for_status()
|
|
|
|
if "frmPrihlasCert" in r.text:
|
|
raise RuntimeError("Přihlášení selhalo — server vrátil login stránku znovu")
|
|
|
|
print("Přihlášení úspěšné!")
|
|
|
|
return [
|
|
{
|
|
"name": c.name,
|
|
"value": c.value,
|
|
"domain": c.domain if c.domain.startswith(".") else "." + c.domain,
|
|
"path": c.path or "/",
|
|
"expires": int(c.expires) if c.expires else -1,
|
|
"secure": bool(c.secure),
|
|
"httpOnly": False,
|
|
"sameSite": "Lax",
|
|
}
|
|
for c in session.cookies
|
|
]
|
|
|
|
|
|
def dalsi_mesic(mesic: int, rok: int) -> tuple[int, int]:
|
|
mesic += 1
|
|
if mesic > 12:
|
|
mesic = 1
|
|
rok += 1
|
|
return mesic, rok
|
|
|
|
|
|
def uz_stazeno(mesic: int, rok: int) -> bool:
|
|
"""Vrátí True pokud soubor pro daný měsíc/rok už existuje v DEST_DIR."""
|
|
mm = f"{mesic:02d}"
|
|
rr = str(rok)[-2:]
|
|
pattern = os.path.join(DEST_DIR, f"* f205{mm}{rr}.*")
|
|
return bool(glob.glob(pattern))
|
|
|
|
|
|
def zjisti_posledni_ze_souboru() -> tuple[int, int] | None:
|
|
"""Prohledá DEST_DIR a zjistí nejnovější stažený měsíc/rok z názvů souborů (f205MMRR)."""
|
|
pattern = os.path.join(DEST_DIR, "* f205????.* ")
|
|
# Hledáme soubory tvaru '* f205MMRR.*'
|
|
soubory = glob.glob(os.path.join(DEST_DIR, "* f205????.* "))
|
|
if not soubory:
|
|
# zkus bez mezery na konci
|
|
soubory = glob.glob(os.path.join(DEST_DIR, "* f205????.*"))
|
|
kandidati = []
|
|
for s in soubory:
|
|
nazev = os.path.basename(s)
|
|
m = re.search(r"f205(\d{2})(\d{2})\.", nazev)
|
|
if m:
|
|
mm = int(m.group(1))
|
|
rr = int(m.group(2))
|
|
rok = 2000 + rr
|
|
if 1 <= mm <= 12:
|
|
kandidati.append((rok, mm))
|
|
if not kandidati:
|
|
return None
|
|
posledni_rok, posledni_mesic = max(kandidati)
|
|
return posledni_mesic, posledni_rok
|
|
|
|
|
|
def nacti_stav() -> tuple[int, int] | None:
|
|
"""Načte poslední zpracovaný měsíc/rok. Vrátí None pokud není žádná informace."""
|
|
if os.path.exists(STATE_FILE):
|
|
with open(STATE_FILE, encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
print(f"Stav ze souboru: poslední zpracovaný {data['mesic']:02d}/{data['rok']}")
|
|
return data["mesic"], data["rok"]
|
|
ze_souboru = zjisti_posledni_ze_souboru()
|
|
if ze_souboru:
|
|
m, r = ze_souboru
|
|
print(f"Stav zjištěn ze souborů: poslední stažený {m:02d}/{r}")
|
|
return m, r
|
|
return None
|
|
|
|
|
|
def uloz_stav(mesic: int, rok: int) -> None:
|
|
with open(STATE_FILE, "w", encoding="utf-8") as f:
|
|
json.dump({"mesic": mesic, "rok": rok}, f, ensure_ascii=False)
|
|
|
|
|
|
def stahni_mesic(page, mesic: int, rok: int) -> bool:
|
|
"""Stáhne soubor pro jeden měsíc. Vrátí True pokud staženo."""
|
|
today = date.today().strftime("%Y-%m-%d")
|
|
|
|
if uz_stazeno(mesic, rok):
|
|
print(f" [{mesic:02d}/{rok}] přeskočeno — soubor už existuje")
|
|
return True # považujeme za úspěch pro účely uložení stavu
|
|
|
|
# Vyplň formulář
|
|
inputs = page.query_selector_all("input[type=text]")
|
|
if len(inputs) < 2:
|
|
print(f" [{mesic:02d}/{rok}] CHYBA — inputy nenalezeny")
|
|
return False
|
|
|
|
inputs[0].fill(str(mesic))
|
|
inputs[1].fill(str(rok))
|
|
|
|
page.get_by_text("Hledat", exact=True).click()
|
|
page.wait_for_load_state("networkidle")
|
|
|
|
dl_selector = "a:has-text('Seznam registrovaných pojištěnců')"
|
|
if not page.query_selector(dl_selector):
|
|
print(f" [{mesic:02d}/{rok}] CHYBA — download odkaz nenalezen (soubor zřejmě ještě není k dispozici)")
|
|
return False
|
|
|
|
with page.expect_download() as dl_info:
|
|
page.click(dl_selector)
|
|
download = dl_info.value
|
|
|
|
original_name = download.suggested_filename
|
|
dest_path = os.path.join(DEST_DIR, f"{today} {original_name}")
|
|
download.save_as(dest_path)
|
|
print(f" [{mesic:02d}/{rok}] OK — {os.path.basename(dest_path)}")
|
|
return True
|
|
|
|
|
|
def hlavni() -> None:
|
|
os.makedirs(DEST_DIR, exist_ok=True)
|
|
|
|
cookies = prihlaseni()
|
|
|
|
posledni = nacti_stav()
|
|
if posledni is None:
|
|
raise SystemExit(
|
|
"Nelze zjistit poslední stažený měsíc.\n"
|
|
f"Vytvoř ručně soubor {STATE_FILE} ve tvaru:\n"
|
|
' {"mesic": 12, "rok": 2025}'
|
|
)
|
|
|
|
mesic, rok = dalsi_mesic(*posledni)
|
|
print(f"Zkouším stáhnout: {mesic:02d}/{rok}")
|
|
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(headless=False)
|
|
context = browser.new_context()
|
|
context.add_cookies(cookies)
|
|
page = context.new_page()
|
|
|
|
print("Otevírám stránku klientely...")
|
|
page.goto(f"{BASE_URL}/app/prohlizeni-klientely/")
|
|
page.wait_for_load_state("networkidle")
|
|
|
|
if "frmPrihlasCert" in page.content():
|
|
raise SystemExit("Portál vyžaduje přihlášení i po předání cookies — zkontroluj certifikát nebo session")
|
|
|
|
uspech = stahni_mesic(page, mesic, rok)
|
|
browser.close()
|
|
|
|
if uspech:
|
|
uloz_stav(mesic, rok)
|
|
print(f"\nHotovo — stav uložen: {mesic:02d}/{rok}")
|
|
else:
|
|
print(f"\nNestaženo — stav nebyl aktualizován, příště se zkusí znovu {mesic:02d}/{rok}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
hlavni()
|