Přepsat stahování ICP na Playwright — certifikát přes browser context
Playwright předloží PFX certifikát automaticky při TLS handshake, klikne na odkaz a zachytí download bez nutnosti ručního přihlášení. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,18 +4,14 @@ Před importem automaticky stáhne nejnovější soubor z VZP Point (vyžaduje c
|
||||
Použití: python import_vzp_pracoviste.py [--no-download] [soubor.Lh7]
|
||||
"""
|
||||
|
||||
import base64
|
||||
import csv
|
||||
import glob
|
||||
import hashlib
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import secrets
|
||||
import sys
|
||||
import zipfile
|
||||
from datetime import date, datetime
|
||||
from html.parser import HTMLParser
|
||||
|
||||
# Windows konzole - povol UTF-8 výstup
|
||||
if sys.stdout.encoding != "utf-8":
|
||||
@@ -77,233 +73,84 @@ def parse_date(s: str) -> date | None:
|
||||
return None
|
||||
|
||||
|
||||
def _pkce_pair() -> tuple[str, str]:
|
||||
"""Vrátí (code_verifier, code_challenge) pro PKCE S256."""
|
||||
verifier = secrets.token_urlsafe(64)
|
||||
digest = hashlib.sha256(verifier.encode()).digest()
|
||||
challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
|
||||
return verifier, challenge
|
||||
|
||||
|
||||
def _get_bearer_token() -> str | None:
|
||||
"""
|
||||
Autentizuje pomocí PKCS12 certifikátu přes OIDC authorization_code + PKCE.
|
||||
Certifikát je předán při TLS handshake na auth.vzp.cz.
|
||||
Vrátí Bearer access_token nebo None při chybě.
|
||||
"""
|
||||
try:
|
||||
import requests
|
||||
from requests_pkcs12 import Pkcs12Adapter
|
||||
except ImportError:
|
||||
print("[auth] pip install requests requests-pkcs12")
|
||||
return None
|
||||
|
||||
password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None
|
||||
verifier, challenge = _pkce_pair()
|
||||
nonce = secrets.token_urlsafe(32)
|
||||
state = secrets.token_urlsafe(32)
|
||||
|
||||
session = requests.Session()
|
||||
# Certifikát pro TLS client auth na auth.vzp.cz i pro redirect na point.vzp.cz
|
||||
for base in ("https://auth.vzp.cz", "https://point.vzp.cz"):
|
||||
session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password))
|
||||
|
||||
# Krok 1: Authorize — server ověří certifikát a vrátí form_post HTML s code
|
||||
try:
|
||||
resp = session.get(
|
||||
"https://auth.vzp.cz/connect/authorize",
|
||||
params={
|
||||
"client_id": "bdesk",
|
||||
"redirect_uri": "https://point.vzp.cz/home/signin",
|
||||
"response_type": "code",
|
||||
"scope": "openid profile email phone offline_access",
|
||||
"response_mode": "form_post",
|
||||
"nonce": nonce,
|
||||
"state": state,
|
||||
"code_challenge": challenge,
|
||||
"code_challenge_method": "S256",
|
||||
},
|
||||
allow_redirects=True,
|
||||
timeout=30,
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"[auth] Chyba při authorize: {e}")
|
||||
return None
|
||||
|
||||
# Parsuj form_post HTML a vytáhni code
|
||||
class _FormParser(HTMLParser):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.fields: dict[str, str] = {}
|
||||
def handle_starttag(self, tag, attrs):
|
||||
if tag == "input":
|
||||
d = dict(attrs)
|
||||
if d.get("name"):
|
||||
self.fields[d["name"]] = d.get("value", "")
|
||||
|
||||
fp = _FormParser()
|
||||
fp.feed(resp.text)
|
||||
code = fp.fields.get("code")
|
||||
|
||||
if not code:
|
||||
print(f"[auth] Autorizační kód nenalezen (HTTP {resp.status_code}). "
|
||||
f"Zkontroluj certifikát a heslo PFX.")
|
||||
return None
|
||||
|
||||
# Krok 2: Vyměň code za access_token
|
||||
try:
|
||||
token_resp = session.post(
|
||||
"https://auth.vzp.cz/connect/token",
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"redirect_uri": "https://point.vzp.cz/home/signin",
|
||||
"client_id": "bdesk",
|
||||
"code_verifier": verifier,
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
token_data = token_resp.json()
|
||||
except Exception as e:
|
||||
print(f"[auth] Chyba při výměně tokenu: {e}")
|
||||
return None
|
||||
|
||||
token = token_data.get("access_token")
|
||||
if not token:
|
||||
print(f"[auth] access_token chybí. Odpověď: {list(token_data.keys())}")
|
||||
return None
|
||||
|
||||
return token
|
||||
|
||||
|
||||
def _get_icp_filename(session, token: str) -> str | None:
|
||||
"""
|
||||
Zjistí aktuální název ICP zip souboru z point.vzp.cz/Cms/Document
|
||||
(atribut download na odkazu 'Soubor platných IČP').
|
||||
"""
|
||||
try:
|
||||
resp = session.get("https://point.vzp.cz/Cms/Document", timeout=30)
|
||||
resp.raise_for_status()
|
||||
except Exception as e:
|
||||
print(f"[stahování] Nelze načíst seznam dokumentů: {e}")
|
||||
return None
|
||||
|
||||
class _DownloadParser(HTMLParser):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.filename: str | None = None
|
||||
def handle_starttag(self, tag, attrs):
|
||||
if tag == "a" and not self.filename:
|
||||
d = dict(attrs)
|
||||
dl = d.get("download", "")
|
||||
if re.search(r"-icp\.zip$", dl, re.IGNORECASE):
|
||||
self.filename = dl
|
||||
|
||||
dp = _DownloadParser()
|
||||
dp.feed(resp.text)
|
||||
return dp.filename
|
||||
|
||||
|
||||
def download_latest_file() -> str | None:
|
||||
"""
|
||||
1. Autentizuje certifikátem přes OIDC → Bearer token
|
||||
2. Zjistí aktuální název ICP zip z point.vzp.cz
|
||||
3. Získá SAS URL z www.vzp.cz/api/documents/{id}/files/{filename}
|
||||
4. Stáhne ZIP, rozbalí PLP111*.Lh7 do Import/
|
||||
Vrátí cestu k Lh7 nebo None při chybě.
|
||||
Použije Playwright (Chromium) s PFX certifikátem pro přihlášení na VZP Point.
|
||||
Klikne na 'Soubor platných IČP', zachytí stažený ZIP, rozbalí Lh7 do Import/.
|
||||
"""
|
||||
try:
|
||||
import requests
|
||||
from requests_pkcs12 import Pkcs12Adapter
|
||||
from playwright.sync_api import sync_playwright
|
||||
except ImportError:
|
||||
print("[stahování] pip install requests requests-pkcs12")
|
||||
print("[stahování] pip install playwright && playwright install chromium")
|
||||
return None
|
||||
|
||||
password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None
|
||||
os.makedirs(IMPORT_DIR, exist_ok=True)
|
||||
|
||||
# Session s certifikátem (pro point.vzp.cz po autentizaci)
|
||||
session = requests.Session()
|
||||
for base in ("https://auth.vzp.cz", "https://point.vzp.cz"):
|
||||
session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password))
|
||||
|
||||
# Autentizace
|
||||
print("[stahování] Přihlašování certifikátem...")
|
||||
token = _get_bearer_token()
|
||||
if not token:
|
||||
return None
|
||||
print("[stahování] Přihlášení úspěšné.")
|
||||
|
||||
# Zjisti název souboru
|
||||
zip_filename = _get_icp_filename(session, token)
|
||||
if not zip_filename:
|
||||
print("[stahování] Název ICP souboru nenalezen.")
|
||||
return None
|
||||
|
||||
# Zkontroluj, jestli Lh7 pro tento rok už máme
|
||||
year_match = re.search(r"^(\d{2})", zip_filename)
|
||||
year = year_match.group(1) if year_match else ""
|
||||
existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year}.Lh7"))
|
||||
if existing:
|
||||
print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.")
|
||||
return existing[0]
|
||||
|
||||
# Získej SAS URL pro stažení
|
||||
api_url = f"https://www.vzp.cz/api/documents/{VZP_DOCUMENT_ID}/files/{zip_filename}"
|
||||
print(f"[stahování] Získávám odkaz ke stažení...")
|
||||
try:
|
||||
api_resp = requests.get(
|
||||
api_url,
|
||||
headers={
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Origin": "https://point.vzp.cz",
|
||||
"Referer": "https://point.vzp.cz/",
|
||||
},
|
||||
timeout=30,
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=True)
|
||||
context = browser.new_context(
|
||||
client_certificates=[{
|
||||
"origin": "https://auth.vzp.cz",
|
||||
"pfxPath": VZP_CERT_FILE,
|
||||
"passphrase": VZP_CERT_PASSWORD,
|
||||
}],
|
||||
accept_downloads=True,
|
||||
)
|
||||
api_resp.raise_for_status()
|
||||
# Odpověď je SAS URL jako JSON string nebo objekt
|
||||
try:
|
||||
data = api_resp.json()
|
||||
sas_url = data if isinstance(data, str) else (
|
||||
data.get("url") or data.get("sasUrl") or data.get("uri") or data.get("value")
|
||||
)
|
||||
except Exception:
|
||||
sas_url = api_resp.text.strip().strip('"')
|
||||
except Exception as e:
|
||||
print(f"[stahování] Chyba při získávání SAS URL: {e}")
|
||||
return None
|
||||
page = context.new_page()
|
||||
|
||||
if not sas_url or not sas_url.startswith("http"):
|
||||
print(f"[stahování] Neplatná SAS URL: {str(sas_url)[:100]}")
|
||||
return None
|
||||
# 1. Přihlášení — klik na Certifikát, Playwright certifikát předloží automaticky
|
||||
print("[stahování] Přihlašování na VZP Point...")
|
||||
page.goto("https://point.vzp.cz/", wait_until="networkidle", timeout=30_000)
|
||||
|
||||
# Stáhni ZIP z Azure Blob Storage
|
||||
print(f"[stahování] Stahuji {zip_filename}...")
|
||||
cert_btn = page.locator("text=Certifikát").first
|
||||
if cert_btn.is_visible(timeout=5_000):
|
||||
cert_btn.click()
|
||||
page.wait_for_url("**/point.vzp.cz/**", timeout=30_000)
|
||||
page.wait_for_load_state("networkidle", timeout=30_000)
|
||||
|
||||
print("[stahování] Přihlášeno.")
|
||||
|
||||
# 2. Stránka s dokumenty
|
||||
page.goto("https://point.vzp.cz/Cms/Document", wait_until="networkidle", timeout=30_000)
|
||||
|
||||
# 3. Zkontroluj aktuální název souboru
|
||||
icp_link = page.locator("a[download*='-icp.zip']").first
|
||||
zip_name = icp_link.get_attribute("download", timeout=5_000)
|
||||
if zip_name:
|
||||
year = zip_name[:2]
|
||||
existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year}.Lh7"))
|
||||
if existing:
|
||||
print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.")
|
||||
browser.close()
|
||||
return existing[0]
|
||||
|
||||
# 4. Stáhni ZIP
|
||||
print(f"[stahování] Stahuji {zip_name or 'ICP soubor'}...")
|
||||
with page.expect_download(timeout=60_000) as dl_info:
|
||||
icp_link.click()
|
||||
download = dl_info.value
|
||||
|
||||
zip_path = os.path.join(IMPORT_DIR, download.suggested_filename)
|
||||
download.save_as(zip_path)
|
||||
browser.close()
|
||||
|
||||
# 5. Rozbal Lh7 z archivu
|
||||
try:
|
||||
zip_resp = requests.get(sas_url, timeout=60)
|
||||
zip_resp.raise_for_status()
|
||||
except Exception as e:
|
||||
print(f"[stahování] Chyba při stahování ZIP: {e}")
|
||||
return None
|
||||
|
||||
# Rozbal Lh7
|
||||
try:
|
||||
with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as zf:
|
||||
with zipfile.ZipFile(zip_path) as zf:
|
||||
lh7_names = [n for n in zf.namelist() if n.lower().endswith(".lh7")]
|
||||
if not lh7_names:
|
||||
print("[stahování] ZIP neobsahuje .Lh7 soubor")
|
||||
return None
|
||||
dest = os.path.join(IMPORT_DIR, os.path.basename(lh7_names[0]))
|
||||
os.makedirs(IMPORT_DIR, exist_ok=True)
|
||||
with zf.open(lh7_names[0]) as src, open(dest, "wb") as out:
|
||||
out.write(src.read())
|
||||
os.remove(zip_path)
|
||||
print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)")
|
||||
return dest
|
||||
except Exception as e:
|
||||
print(f"[stahování] Chyba při rozbalování: {e}")
|
||||
return None
|
||||
|
||||
print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)")
|
||||
return dest
|
||||
|
||||
|
||||
def find_latest_file() -> str:
|
||||
files = glob.glob(os.path.join(IMPORT_DIR, "*.Lh7"))
|
||||
|
||||
Reference in New Issue
Block a user