Files
ordinaceprojekt/Insurance/StahováníSeznamuPojištěnců/213 RBP/StahniSeznamPojistencuRBP.py
T
Vladimir Buzalka 19036b58cc notebookvb
2026-06-18 05:32:36 +02:00

416 lines
16 KiB
Python

"""
Stahování seznamu registrovaných pojištěnců RBP (213) — čistý Python, bez NMSigneru.
RBP běží na stejné platformě jako ZPŠ/OZP/VoZP (portalzp.cz / json-api).
- schránka: /app/schranky-vypis-pojistencu-v-kapitaci
- formulář: 110-vypis-pojistencu-reg-u-pzs
- filtr XML: NazevSchranky="VypisPojKap", NazevFiltru="ZZ_VYP_REG" (jako ZPŠ)
- položky: icz (interní ID), datum (Ke dni), razeni (jmeno/rc), typ (soubor/sestava)
- datum = "Ke dni" aktuální snímek platných registrací — použijeme dnešní datum,
nepočítá se měsíc, žádný stav.json (jako OZP).
Co skript dělá v jednom spuštění:
1. Přihlásí se certifikátem (uloží cookies pro Playwright)
2. Stáhne nové výpisy z výpisové schránky (soubory s hlavičkou H09305001)
3. Znovu se přihlásí (Playwright invaliduje requests session)
4. Podá jednu žádost o aktuální výpis ke dnešnímu dni
Log podání: log_podani.json — seznam { ref_cislo, datum, podano_kdy }
"""
import json
import os
import re
import sys
import time
from datetime import date, datetime
from pathlib import Path
import requests
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12
# UTF-8 výstup i na Windows konzoli
try:
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
except Exception:
pass
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")))
from Knihovny.najdi_dropbox import get_dropbox_root
PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "Certificates", "MBQualifiedCert.pfx"))
PFX_PASSWORD = b"Vlado7309208104++"
BASE_URL = "https://portal.rbp-zp.cz"
CHALLENGE_URL = f"{BASE_URL}/json-api/prihlaseni/prihlasovaci-zprava"
CERTLOGIN_URL = f"{BASE_URL}/json-api/prihlaseni/prihlaseni-certifikatem"
SUBMIT_URL = f"{BASE_URL}/json-api/formular-schranky/110-vypis-pojistencu-reg-u-pzs/ulozit-formular"
VYPIS_URL = f"{BASE_URL}/app/schranky-vypis-pojistencu-v-kapitaci"
DOWNLOAD_URL = f"{BASE_URL}/html/prehled-zprav-ve-schrankach/zobrazit-prilohu"
# Hodnoty filtru (ověřeno odchytem reálného podání na portálu)
ICZ_INTERNAL = "933189" # IČZ 09305000 — interní ID položky "icz"
RAZENI = "jmeno" # jmeno = příjmení a jména, rc = rodná čísla
TYP = "soubor" # soubor = datový soubor, sestava = PDF sestava
# Hlavička platného výpisu pojištěnců (IČP 09305001 = MUDr. Buzalková)
HLAVICKA = "H09305001"
LOG_FILE = os.path.join(os.path.dirname(__file__), "log_podani.json")
# Sdílené soubory s RBP skriptem pro stahování zpráv
STAHUJ_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "StahováníZpráv", "213 RBP"))
COOKIES_FILE = os.path.join(STAHUJ_DIR, "rbp_cookies.json")
CHROME_PROFILE = os.path.join(STAHUJ_DIR, "chrome_profile")
DOWNLOAD_DIR = os.path.join(
get_dropbox_root(),
"Ordinace", "Dokumentace_ke_zpracování", "Zúčtovací zprávy", "SeznamyPojištěnců",
)
# ---------------------------------------------------------------------------
# Přihlášení
# ---------------------------------------------------------------------------
def prihlaseni() -> requests.Session:
"""Přihlásí se certifikátem, vrátí autentizovanou session. Uloží cookies pro Playwright."""
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"X-Requested-With": "XMLHttpRequest",
"Origin": BASE_URL,
"Referer": BASE_URL + "/",
})
r = session.get(f"{BASE_URL}/app/prihlaseni")
r.raise_for_status()
session.cookies.set("pzp_sign", "CERT", domain="portal.rbp-zp.cz", path="/")
r = session.post(CHALLENGE_URL, json={"login_sign": "CERT"},
headers={"Content-Type": "application/json; charset=UTF-8"})
r.raise_for_status()
zprava = r.json()["data"]["zprava"]
with open(PFX_PATH, "rb") as f:
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
podpis = (
pkcs7.PKCS7SignatureBuilder()
.set_data(zprava.encode("utf-8"))
.add_signer(cert, private_key, hashes.SHA256())
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature])
.decode("ascii").strip()
)
r = session.post(CERTLOGIN_URL, json={"zprava": zprava, "podpis": podpis},
headers={"Content-Type": "application/json; charset=UTF-8"})
r.raise_for_status()
data = r.json()["data"]
if not data.get("prihlasen"):
raise RuntimeError(f"Přihlášení selhalo: {r.json().get('errMsg', '')}")
print("Přihlášení úspěšné!")
cookies = [
{
"name": c.name,
"value": c.value,
"domain": c.domain if c.domain.startswith(".") else "." + c.domain,
"path": c.path or "/",
"expires": int(c.expires) if c.expires else -1,
"secure": bool(c.secure),
"httpOnly": False,
"sameSite": "Lax",
}
for c in session.cookies
]
with open(COOKIES_FILE, "w", encoding="utf-8") as f:
json.dump(cookies, f, indent=2, ensure_ascii=False)
return session
# ---------------------------------------------------------------------------
# Stahování z výpisové schránky
# ---------------------------------------------------------------------------
def safe_filename(name: str) -> str:
return re.sub(r'[\\/:*?"<>|]', "_", name).strip()
def parse_date(date_str: str) -> str:
try:
return datetime.strptime(date_str.strip()[:19], "%d.%m.%Y %H:%M:%S").strftime("%Y-%m-%d")
except Exception:
try:
return datetime.strptime(date_str.strip()[:10], "%d.%m.%Y").strftime("%Y-%m-%d")
except Exception:
return "0000-00-00"
def parse_row(cells: list) -> dict:
"""Z buněk řádku schránky vytvoří popis a cílový název souboru."""
date_raw = cells[1].strip() if len(cells) > 1 else ""
desc_raw = cells[2].strip() if len(cells) > 2 else ""
fname_raw = cells[3].strip() if len(cells) > 3 else ""
desc_lines = [l.strip() for l in desc_raw.split("\n") if l.strip()]
if len(desc_lines) >= 3:
description = desc_lines[2]
elif len(desc_lines) >= 2:
description = desc_lines[1]
else:
description = desc_lines[0] if desc_lines else ""
description = description[:80]
fname_match = re.match(r'^(.+?)\s*\(\d{2}\.\d{2}\.\d{4}\)\s*$', fname_raw)
original = fname_match.group(1).strip() if fname_match else fname_raw.split("(")[0].strip()
orig_path = Path(original)
stem = orig_path.stem or "zprava"
ext = orig_path.suffix or ""
date_iso = parse_date(date_raw)
name = f"{date_iso} {safe_filename(description)} ({safe_filename(stem)}){ext}"
if len(name) > 240:
name = f"{date_iso} ({safe_filename(stem)}){ext}"
return {"date": date_iso, "desc": description, "original": original, "filename": name}
def stahni_nove_vypisy() -> int:
"""Stáhne nové výpisy z výpisové schránky. Vrátí počet stažených souborů."""
try:
from playwright.sync_api import sync_playwright
except ImportError:
print("Chybí playwright: pip install playwright && playwright install chrome")
return 0
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
with open(COOKIES_FILE, encoding="utf-8") as f:
cookies = json.load(f)
downloaded = 0
with sync_playwright() as p:
context = p.chromium.launch_persistent_context(
user_data_dir=CHROME_PROFILE,
channel="chrome",
headless=False,
slow_mo=100,
ignore_https_errors=True,
)
try:
context.add_cookies(cookies)
page = context.new_page()
page.goto(f"{VYPIS_URL}/", wait_until="domcontentloaded", timeout=30_000)
if "prihlaseni" in page.url or "login" in page.url.lower():
print("Session v prohlížeči expirovala — stahování přeskočeno")
return 0
print("Prohlížeč přihlášen OK\n")
already = set(os.listdir(DOWNLOAD_DIR))
print(f"V archivu: {len(already)} souborů.\n")
page_num = 1
seen_ids: set = set()
while True:
url = f"{VYPIS_URL}/stranka-{page_num}"
print(f" Stránka {page_num}: {url}")
try:
page.goto(url, wait_until="domcontentloaded", timeout=30_000)
except Exception as e:
print(f" Navigace selhala: {e}")
break
page.wait_for_load_state("networkidle", timeout=15_000)
data = page.evaluate("""() => {
const rows = [];
for (const tr of document.querySelectorAll('table tr')) {
const cells = Array.from(tr.querySelectorAll('td')).map(td => td.innerText.trim());
if (cells.length < 4) continue;
const dlLink = tr.querySelector('a[onclick*="SchrPolOpenFile"]');
if (!dlLink) continue;
const mFile = dlLink.getAttribute('onclick').match(/\\d+/);
rows.push({ cells, fileId: mFile ? mFile[0] : null });
}
return rows;
}""")
rows = [r for r in data if r["fileId"]]
if not rows:
print(f" Stránka {page_num} — žádné řádky, konec schránky.")
break
current_ids = {r["fileId"] for r in rows}
if current_ids & seen_ids:
print(f" Stránka {page_num} — opakující se obsah, konec schránky.")
break
seen_ids.update(current_ids)
print(f" Nalezeno {len(rows)} zpráv.")
stop = False
for row in rows:
info = parse_row(row["cells"])
target = os.path.join(DOWNLOAD_DIR, info["filename"])
if info["filename"] in already or os.path.exists(target):
print(f" [stop] Nalezena již stažená zpráva: {info['filename']}")
stop = True
break
dl_url = f"{DOWNLOAD_URL}?zprava_id={row['fileId']}"
try:
r = context.request.get(dl_url, headers={"Referer": VYPIS_URL}, timeout=30_000)
if not r.ok:
print(f" HTTP {r.status} příloha (id={row['fileId']})")
else:
body = r.body()
if not body[:len(HLAVICKA)].decode("ascii", errors="ignore").startswith(HLAVICKA):
print(f" přeskočeno (není výpis pojištěnců): {info['filename']}")
else:
with open(target, "wb") as fh:
fh.write(body)
print(f" OK: {info['filename']}")
already.add(info["filename"])
downloaded += 1
except Exception as e:
print(f" Chyba příloha (id={row['fileId']}): {e}")
time.sleep(1.0)
if stop:
break
page_num += 1
finally:
context.close()
return downloaded
# ---------------------------------------------------------------------------
# Sestavení XML a podpis žádosti
# ---------------------------------------------------------------------------
def build_xml(datum: date) -> str:
"""Sestaví XML žádosti o výpis pojištěnců ke dni `datum`."""
datum_str = datum.strftime("%d.%m.%Y")
return (
f'<SchrankaZadost NazevSchranky="VypisPojKap" NazevFiltru="ZZ_VYP_REG">\r\n'
f'<PolozkaFiltru Nazev="icz">{ICZ_INTERNAL}</PolozkaFiltru>\r\n'
f'<PolozkaFiltru Nazev="datum">{datum_str}</PolozkaFiltru>\r\n'
f'<PolozkaFiltru Nazev="razeni">{RAZENI}</PolozkaFiltru>\r\n'
f'<PolozkaFiltru Nazev="typ">{TYP}</PolozkaFiltru>\r\n'
f'</SchrankaZadost>'
)
def sign_xml(xml: str) -> str:
"""Podepíše XML certifikátem (PKCS7 detached, bez certifikátu — server cert v podpisu odmítá)."""
with open(PFX_PATH, "rb") as f:
private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD)
pem = (
pkcs7.PKCS7SignatureBuilder()
.set_data(xml.encode("utf-8"))
.add_signer(cert, private_key, hashes.SHA256())
.sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature, pkcs7.PKCS7Options.NoCerts])
.decode("ascii")
)
return pem.replace("\r\n", "\n").replace("\n", "\r\n")
def odeslat_zadost(session: requests.Session, datum: date) -> str | None:
"""Odešle podepsanou žádost o výpis ke dni `datum`. Vrátí referenční číslo nebo None."""
xml = build_xml(datum)
podpis = sign_xml(xml)
payload = {"schrXml": xml, "schrSign": podpis, "schrFiles": []}
r = session.post(SUBMIT_URL, json=payload, headers={
"Content-Type": "application/json; charset=UTF-8",
"X-Requested-With": "XMLHttpRequest",
"Referer": BASE_URL + "/",
})
r.raise_for_status()
try:
resp = r.json()
except Exception:
print(f" Odpověď není JSON: {r.text[:300]}")
return None
resp_str = json.dumps(resp, ensure_ascii=False)
if resp.get("errMsg") or resp.get("error"):
print(f" Chyba od serveru: {resp.get('errMsg') or resp.get('error')}")
return None
m = re.search(r'\b(1[5-9]\d{7})\b', resp_str)
ref = m.group(1) if m else None
if ref:
print(f" OK — ref. číslo: {ref}")
else:
print(f" Odpověď (bez ref. čísla): {resp_str[:300]}")
return ref or ("OK" if r.ok else None)
# ---------------------------------------------------------------------------
# Log
# ---------------------------------------------------------------------------
def uloz_log(datum: date, ref_cislo: str) -> None:
log = []
if os.path.exists(LOG_FILE):
with open(LOG_FILE, encoding="utf-8") as f:
log = json.load(f)
log.append({
"ref_cislo": ref_cislo,
"datum": datum.strftime("%d.%m.%Y"),
"podano_kdy": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
})
with open(LOG_FILE, "w", encoding="utf-8") as f:
json.dump(log, f, indent=2, ensure_ascii=False)
# ---------------------------------------------------------------------------
# Hlavní funkce
# ---------------------------------------------------------------------------
def hlavni() -> None:
# 1. Přihlášení — uloží cookies pro Playwright
prihlaseni()
# 2. Stažení nových výpisů z výpisové schránky
print("\n=== Stahování nových výpisů ===")
stazeno = stahni_nove_vypisy()
print(f"Staženo: {stazeno} souborů.\n")
# 3. Znovu přihlásit — Playwright mohl invalidovat předchozí session
print("=== Znovu přihlašuji před podáním ===")
session = prihlaseni()
# 4. Podání žádosti o výpis ke dnešnímu dni
datum = date.today()
print(f"=== Podávám žádost o výpis ke dni {datum.strftime('%d.%m.%Y')} ===")
ref = odeslat_zadost(session, datum)
if ref:
uloz_log(datum, ref)
print(f"\nHotovo — žádost podána, ref: {ref}")
else:
print("\nPodání selhalo — žádost nebyla zaevidována.")
if __name__ == "__main__":
hlavni()