From 068c8edbe1019cb04df7dcfa0fdd9448e18d1524 Mon Sep 17 00:00:00 2001 From: Vlado Date: Sun, 12 Apr 2026 07:34:18 +0200 Subject: [PATCH] =?UTF-8?q?P=C5=99epsat=20stahov=C3=A1n=C3=AD=20ICP=20na?= =?UTF-8?q?=20Playwright=20=E2=80=94=20certifik=C3=A1t=20p=C5=99es=20brows?= =?UTF-8?q?er=20context?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Playwright předloží PFX certifikát automaticky při TLS handshake, klikne na odkaz a zachytí download bez nutnosti ručního přihlášení. Co-Authored-By: Claude Sonnet 4.6 --- .../import_vzp_pracoviste.py | 263 ++++-------------- 1 file changed, 55 insertions(+), 208 deletions(-) diff --git a/StahovánízVZPWithClaude/import_vzp_pracoviste.py b/StahovánízVZPWithClaude/import_vzp_pracoviste.py index 12c2d8c..0d3ec74 100644 --- a/StahovánízVZPWithClaude/import_vzp_pracoviste.py +++ b/StahovánízVZPWithClaude/import_vzp_pracoviste.py @@ -4,18 +4,14 @@ Před importem automaticky stáhne nejnovější soubor z VZP Point (vyžaduje c Použití: python import_vzp_pracoviste.py [--no-download] [soubor.Lh7] """ -import base64 import csv import glob -import hashlib import io import os import re -import secrets import sys import zipfile from datetime import date, datetime -from html.parser import HTMLParser # Windows konzole - povol UTF-8 výstup if sys.stdout.encoding != "utf-8": @@ -77,233 +73,84 @@ def parse_date(s: str) -> date | None: return None -def _pkce_pair() -> tuple[str, str]: - """Vrátí (code_verifier, code_challenge) pro PKCE S256.""" - verifier = secrets.token_urlsafe(64) - digest = hashlib.sha256(verifier.encode()).digest() - challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode() - return verifier, challenge - - -def _get_bearer_token() -> str | None: - """ - Autentizuje pomocí PKCS12 certifikátu přes OIDC authorization_code + PKCE. - Certifikát je předán při TLS handshake na auth.vzp.cz. - Vrátí Bearer access_token nebo None při chybě. - """ - try: - import requests - from requests_pkcs12 import Pkcs12Adapter - except ImportError: - print("[auth] pip install requests requests-pkcs12") - return None - - password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None - verifier, challenge = _pkce_pair() - nonce = secrets.token_urlsafe(32) - state = secrets.token_urlsafe(32) - - session = requests.Session() - # Certifikát pro TLS client auth na auth.vzp.cz i pro redirect na point.vzp.cz - for base in ("https://auth.vzp.cz", "https://point.vzp.cz"): - session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password)) - - # Krok 1: Authorize — server ověří certifikát a vrátí form_post HTML s code - try: - resp = session.get( - "https://auth.vzp.cz/connect/authorize", - params={ - "client_id": "bdesk", - "redirect_uri": "https://point.vzp.cz/home/signin", - "response_type": "code", - "scope": "openid profile email phone offline_access", - "response_mode": "form_post", - "nonce": nonce, - "state": state, - "code_challenge": challenge, - "code_challenge_method": "S256", - }, - allow_redirects=True, - timeout=30, - ) - except Exception as e: - print(f"[auth] Chyba při authorize: {e}") - return None - - # Parsuj form_post HTML a vytáhni code - class _FormParser(HTMLParser): - def __init__(self): - super().__init__() - self.fields: dict[str, str] = {} - def handle_starttag(self, tag, attrs): - if tag == "input": - d = dict(attrs) - if d.get("name"): - self.fields[d["name"]] = d.get("value", "") - - fp = _FormParser() - fp.feed(resp.text) - code = fp.fields.get("code") - - if not code: - print(f"[auth] Autorizační kód nenalezen (HTTP {resp.status_code}). " - f"Zkontroluj certifikát a heslo PFX.") - return None - - # Krok 2: Vyměň code za access_token - try: - token_resp = session.post( - "https://auth.vzp.cz/connect/token", - data={ - "grant_type": "authorization_code", - "code": code, - "redirect_uri": "https://point.vzp.cz/home/signin", - "client_id": "bdesk", - "code_verifier": verifier, - }, - timeout=30, - ) - token_data = token_resp.json() - except Exception as e: - print(f"[auth] Chyba při výměně tokenu: {e}") - return None - - token = token_data.get("access_token") - if not token: - print(f"[auth] access_token chybí. Odpověď: {list(token_data.keys())}") - return None - - return token - - -def _get_icp_filename(session, token: str) -> str | None: - """ - Zjistí aktuální název ICP zip souboru z point.vzp.cz/Cms/Document - (atribut download na odkazu 'Soubor platných IČP'). - """ - try: - resp = session.get("https://point.vzp.cz/Cms/Document", timeout=30) - resp.raise_for_status() - except Exception as e: - print(f"[stahování] Nelze načíst seznam dokumentů: {e}") - return None - - class _DownloadParser(HTMLParser): - def __init__(self): - super().__init__() - self.filename: str | None = None - def handle_starttag(self, tag, attrs): - if tag == "a" and not self.filename: - d = dict(attrs) - dl = d.get("download", "") - if re.search(r"-icp\.zip$", dl, re.IGNORECASE): - self.filename = dl - - dp = _DownloadParser() - dp.feed(resp.text) - return dp.filename - - def download_latest_file() -> str | None: """ - 1. Autentizuje certifikátem přes OIDC → Bearer token - 2. Zjistí aktuální název ICP zip z point.vzp.cz - 3. Získá SAS URL z www.vzp.cz/api/documents/{id}/files/{filename} - 4. Stáhne ZIP, rozbalí PLP111*.Lh7 do Import/ - Vrátí cestu k Lh7 nebo None při chybě. + Použije Playwright (Chromium) s PFX certifikátem pro přihlášení na VZP Point. + Klikne na 'Soubor platných IČP', zachytí stažený ZIP, rozbalí Lh7 do Import/. """ try: - import requests - from requests_pkcs12 import Pkcs12Adapter + from playwright.sync_api import sync_playwright except ImportError: - print("[stahování] pip install requests requests-pkcs12") + print("[stahování] pip install playwright && playwright install chromium") return None - password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None + os.makedirs(IMPORT_DIR, exist_ok=True) - # Session s certifikátem (pro point.vzp.cz po autentizaci) - session = requests.Session() - for base in ("https://auth.vzp.cz", "https://point.vzp.cz"): - session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password)) - - # Autentizace - print("[stahování] Přihlašování certifikátem...") - token = _get_bearer_token() - if not token: - return None - print("[stahování] Přihlášení úspěšné.") - - # Zjisti název souboru - zip_filename = _get_icp_filename(session, token) - if not zip_filename: - print("[stahování] Název ICP souboru nenalezen.") - return None - - # Zkontroluj, jestli Lh7 pro tento rok už máme - year_match = re.search(r"^(\d{2})", zip_filename) - year = year_match.group(1) if year_match else "" - existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year}.Lh7")) - if existing: - print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.") - return existing[0] - - # Získej SAS URL pro stažení - api_url = f"https://www.vzp.cz/api/documents/{VZP_DOCUMENT_ID}/files/{zip_filename}" - print(f"[stahování] Získávám odkaz ke stažení...") - try: - api_resp = requests.get( - api_url, - headers={ - "Authorization": f"Bearer {token}", - "Origin": "https://point.vzp.cz", - "Referer": "https://point.vzp.cz/", - }, - timeout=30, + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + context = browser.new_context( + client_certificates=[{ + "origin": "https://auth.vzp.cz", + "pfxPath": VZP_CERT_FILE, + "passphrase": VZP_CERT_PASSWORD, + }], + accept_downloads=True, ) - api_resp.raise_for_status() - # Odpověď je SAS URL jako JSON string nebo objekt - try: - data = api_resp.json() - sas_url = data if isinstance(data, str) else ( - data.get("url") or data.get("sasUrl") or data.get("uri") or data.get("value") - ) - except Exception: - sas_url = api_resp.text.strip().strip('"') - except Exception as e: - print(f"[stahování] Chyba při získávání SAS URL: {e}") - return None + page = context.new_page() - if not sas_url or not sas_url.startswith("http"): - print(f"[stahování] Neplatná SAS URL: {str(sas_url)[:100]}") - return None + # 1. Přihlášení — klik na Certifikát, Playwright certifikát předloží automaticky + print("[stahování] Přihlašování na VZP Point...") + page.goto("https://point.vzp.cz/", wait_until="networkidle", timeout=30_000) - # Stáhni ZIP z Azure Blob Storage - print(f"[stahování] Stahuji {zip_filename}...") + cert_btn = page.locator("text=Certifikát").first + if cert_btn.is_visible(timeout=5_000): + cert_btn.click() + page.wait_for_url("**/point.vzp.cz/**", timeout=30_000) + page.wait_for_load_state("networkidle", timeout=30_000) + + print("[stahování] Přihlášeno.") + + # 2. Stránka s dokumenty + page.goto("https://point.vzp.cz/Cms/Document", wait_until="networkidle", timeout=30_000) + + # 3. Zkontroluj aktuální název souboru + icp_link = page.locator("a[download*='-icp.zip']").first + zip_name = icp_link.get_attribute("download", timeout=5_000) + if zip_name: + year = zip_name[:2] + existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year}.Lh7")) + if existing: + print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.") + browser.close() + return existing[0] + + # 4. Stáhni ZIP + print(f"[stahování] Stahuji {zip_name or 'ICP soubor'}...") + with page.expect_download(timeout=60_000) as dl_info: + icp_link.click() + download = dl_info.value + + zip_path = os.path.join(IMPORT_DIR, download.suggested_filename) + download.save_as(zip_path) + browser.close() + + # 5. Rozbal Lh7 z archivu try: - zip_resp = requests.get(sas_url, timeout=60) - zip_resp.raise_for_status() - except Exception as e: - print(f"[stahování] Chyba při stahování ZIP: {e}") - return None - - # Rozbal Lh7 - try: - with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as zf: + with zipfile.ZipFile(zip_path) as zf: lh7_names = [n for n in zf.namelist() if n.lower().endswith(".lh7")] if not lh7_names: print("[stahování] ZIP neobsahuje .Lh7 soubor") return None dest = os.path.join(IMPORT_DIR, os.path.basename(lh7_names[0])) - os.makedirs(IMPORT_DIR, exist_ok=True) with zf.open(lh7_names[0]) as src, open(dest, "wb") as out: out.write(src.read()) + os.remove(zip_path) + print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)") + return dest except Exception as e: print(f"[stahování] Chyba při rozbalování: {e}") return None - print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)") - return dest - def find_latest_file() -> str: files = glob.glob(os.path.join(IMPORT_DIR, "*.Lh7"))