diff --git a/StahovánízVZPWithClaude/import_vzp_pracoviste.py b/StahovánízVZPWithClaude/import_vzp_pracoviste.py index 663757d..12c2d8c 100644 --- a/StahovánízVZPWithClaude/import_vzp_pracoviste.py +++ b/StahovánízVZPWithClaude/import_vzp_pracoviste.py @@ -4,11 +4,14 @@ Před importem automaticky stáhne nejnovější soubor z VZP Point (vyžaduje c Použití: python import_vzp_pracoviste.py [--no-download] [soubor.Lh7] """ +import base64 import csv import glob +import hashlib import io import os import re +import secrets import sys import zipfile from datetime import date, datetime @@ -31,9 +34,9 @@ DB_CONFIG = { IMPORT_DIR = os.path.join(os.path.dirname(__file__), "Import") -VZP_POINT_DOC_URL = "https://point.vzp.cz/Cms/Document" -VZP_CERT_FILE = os.path.join(os.path.dirname(__file__), "MichalkaPublicCertProPython.pfx") -VZP_CERT_PASSWORD = "" # nastav heslo PFX souboru, pokud bylo při exportu zadáno +VZP_CERT_FILE = os.path.join(os.path.dirname(__file__), "MichalkaPublicCertProPython.pfx") +VZP_CERT_PASSWORD = "Vlado7309208104++" +VZP_DOCUMENT_ID = 5283 # "Soubor platných IČP" na point.vzp.cz/Cms/Document CREATE_TABLE_SQL = """ CREATE TABLE IF NOT EXISTS vzp_pracoviste ( @@ -74,94 +77,228 @@ def parse_date(s: str) -> date | None: return None -def download_latest_file() -> str | None: +def _pkce_pair() -> tuple[str, str]: + """Vrátí (code_verifier, code_challenge) pro PKCE S256.""" + verifier = secrets.token_urlsafe(64) + digest = hashlib.sha256(verifier.encode()).digest() + challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode() + return verifier, challenge + + +def _get_bearer_token() -> str | None: """ - Přihlásí se na VZP Point certifikátem, stáhne nejnovější ICP ZIP, - rozbalí PLP111*.Lh7 do Import/ a vrátí cestu. Při chybě vrátí None. + Autentizuje pomocí PKCS12 certifikátu přes OIDC authorization_code + PKCE. + Certifikát je předán při TLS handshake na auth.vzp.cz. + Vrátí Bearer access_token nebo None při chybě. """ try: - from requests_pkcs12 import Pkcs12Adapter import requests + from requests_pkcs12 import Pkcs12Adapter except ImportError: - print("[stahování] Chybí knihovny: pip install requests requests-pkcs12") + print("[auth] pip install requests requests-pkcs12") + return None + + password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None + verifier, challenge = _pkce_pair() + nonce = secrets.token_urlsafe(32) + state = secrets.token_urlsafe(32) + + session = requests.Session() + # Certifikát pro TLS client auth na auth.vzp.cz i pro redirect na point.vzp.cz + for base in ("https://auth.vzp.cz", "https://point.vzp.cz"): + session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password)) + + # Krok 1: Authorize — server ověří certifikát a vrátí form_post HTML s code + try: + resp = session.get( + "https://auth.vzp.cz/connect/authorize", + params={ + "client_id": "bdesk", + "redirect_uri": "https://point.vzp.cz/home/signin", + "response_type": "code", + "scope": "openid profile email phone offline_access", + "response_mode": "form_post", + "nonce": nonce, + "state": state, + "code_challenge": challenge, + "code_challenge_method": "S256", + }, + allow_redirects=True, + timeout=30, + ) + except Exception as e: + print(f"[auth] Chyba při authorize: {e}") + return None + + # Parsuj form_post HTML a vytáhni code + class _FormParser(HTMLParser): + def __init__(self): + super().__init__() + self.fields: dict[str, str] = {} + def handle_starttag(self, tag, attrs): + if tag == "input": + d = dict(attrs) + if d.get("name"): + self.fields[d["name"]] = d.get("value", "") + + fp = _FormParser() + fp.feed(resp.text) + code = fp.fields.get("code") + + if not code: + print(f"[auth] Autorizační kód nenalezen (HTTP {resp.status_code}). " + f"Zkontroluj certifikát a heslo PFX.") + return None + + # Krok 2: Vyměň code za access_token + try: + token_resp = session.post( + "https://auth.vzp.cz/connect/token", + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": "https://point.vzp.cz/home/signin", + "client_id": "bdesk", + "code_verifier": verifier, + }, + timeout=30, + ) + token_data = token_resp.json() + except Exception as e: + print(f"[auth] Chyba při výměně tokenu: {e}") + return None + + token = token_data.get("access_token") + if not token: + print(f"[auth] access_token chybí. Odpověď: {list(token_data.keys())}") + return None + + return token + + +def _get_icp_filename(session, token: str) -> str | None: + """ + Zjistí aktuální název ICP zip souboru z point.vzp.cz/Cms/Document + (atribut download na odkazu 'Soubor platných IČP'). + """ + try: + resp = session.get("https://point.vzp.cz/Cms/Document", timeout=30) + resp.raise_for_status() + except Exception as e: + print(f"[stahování] Nelze načíst seznam dokumentů: {e}") + return None + + class _DownloadParser(HTMLParser): + def __init__(self): + super().__init__() + self.filename: str | None = None + def handle_starttag(self, tag, attrs): + if tag == "a" and not self.filename: + d = dict(attrs) + dl = d.get("download", "") + if re.search(r"-icp\.zip$", dl, re.IGNORECASE): + self.filename = dl + + dp = _DownloadParser() + dp.feed(resp.text) + return dp.filename + + +def download_latest_file() -> str | None: + """ + 1. Autentizuje certifikátem přes OIDC → Bearer token + 2. Zjistí aktuální název ICP zip z point.vzp.cz + 3. Získá SAS URL z www.vzp.cz/api/documents/{id}/files/{filename} + 4. Stáhne ZIP, rozbalí PLP111*.Lh7 do Import/ + Vrátí cestu k Lh7 nebo None při chybě. + """ + try: + import requests + from requests_pkcs12 import Pkcs12Adapter + except ImportError: + print("[stahování] pip install requests requests-pkcs12") return None password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None + # Session s certifikátem (pro point.vzp.cz po autentizaci) session = requests.Session() - session.mount("https://point.vzp.cz", Pkcs12Adapter( - pkcs12_filename=VZP_CERT_FILE, - pkcs12_password=password, - )) + for base in ("https://auth.vzp.cz", "https://point.vzp.cz"): + session.mount(base, Pkcs12Adapter(pkcs12_filename=VZP_CERT_FILE, pkcs12_password=password)) - # Načti stránku s dokumenty - try: - resp = session.get(VZP_POINT_DOC_URL, timeout=30) - resp.raise_for_status() - except Exception as e: - print(f"[stahování] Chyba při načtení {VZP_POINT_DOC_URL}: {e}") + # Autentizace + print("[stahování] Přihlašování certifikátem...") + token = _get_bearer_token() + if not token: + return None + print("[stahování] Přihlášení úspěšné.") + + # Zjisti název souboru + zip_filename = _get_icp_filename(session, token) + if not zip_filename: + print("[stahování] Název ICP souboru nenalezen.") return None - # Najdi odkaz na *-icp.zip - class _LinkParser(HTMLParser): - def __init__(self): - super().__init__() - self.icp_links: list[str] = [] - - def handle_starttag(self, tag, attrs): - if tag == "a": - href = dict(attrs).get("href", "") - if re.search(r"-icp\.zip", href, re.IGNORECASE): - self.icp_links.append(href) - - parser = _LinkParser() - parser.feed(resp.text) - - if not parser.icp_links: - print("[stahování] Na stránce VZP Point nebyl nalezen odkaz na *-icp.zip") - return None - - # Vyber nejnovější dle data v názvu (YYMMDDHHMMSS-icp.zip) - zip_href = sorted(parser.icp_links)[-1] - if not zip_href.startswith("http"): - zip_href = "https://point.vzp.cz" + zip_href - - # Zkontroluj, jestli už máme aktuální soubor (podle data v názvu ZIP) - date_match = re.search(r"(\d{6})\d{6}-icp\.zip", zip_href) - zip_date = date_match.group(1) if date_match else "" # YYMMDD - lh7_name = f"PLP111{zip_date[:2]}.Lh7" if zip_date else "PLP111??.Lh7" - dest = os.path.join(IMPORT_DIR, lh7_name) - - # Najdi případný existující soubor pro stejný rok - year_suffix = zip_date[:2] if zip_date else "" - existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year_suffix}.Lh7")) + # Zkontroluj, jestli Lh7 pro tento rok už máme + year_match = re.search(r"^(\d{2})", zip_filename) + year = year_match.group(1) if year_match else "" + existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year}.Lh7")) if existing: print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.") return existing[0] - # Stáhni ZIP - print(f"[stahování] {zip_href}") + # Získej SAS URL pro stažení + api_url = f"https://www.vzp.cz/api/documents/{VZP_DOCUMENT_ID}/files/{zip_filename}" + print(f"[stahování] Získávám odkaz ke stažení...") try: - zip_resp = session.get(zip_href, timeout=60) + api_resp = requests.get( + api_url, + headers={ + "Authorization": f"Bearer {token}", + "Origin": "https://point.vzp.cz", + "Referer": "https://point.vzp.cz/", + }, + timeout=30, + ) + api_resp.raise_for_status() + # Odpověď je SAS URL jako JSON string nebo objekt + try: + data = api_resp.json() + sas_url = data if isinstance(data, str) else ( + data.get("url") or data.get("sasUrl") or data.get("uri") or data.get("value") + ) + except Exception: + sas_url = api_resp.text.strip().strip('"') + except Exception as e: + print(f"[stahování] Chyba při získávání SAS URL: {e}") + return None + + if not sas_url or not sas_url.startswith("http"): + print(f"[stahování] Neplatná SAS URL: {str(sas_url)[:100]}") + return None + + # Stáhni ZIP z Azure Blob Storage + print(f"[stahování] Stahuji {zip_filename}...") + try: + zip_resp = requests.get(sas_url, timeout=60) zip_resp.raise_for_status() except Exception as e: print(f"[stahování] Chyba při stahování ZIP: {e}") return None - # Rozbal Lh7 z archivu + # Rozbal Lh7 try: with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as zf: lh7_names = [n for n in zf.namelist() if n.lower().endswith(".lh7")] if not lh7_names: - print("[stahování] ZIP neobsahuje žádný .Lh7 soubor") + print("[stahování] ZIP neobsahuje .Lh7 soubor") return None - lh7_entry = lh7_names[0] - dest = os.path.join(IMPORT_DIR, os.path.basename(lh7_entry)) + dest = os.path.join(IMPORT_DIR, os.path.basename(lh7_names[0])) os.makedirs(IMPORT_DIR, exist_ok=True) - with zf.open(lh7_entry) as src, open(dest, "wb") as out: + with zf.open(lh7_names[0]) as src, open(dest, "wb") as out: out.write(src.read()) except Exception as e: - print(f"[stahování] Chyba při rozbalování ZIP: {e}") + print(f"[stahování] Chyba při rozbalování: {e}") return None print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)")