diff --git a/Insurance/StahováníZpráv/201 VoZP/01_prihlaseni.py b/Insurance/StahováníZpráv/201 VoZP/01_prihlaseni.py index f2fe7b8..8e6863a 100644 --- a/Insurance/StahováníZpráv/201 VoZP/01_prihlaseni.py +++ b/Insurance/StahováníZpráv/201 VoZP/01_prihlaseni.py @@ -1,94 +1,87 @@ """ -01 - Přihlášení na portál VoZP — extrakce cookies (jako admin). +Přihlášení na portál VoZP pomocí certifikátu (bez NMSigneru). -POSTUP: - 1. Spusť: python 01_prihlaseni.py - 2. Vyskočí UAC — schval (skript potřebuje admin pro čtení Chrome cookies - kvůli App-Bound Encryption v Chrome v130+). - 3. Otevře se nové okno s běžícím skriptem (admin). - 4. Skript otevře Chrome (jako user) na VoZP login. - 5. Přihlas se certifikátem (NMSigner vyskočí — klikni ANO). - 6. Vrať se k Chrome → ZAVŘI ho (všechna okna). - 7. Stiskni Enter v admin okně skriptu. - 8. Cookies se uloží do vozp_cookies.json. +JAK TO FUNGUJE: + Portál používá challenge-response autentizaci: + 1. Skript načte přihlašovací stránku → dostane session cookie (SID) + 2. Požádá server o zprávu k podpisu (challenge s časovým razítkem) + 3. Podepíše ji certifikátem jako PKCS7/CMS detached signature (RSA + SHA-256) + 4. Odešle podpis na server → server ověří a vrátí přihlášení + 5. Výsledné cookies uloží do vozp_cookies.json pro další skripty -POŽADAVKY: - pip install rookiepy +POUŽITÍ: + pip install requests cryptography + python 01_prihlaseni.py + +CERTIFIKÁT: + Exportuj z Windows: certmgr.msc → Osobní → pravý klik → Exportovat + Formát: PKCS #12 (.PFX), bez CA řetězu + Uložit do: ../../Certificates/MBQualifiedCert.pfx """ -import ctypes import json import os -import subprocess -import sys -LOGIN_URL = "https://portal.vozp.cz/app/prihlaseni" -INBOX_URL = "https://portal.vozp.cz/app/prehled-zprav-ve-schrankach" +import requests +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.serialization import pkcs7, pkcs12 + +PFX_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../Certificates/MBQualifiedCert.pfx")) +PFX_PASSWORD = b"Vlado7309208104++" COOKIES_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), "vozp_cookies.json")) - -def is_admin() -> bool: - try: - return ctypes.windll.shell32.IsUserAnAdmin() != 0 - except Exception: - return False +BASE_URL = "https://portal.vozp.cz" +CHALLENGE_URL = f"{BASE_URL}/json-api/prihlaseni/prihlasovaci-zprava" +CERTLOGIN_URL = f"{BASE_URL}/json-api/prihlaseni/prihlaseni-certifikatem" -def relaunch_as_admin() -> None: - print("Skript potřebuje admin práva (Chrome v130+ App-Bound Encryption).") - print("Otevírám UAC...") - params = " ".join(f'"{a}"' for a in sys.argv) - ret = ctypes.windll.shell32.ShellExecuteW( - None, "runas", sys.executable, params, None, 1 +def main() -> None: + session = requests.Session() + session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + "X-Requested-With": "XMLHttpRequest", + "Origin": BASE_URL, + "Referer": BASE_URL + "/", + }) + + # 1. Načti login stránku → SID cookie + r = session.get(f"{BASE_URL}/app/prihlaseni") + r.raise_for_status() + session.cookies.set("pzp_sign", "CERT", domain="portal.vozp.cz", path="/") + + # 2. Získej challenge + r = session.post(CHALLENGE_URL, json={"login_sign": "CERT"}, + headers={"Content-Type": "application/json; charset=UTF-8"}) + r.raise_for_status() + zprava = r.json()["data"]["zprava"] + + # 3. Podepis (PKCS7 detached, RSA + SHA-256, bez CA řetězu) + with open(PFX_PATH, "rb") as f: + private_key, cert, _ = pkcs12.load_key_and_certificates(f.read(), PFX_PASSWORD) + + podpis = ( + pkcs7.PKCS7SignatureBuilder() + .set_data(zprava.encode("utf-8")) + .add_signer(cert, private_key, hashes.SHA256()) + .sign(serialization.Encoding.PEM, [pkcs7.PKCS7Options.DetachedSignature]) + .decode("ascii").strip() ) - if ret <= 32: - print(f"UAC selhalo (kód {ret}). Spusť ručně jako admin.") - sys.exit(1) - sys.exit(0) + # 4. Přihlas se + r = session.post(CERTLOGIN_URL, json={"zprava": zprava, "podpis": podpis}, + headers={"Content-Type": "application/json; charset=UTF-8"}) + r.raise_for_status() + data = r.json()["data"] -def open_chrome_as_user(url: str) -> None: - """Spustí default browser jako standardní user, i když Python běží jako admin.""" - # explorer.exe běží jako user a otevře URL v defaultním prohlížeči - try: - subprocess.Popen(["explorer.exe", url]) + if not data.get("prihlasen"): + print(f"Přihlášení selhalo: {r.json()['errMsg']}") return - except Exception: - pass - # Fallback - import webbrowser - webbrowser.open(url) + print(f"Přihlášení úspěšné — {data.get('url', '')}") -def read_cookies_rookiepy() -> list[dict]: - import rookiepy - raw = rookiepy.chrome(["vozp.cz", ".vozp.cz", "portal.vozp.cz", "portalzp.cz"]) - out = [] - for c in raw: - dom = c["domain"] - if not any(d in dom for d in ["vozp.cz", "portalzp.cz"]): - continue - out.append({ - "name": c["name"], - "value": c["value"], - "domain": dom if dom.startswith(".") else "." + dom, - "path": c.get("path") or "/", - "expires": int(c["expires"]) if c.get("expires") else -1, - "secure": bool(c.get("secure", False)), - "httpOnly": bool(c.get("http_only", False)), - "sameSite": "Lax", - }) - return out - - -def read_cookies_browser_cookie3() -> list[dict]: - import browser_cookie3 - cj = browser_cookie3.chrome(domain_name="vozp.cz") - out = [] - for c in cj: - if not any(d in c.domain for d in ["vozp.cz", "portalzp.cz"]): - continue - out.append({ + # 5. Ulož cookies + cookies = [ + { "name": c.name, "value": c.value, "domain": c.domain if c.domain.startswith(".") else "." + c.domain, @@ -97,70 +90,12 @@ def read_cookies_browser_cookie3() -> list[dict]: "secure": bool(c.secure), "httpOnly": False, "sameSite": "Lax", - }) - return out - - -def main() -> None: - if not is_admin(): - relaunch_as_admin() - return # nedosažitelné - - print("=" * 60) - print("BĚŽÍM JAKO ADMIN — PŘIHLÁŠENÍ DO VoZP") - print("=" * 60) - print(f"1. Otevírám Chrome (jako user) na: {LOGIN_URL}") - print("2. Přihlas se certifikátem (NMSigner → klikni ANO)") - print("3. Po přihlášení ZAVŘI Chrome úplně (všechna okna)") - print("4. Vrať se sem a stiskni Enter") - print("=" * 60) - - open_chrome_as_user(LOGIN_URL) - - input("\nAž jsi přihlášen/a a Chrome ZAVŘENÝ, stiskni Enter...") - - cookies: list[dict] = [] - last_err = "" - - print("\nČtu cookies (jako admin)...") - try: - cookies = read_cookies_rookiepy() - print(f" rookiepy OK: {len(cookies)} cookies") - except ImportError: - last_err = "rookiepy není nainstalovaný" - print(f" {last_err} — zkouším browser_cookie3...") - except Exception as e: - last_err = f"rookiepy: {e}" - print(f" rookiepy selhalo: {e} — zkouším browser_cookie3...") - - if not cookies: - try: - cookies = read_cookies_browser_cookie3() - print(f" browser_cookie3 OK: {len(cookies)} cookies") - except ImportError: - print(" browser_cookie3 není nainstalovaný.") - except Exception as e: - print(f" browser_cookie3: {e}") - - if not cookies: - print("\nCookies se nepodařilo přečíst ani jako admin.") - print("Zkontroluj, že:") - print(" - Chrome je úplně zavřený (Task Manager → žádný chrome.exe)") - print(" - Jsi se na VoZP přihlásil/a (DB má nové záznamy)") - print(" - Máš nainstalované: pip install rookiepy browser-cookie3") - input("\nEnter pro zavření...") - sys.exit(1) - + } + for c in session.cookies + ] with open(COOKIES_FILE, "w", encoding="utf-8") as f: json.dump(cookies, f, indent=2, ensure_ascii=False) - - print(f"\nUloženo {len(cookies)} cookies do {COOKIES_FILE}") - for c in cookies: - exp = c["expires"] - tag = "PERSISTENT" if exp > 0 else "SESSION" - print(f" - {c['name'][:50]} ({c['domain']}) [{tag}]") - - input("\nHotovo. Enter pro zavření...") + print(f"Uloženo {len(cookies)} cookies → {COOKIES_FILE}") if __name__ == "__main__": diff --git a/Insurance/StahováníZpráv/201 VoZP/02_stahuj_vse.py b/Insurance/StahováníZpráv/201 VoZP/02_stahuj_vse.py index a643b1f..f64bae3 100644 --- a/Insurance/StahováníZpráv/201 VoZP/02_stahuj_vse.py +++ b/Insurance/StahováníZpráv/201 VoZP/02_stahuj_vse.py @@ -14,7 +14,6 @@ import winreg from datetime import datetime from pathlib import Path -import requests as req LOGIN_URL = "https://portal.vozp.cz/app/prihlaseni" BASE_URL = "https://portal.vozp.cz" @@ -150,41 +149,28 @@ def collect_rows(page) -> list[dict]: return [r for r in data if r["fileId"]] -def make_requests_session(context) -> req.Session: - """Vytvoří requests.Session se cookies z Playwright kontextu.""" - session = req.Session() - for c in context.cookies(): - session.cookies.set( - c["name"], c["value"], - domain=c.get("domain", "").lstrip(".") - ) - session.headers.update({ - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", - "Referer": INBOX_URL, - }) - return session - - -def download_file(session: req.Session, file_id: str, target: str) -> bool: - """Stáhne soubor přes přímý HTTP požadavek. Vrací True při úspěchu.""" +def download_file(context, file_id: str, target: str) -> bool: + """Stáhne soubor přes Playwright request context (sdílí cookies se session).""" url = f"{DOWNLOAD_URL}?zprava_id={file_id}" try: - r = session.get(url, timeout=30, stream=True) - r.raise_for_status() + r = context.request.get(url, headers={"Referer": INBOX_URL}) + if not r.ok: + print(f" HTTP {r.status} (id={file_id})") + return False with open(target, "wb") as f: - for chunk in r.iter_content(chunk_size=8192): - f.write(chunk) + f.write(r.body()) return True except Exception as e: print(f" Chyba stahování (id={file_id}): {e}") return False -def process_schránka(page, session: req.Session, segment: str, name: str, already: set) -> tuple[int, int]: +def process_schránka(page, context, segment: str, name: str, already: set) -> tuple[int, int]: """Projde všechny stránky schránky a stáhne soubory. Vrací (staženo, přeskočeno).""" downloaded = 0 skipped = 0 page_num = 1 + seen_ids: set = set() while True: url = f"{INBOX_URL}/{segment}/stranka-{page_num}" @@ -202,6 +188,13 @@ def process_schránka(page, session: req.Session, segment: str, name: str, alrea print(f" Stránka {page_num} — žádné řádky, končím schránku.") break + # Portál vrátí poslední stránku znovu místo prázdné — detekuj opakování + current_ids = {r["fileId"] for r in rows} + if current_ids & seen_ids: + print(f" Stránka {page_num} — opakující se obsah, končím schránku.") + break + seen_ids.update(current_ids) + print(f" Nalezeno {len(rows)} zpráv.") for row in rows: @@ -213,19 +206,11 @@ def process_schránka(page, session: req.Session, segment: str, name: str, alrea continue print(f" Stahuji: {info['filename']}") - if download_file(session, row["fileId"], target): + if download_file(context, row["fileId"], target): already.add(info["filename"]) downloaded += 1 time.sleep(0.3) - # Zkontroluj, jestli existuje další stránka - has_next = page.evaluate("""() => { - return !!Array.from(document.querySelectorAll('a')).find( - a => a.innerText.trim() === 'Další stránka' && !a.closest('[aria-disabled]') - ); - }""") - if not has_next: - break page_num += 1 return downloaded, skipped @@ -277,7 +262,6 @@ def main() -> None: if not ensure_logged_in(page, context): return - session = make_requests_session(context) already = set(os.listdir(DOWNLOAD_DIR)) print(f"V archivu: {len(already)} souborů.\n") @@ -286,7 +270,7 @@ def main() -> None: for segment, name in SCHRANKY.items(): print(f"\n=== Schránka: {name} ===") - dl, sk = process_schránka(page, session, segment, name, already) + dl, sk = process_schránka(page, context, segment, name, already) print(f" Schránka {name}: staženo {dl}, přeskočeno {sk}") total_dl += dl total_skip += sk