Files
ordinaceprojekt/Insurance/StahováníZpráv/02_stahuj_vse.py
T
Vladimir Buzalka 86e641b282 notebookvb
2026-04-19 13:43:46 +02:00

299 lines
12 KiB
Python

"""
02 - Stažení VŠECH zpráv z VZP Point schránky (jednorázová akce)
Projde celý seznam (lazy-load), stáhne všechny soubory a pojmenuje je:
YYYY-MM-DD Kategorie Název (původní_název).přípona
Použití: python 02_stahuj_vse.py
"""
import json
import os
import re
import sys
import time
import winreg
from datetime import datetime
from pathlib import Path
INBOX_URL = "https://point.vzp.cz/Inbox/Message"
CHROME_PROFILE = os.path.abspath(os.path.join(os.path.dirname(__file__), "chrome_profile"))
COOKIES_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__), "vzp_cookies.json"))
DOWNLOAD_DIR = os.path.join(os.path.dirname(__file__), "Staženo")
def load_cookies(context) -> int:
if not os.path.exists(COOKIES_FILE):
return 0
try:
with open(COOKIES_FILE, "r", encoding="utf-8") as f:
cookies = json.load(f)
context.add_cookies(cookies)
return len(cookies)
except Exception:
return 0
def save_cookies(context) -> int:
try:
all_cookies = context.cookies()
vzp = [c for c in all_cookies if "vzp.cz" in c.get("domain", "")]
with open(COOKIES_FILE, "w", encoding="utf-8") as f:
json.dump(vzp, f, indent=2, ensure_ascii=False)
return len(vzp)
except Exception:
return 0
print(f"Chrome profil: {CHROME_PROFILE}")
print(f"Profil existuje: {os.path.exists(CHROME_PROFILE)}")
def _delete_chrome_cert_policy() -> None:
key_path = r"SOFTWARE\Policies\Google\Chrome\AutoSelectCertificateForUrls"
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, access=winreg.KEY_SET_VALUE)
winreg.DeleteValue(key, "1")
winreg.CloseKey(key)
except Exception:
pass
def parse_date(date_str: str) -> str:
"""Převede '16. 4. 2026 09:02' na '2026-04-16'."""
try:
dt = datetime.strptime(date_str.strip(), "%d. %m. %Y %H:%M")
return dt.strftime("%Y-%m-%d")
except Exception:
return "0000-00-00"
def safe_filename(name: str) -> str:
"""Odstraní znaky nevhodné pro název souboru."""
return re.sub(r'[\\/:*?"<>|]', "_", name).strip()
def build_filename(date_str: str, category: str, title: str, original: str) -> str:
"""Sestaví název souboru: YYYY-MM-DD Kategorie Název (původní).ext"""
orig_path = Path(original)
stem = orig_path.stem
ext = orig_path.suffix # včetně tečky
name = f"{parse_date(date_str)} {safe_filename(category)} {safe_filename(title)} ({safe_filename(stem)}){ext}"
# Windows limit 255 znaků
if len(name) > 240:
name = name[:230] + f"({safe_filename(stem)}){ext}"
return name
def load_all_messages(page, max_clicks: int = 0) -> None:
"""Opakovaně kliká na 'Načíst další záznamy' (přes JS). max_clicks=0 = bez omezení."""
clicks = 0
while True:
if max_clicks and clicks >= max_clicks:
break
# Hledáme tlačítko přes JS — robustnější než Playwright selektor
before_count = page.evaluate("document.querySelectorAll('.InboxMessage').length")
clicked = page.evaluate("""() => {
const btn = Array.from(document.querySelectorAll('a')).find(a => a.innerText.includes('Načíst další'));
if (btn) { btn.scrollIntoView(); btn.click(); return true; }
return false;
}""")
if not clicked:
break
clicks += 1
# Počkáme až se načtou nové zprávy (counter se zvýší)
try:
page.wait_for_function(
f"document.querySelectorAll('.InboxMessage').length > {before_count}",
timeout=15_000,
)
except Exception:
print(f" [{clicks}] Nové zprávy nenačteny, končím.")
break
after_count = page.evaluate("document.querySelectorAll('.InboxMessage').length")
print(f" [{clicks}] Načteno {after_count} zpráv (přibyly {after_count - before_count})")
time.sleep(0.3)
def collect_messages(page) -> list[dict]:
"""Projde DOM přes JS a vrátí seznam zpráv se všemi potřebnými údaji."""
data = page.evaluate("""() => {
const results = [];
for (const msg of document.querySelectorAll('.InboxMessage')) {
// Kategorie: title atribut ikony v .InboxMessage-row--type
const typeIcon = msg.querySelector('.InboxMessage-row--type i[title]');
const category = typeIcon ? typeIcon.title.trim() : '';
// Název + název souboru: z title atributů na h3 a a.InboxMessage-title-link
const titleEl = msg.querySelector('h3.InboxMessage-title');
const title = titleEl ? titleEl.title.trim() : '';
const linkEl = msg.querySelector('a.InboxMessage-title-link');
const linkTitle = linkEl ? linkEl.title.trim() : ''; // "Stáhnout soubor xyz.pdf"
const original = linkTitle.split(/\\s+/).pop(); // "xyz.pdf"
// Datum: .InboxMessage-row bez dalších modifikátorů
let date = '';
for (const row of msg.querySelectorAll('.InboxMessage-row')) {
if (row.className.trim() === 'InboxMessage-row') {
date = row.innerText.trim();
break;
}
}
results.push({ title, category, date, original });
}
return results;
}""")
# Playwright locatory — každá zpráva má svůj download link uvnitř .InboxMessage
inbox_msgs = page.locator(".InboxMessage").all()
messages = []
for i, item in enumerate(data):
if i < len(inbox_msgs):
msg_el = inbox_msgs[i]
dl = msg_el.locator(".InboxMessage-row--download .i-l")
if dl.count() > 0:
item["link_locator"] = dl
item["link_type"] = "download"
else:
zobrazit = msg_el.locator(".InboxMessage-row--download .i-r-1")
item["link_locator"] = zobrazit if zobrazit.count() > 0 else None
item["link_type"] = "zobrazit" if zobrazit.count() > 0 else None
item["msg_locator"] = msg_el
else:
item["link_locator"] = None
item["link_type"] = None
messages.append(item)
return messages
def main() -> None:
try:
from playwright.sync_api import sync_playwright
except ImportError:
print("Chybí playwright: pip install playwright && playwright install chrome")
sys.exit(1)
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
_delete_chrome_cert_policy()
with sync_playwright() as p:
context = p.chromium.launch_persistent_context(
user_data_dir=CHROME_PROFILE,
channel="chrome",
headless=False,
slow_mo=100,
ignore_https_errors=True,
accept_downloads=True,
args=["--force-renderer-accessibility"],
downloads_path=DOWNLOAD_DIR,
)
try:
loaded = load_cookies(context)
print(f"Cookies načtené z JSON: {loaded}")
page = context.new_page()
print("Naviguji na VZP Point schránku...")
try:
page.goto(INBOX_URL, wait_until="domcontentloaded", timeout=30_000)
except Exception as e:
print(f"Navigace: {e}")
if page.url.startswith("https://auth.vzp.cz/signin"):
print("Přihlašovací stránka — klikám na 'Certifikát'...")
cert_btn = page.locator("a, button").filter(has_text=re.compile(r"certifikát", re.I)).first
cert_btn.wait_for(state="visible", timeout=10_000)
cert_btn.click(no_wait_after=True)
print("Pokud se zobrazí dialog výběru certifikátu, vyberte ho ručně (max 60 s)...")
time.sleep(60)
page = context.new_page()
try:
page.goto(INBOX_URL, wait_until="domcontentloaded", timeout=30_000)
except Exception as e:
print(f"Navigace po auth: {e}")
if not page.url.startswith("https://point.vzp.cz"):
print(f"Přihlášení selhalo. URL: {page.url}")
return
print("Přihlášení OK. Načítám všechny zprávy (lazy-load)...")
page.wait_for_load_state("networkidle", timeout=15_000)
load_all_messages(page, max_clicks=0)
print("Sbírám seznam zpráv...")
messages = collect_messages(page)
print(f"Nalezeno {len(messages)} zpráv.")
already = set(os.listdir(DOWNLOAD_DIR))
downloaded = 0
skipped = 0
for i, msg in enumerate(messages, 1):
filename = build_filename(msg["date"], msg["category"], msg["title"], msg["original"])
target = os.path.join(DOWNLOAD_DIR, filename)
if filename in already or os.path.exists(target):
skipped += 1
continue
if msg["link_locator"] is None:
print(f"[{i}/{len(messages)}] Přeskakuji (bez odkazu): {filename}")
continue
print(f"[{i}/{len(messages)}] Stahuji: {filename}")
if msg["link_type"] == "download":
try:
with page.expect_download(timeout=30_000) as dl_info:
msg["link_locator"].dispatch_event("click")
dl_info.value.save_as(target)
already.add(filename)
downloaded += 1
time.sleep(0.3)
except Exception as e:
print(f" Chyba při stahování '{filename}': {e}")
elif msg["link_type"] == "zobrazit":
try:
msg["link_locator"].click()
# Počkej na rozbalený obsah
footer = msg["msg_locator"].locator(".InboxMessage-footer")
footer.wait_for(state="visible", timeout=10_000)
time.sleep(0.5)
# Ulož text zprávy jako .txt (bez sekce příloh)
text_el = footer.locator("div").first
text = text_el.inner_text().strip()
txt_target = Path(target).with_suffix(".txt")
txt_target.write_text(text, encoding="utf-8")
already.add(txt_target.name)
downloaded += 1
print(f" Uložen text: {txt_target.name}")
# Stáhni přílohy uvnitř zprávy
attach_links = footer.locator("a.i-l").all()
for al in attach_links:
orig = al.get_attribute("title", timeout=2_000) or ""
orig_name = orig.split()[-1] if orig else "priloha"
att_filename = build_filename(msg["date"], msg["category"], msg["title"], orig_name)
att_target = os.path.join(DOWNLOAD_DIR, att_filename)
if not os.path.exists(att_target):
with page.expect_download(timeout=30_000) as dl_info:
al.dispatch_event("click")
dl_info.value.save_as(att_target)
already.add(att_filename)
downloaded += 1
print(f" Stažena příloha: {att_filename}")
time.sleep(0.3)
except Exception as e:
print(f" Chyba při zobrazení '{filename}': {e}")
print(f"\nHotovo. Staženo: {downloaded}, přeskočeno (již existuje): {skipped}")
finally:
saved = save_cookies(context)
print(f"Uloženo {saved} VZP cookies.")
context.close()
if __name__ == "__main__":
main()