Implementovat OIDC cert auth pro stahování ICP souboru z VZP Point

Certifikát → TLS → auth.vzp.cz → code → Bearer token → SAS URL → ZIP → Lh7

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-12 07:18:41 +02:00
parent 2365a38b69
commit d52278ed4d
@@ -4,11 +4,14 @@ Před importem automaticky stáhne nejnovější soubor z VZP Point (vyžaduje c
Použití: python import_vzp_pracoviste.py [--no-download] [soubor.Lh7] Použití: python import_vzp_pracoviste.py [--no-download] [soubor.Lh7]
""" """
import base64
import csv import csv
import glob import glob
import hashlib
import io import io
import os import os
import re import re
import secrets
import sys import sys
import zipfile import zipfile
from datetime import date, datetime from datetime import date, datetime
@@ -31,9 +34,9 @@ DB_CONFIG = {
IMPORT_DIR = os.path.join(os.path.dirname(__file__), "Import") IMPORT_DIR = os.path.join(os.path.dirname(__file__), "Import")
VZP_POINT_DOC_URL = "https://point.vzp.cz/Cms/Document"
VZP_CERT_FILE = os.path.join(os.path.dirname(__file__), "MichalkaPublicCertProPython.pfx") VZP_CERT_FILE = os.path.join(os.path.dirname(__file__), "MichalkaPublicCertProPython.pfx")
VZP_CERT_PASSWORD = "" # nastav heslo PFX souboru, pokud bylo při exportu zadáno VZP_CERT_PASSWORD = "Vlado7309208104++"
VZP_DOCUMENT_ID = 5283 # "Soubor platných IČP" na point.vzp.cz/Cms/Document
CREATE_TABLE_SQL = """ CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS vzp_pracoviste ( CREATE TABLE IF NOT EXISTS vzp_pracoviste (
@@ -74,94 +77,228 @@ def parse_date(s: str) -> date | None:
return None return None
def download_latest_file() -> str | None: def _pkce_pair() -> tuple[str, str]:
"""Vrátí (code_verifier, code_challenge) pro PKCE S256."""
verifier = secrets.token_urlsafe(64)
digest = hashlib.sha256(verifier.encode()).digest()
challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
return verifier, challenge
def _get_bearer_token() -> str | None:
""" """
Přihlásí se na VZP Point certifikátem, stáhne nejnovější ICP ZIP, Autentizuje pomocí PKCS12 certifikátu přes OIDC authorization_code + PKCE.
rozbalí PLP111*.Lh7 do Import/ a vrátí cestu. Při chybě vrátí None. Certifikát je předán při TLS handshake na auth.vzp.cz.
Vrátí Bearer access_token nebo None při chybě.
""" """
try: try:
from requests_pkcs12 import Pkcs12Adapter
import requests import requests
from requests_pkcs12 import Pkcs12Adapter
except ImportError: except ImportError:
print("[stahování] Chybí knihovny: pip install requests requests-pkcs12") print("[auth] pip install requests requests-pkcs12")
return None
password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None
verifier, challenge = _pkce_pair()
nonce = secrets.token_urlsafe(32)
state = secrets.token_urlsafe(32)
session = requests.Session()
# Certifikát pro TLS client auth na auth.vzp.cz i pro redirect na point.vzp.cz
for base in ("https://auth.vzp.cz", "https://point.vzp.cz"):
session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password))
# Krok 1: Authorize — server ověří certifikát a vrátí form_post HTML s code
try:
resp = session.get(
"https://auth.vzp.cz/connect/authorize",
params={
"client_id": "bdesk",
"redirect_uri": "https://point.vzp.cz/home/signin",
"response_type": "code",
"scope": "openid profile email phone offline_access",
"response_mode": "form_post",
"nonce": nonce,
"state": state,
"code_challenge": challenge,
"code_challenge_method": "S256",
},
allow_redirects=True,
timeout=30,
)
except Exception as e:
print(f"[auth] Chyba při authorize: {e}")
return None
# Parsuj form_post HTML a vytáhni code
class _FormParser(HTMLParser):
def __init__(self):
super().__init__()
self.fields: dict[str, str] = {}
def handle_starttag(self, tag, attrs):
if tag == "input":
d = dict(attrs)
if d.get("name"):
self.fields[d["name"]] = d.get("value", "")
fp = _FormParser()
fp.feed(resp.text)
code = fp.fields.get("code")
if not code:
print(f"[auth] Autorizační kód nenalezen (HTTP {resp.status_code}). "
f"Zkontroluj certifikát a heslo PFX.")
return None
# Krok 2: Vyměň code za access_token
try:
token_resp = session.post(
"https://auth.vzp.cz/connect/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": "https://point.vzp.cz/home/signin",
"client_id": "bdesk",
"code_verifier": verifier,
},
timeout=30,
)
token_data = token_resp.json()
except Exception as e:
print(f"[auth] Chyba při výměně tokenu: {e}")
return None
token = token_data.get("access_token")
if not token:
print(f"[auth] access_token chybí. Odpověď: {list(token_data.keys())}")
return None
return token
def _get_icp_filename(session, token: str) -> str | None:
"""
Zjistí aktuální název ICP zip souboru z point.vzp.cz/Cms/Document
(atribut download na odkazu 'Soubor platných IČP').
"""
try:
resp = session.get("https://point.vzp.cz/Cms/Document", timeout=30)
resp.raise_for_status()
except Exception as e:
print(f"[stahování] Nelze načíst seznam dokumentů: {e}")
return None
class _DownloadParser(HTMLParser):
def __init__(self):
super().__init__()
self.filename: str | None = None
def handle_starttag(self, tag, attrs):
if tag == "a" and not self.filename:
d = dict(attrs)
dl = d.get("download", "")
if re.search(r"-icp\.zip$", dl, re.IGNORECASE):
self.filename = dl
dp = _DownloadParser()
dp.feed(resp.text)
return dp.filename
def download_latest_file() -> str | None:
"""
1. Autentizuje certifikátem přes OIDC → Bearer token
2. Zjistí aktuální název ICP zip z point.vzp.cz
3. Získá SAS URL z www.vzp.cz/api/documents/{id}/files/{filename}
4. Stáhne ZIP, rozbalí PLP111*.Lh7 do Import/
Vrátí cestu k Lh7 nebo None při chybě.
"""
try:
import requests
from requests_pkcs12 import Pkcs12Adapter
except ImportError:
print("[stahování] pip install requests requests-pkcs12")
return None return None
password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None
# Session s certifikátem (pro point.vzp.cz po autentizaci)
session = requests.Session() session = requests.Session()
session.mount("https://point.vzp.cz", Pkcs12Adapter( for base in ("https://auth.vzp.cz", "https://point.vzp.cz"):
pkcs12_filename=VZP_CERT_FILE, session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password))
pkcs12_password=password,
))
# Načti stránku s dokumenty # Autentizace
try: print("[stahování] Přihlašování certifikátem...")
resp = session.get(VZP_POINT_DOC_URL, timeout=30) token = _get_bearer_token()
resp.raise_for_status() if not token:
except Exception as e: return None
print(f"[stahování] Chyba při načtení {VZP_POINT_DOC_URL}: {e}") print("[stahování] Přihlášení úspěšné.")
# Zjisti název souboru
zip_filename = _get_icp_filename(session, token)
if not zip_filename:
print("[stahování] Název ICP souboru nenalezen.")
return None return None
# Najdi odkaz na *-icp.zip # Zkontroluj, jestli Lh7 pro tento rok už máme
class _LinkParser(HTMLParser): year_match = re.search(r"^(\d{2})", zip_filename)
def __init__(self): year = year_match.group(1) if year_match else ""
super().__init__() existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year}.Lh7"))
self.icp_links: list[str] = []
def handle_starttag(self, tag, attrs):
if tag == "a":
href = dict(attrs).get("href", "")
if re.search(r"-icp\.zip", href, re.IGNORECASE):
self.icp_links.append(href)
parser = _LinkParser()
parser.feed(resp.text)
if not parser.icp_links:
print("[stahování] Na stránce VZP Point nebyl nalezen odkaz na *-icp.zip")
return None
# Vyber nejnovější dle data v názvu (YYMMDDHHMMSS-icp.zip)
zip_href = sorted(parser.icp_links)[-1]
if not zip_href.startswith("http"):
zip_href = "https://point.vzp.cz" + zip_href
# Zkontroluj, jestli už máme aktuální soubor (podle data v názvu ZIP)
date_match = re.search(r"(\d{6})\d{6}-icp\.zip", zip_href)
zip_date = date_match.group(1) if date_match else "" # YYMMDD
lh7_name = f"PLP111{zip_date[:2]}.Lh7" if zip_date else "PLP111??.Lh7"
dest = os.path.join(IMPORT_DIR, lh7_name)
# Najdi případný existující soubor pro stejný rok
year_suffix = zip_date[:2] if zip_date else ""
existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year_suffix}.Lh7"))
if existing: if existing:
print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.") print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.")
return existing[0] return existing[0]
# Stáhni ZIP # Získej SAS URL pro stažení
print(f"[stahování] {zip_href}") api_url = f"https://www.vzp.cz/api/documents/{VZP_DOCUMENT_ID}/files/{zip_filename}"
print(f"[stahování] Získávám odkaz ke stažení...")
try: try:
zip_resp = session.get(zip_href, timeout=60) api_resp = requests.get(
api_url,
headers={
"Authorization": f"Bearer {token}",
"Origin": "https://point.vzp.cz",
"Referer": "https://point.vzp.cz/",
},
timeout=30,
)
api_resp.raise_for_status()
# Odpověď je SAS URL jako JSON string nebo objekt
try:
data = api_resp.json()
sas_url = data if isinstance(data, str) else (
data.get("url") or data.get("sasUrl") or data.get("uri") or data.get("value")
)
except Exception:
sas_url = api_resp.text.strip().strip('"')
except Exception as e:
print(f"[stahování] Chyba při získávání SAS URL: {e}")
return None
if not sas_url or not sas_url.startswith("http"):
print(f"[stahování] Neplatná SAS URL: {str(sas_url)[:100]}")
return None
# Stáhni ZIP z Azure Blob Storage
print(f"[stahování] Stahuji {zip_filename}...")
try:
zip_resp = requests.get(sas_url, timeout=60)
zip_resp.raise_for_status() zip_resp.raise_for_status()
except Exception as e: except Exception as e:
print(f"[stahování] Chyba při stahování ZIP: {e}") print(f"[stahování] Chyba při stahování ZIP: {e}")
return None return None
# Rozbal Lh7 z archivu # Rozbal Lh7
try: try:
with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as zf: with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as zf:
lh7_names = [n for n in zf.namelist() if n.lower().endswith(".lh7")] lh7_names = [n for n in zf.namelist() if n.lower().endswith(".lh7")]
if not lh7_names: if not lh7_names:
print("[stahování] ZIP neobsahuje žádný .Lh7 soubor") print("[stahování] ZIP neobsahuje .Lh7 soubor")
return None return None
lh7_entry = lh7_names[0] dest = os.path.join(IMPORT_DIR, os.path.basename(lh7_names[0]))
dest = os.path.join(IMPORT_DIR, os.path.basename(lh7_entry))
os.makedirs(IMPORT_DIR, exist_ok=True) os.makedirs(IMPORT_DIR, exist_ok=True)
with zf.open(lh7_entry) as src, open(dest, "wb") as out: with zf.open(lh7_names[0]) as src, open(dest, "wb") as out:
out.write(src.read()) out.write(src.read())
except Exception as e: except Exception as e:
print(f"[stahování] Chyba při rozbalování ZIP: {e}") print(f"[stahování] Chyba při rozbalování: {e}")
return None return None
print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)") print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)")