notebookvb
This commit is contained in:
@@ -16,6 +16,7 @@ POUŽITÍ:
|
||||
python 02_stahuj_vse.py
|
||||
"""
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@@ -149,7 +150,11 @@ def process_sekce(page, context, cfg: dict, already: set) -> tuple:
|
||||
filename = f"{period} {label} {prefix}.{ext}"
|
||||
target = os.path.join(DOWNLOAD_DIR, filename)
|
||||
|
||||
if filename in already or os.path.exists(target):
|
||||
# Prefix je unikátní — soubor může mít různý suffix z Content-Disposition
|
||||
prefix_check = f"{period} {label} {prefix}"
|
||||
if any(f.startswith(prefix_check) for f in already) or glob.glob(
|
||||
os.path.join(DOWNLOAD_DIR, glob.escape(prefix_check) + "*")
|
||||
):
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
|
||||
@@ -0,0 +1,234 @@
|
||||
"""
|
||||
Stahování NOVÝCH dokumentů ze ZPMVČR — zastaví se při první již stažené zprávě v každé sekci.
|
||||
|
||||
Použij po 01_prihlaseni.py (ten uloží zpmvcr_cookies.json).
|
||||
|
||||
POUŽITÍ:
|
||||
python 03_stahuj_nove.py
|
||||
"""
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
# Portal root and the two document listings that get scraped.
BASE_URL = "https://eforms.zpmvcr.cz"
_DOKUMENTY_URL = BASE_URL + "/eforms/smluvni_zdravotnicke_zarizeni/dokumenty_ke_stazeni"
ZPRAVY_URL = _DOKUMENTY_URL + "/zuctovaci_zprava"
AVIZA_URL = _DOKUMENTY_URL + "/aviza"

# Both paths live next to this script; only the cookie path is made absolute.
_SCRIPT_DIR = os.path.dirname(__file__)
COOKIES_FILE = os.path.abspath(os.path.join(_SCRIPT_DIR, "zpmvcr_cookies.json"))
DOWNLOAD_DIR = os.path.join(_SCRIPT_DIR, "Staženo")

# One entry per portal section:
#   name          heading printed to the console and embedded in file names
#   url           listing page; also the target of the download POST
#   need_history  whether the "show history" link is clicked before scraping
#   dl_historie   value sent as the "historie" form field when downloading
#   *_col         index of the identifier / type / period cell within a row
#   ident_prefix  text put in front of the identifier in the file name
SEKCE = [
    dict(
        name="Zúčtovací zprávy",
        url=ZPRAVY_URL,
        need_history=True,
        dl_historie="1",
        ident_col=1,
        typ_col=3,
        obdobi_col=5,
        ident_prefix="VS-",
    ),
    dict(
        name="Avíza",
        url=AVIZA_URL,
        need_history=False,
        dl_historie="0",
        ident_col=1,
        typ_col=2,
        obdobi_col=4,
        ident_prefix="",
    ),
]
|
||||
|
||||
|
||||
def parse_period(raw: str) -> str:
    """Turn the period cell text into a ``YYYY-MM`` label.

    The first ten characters of the stripped text are parsed as a
    ``DD.MM.YYYY`` date. When they do not form such a date, the first seven
    characters are returned with dots replaced by dashes, or ``"0000"`` when
    the text is empty.

    Args:
        raw: Text of the period cell.

    Returns:
        The period label used in file names.
    """
    text = raw.strip()
    try:
        return datetime.strptime(text[:10], "%d.%m.%Y").strftime("%Y-%m")
    except ValueError:
        # Only a failed date parse is expected here; anything else is a bug
        # in the caller and should not be hidden.
        cleaned = text[:7].replace(".", "-")
        return cleaned if cleaned else "0000"
|
||||
|
||||
|
||||
# Maps every character that is not allowed in a file name to an underscore.
_UNSAFE_TO_UNDERSCORE = str.maketrans(dict.fromkeys('\\/:*?"<>|', "_"))


def safe_filename(name: str) -> str:
    """Return *name* with ``\\ / : * ? " < > |`` replaced by ``_`` and outer whitespace removed."""
    scrubbed = name.translate(_UNSAFE_TO_UNDERSCORE)
    return scrubbed.strip()
|
||||
|
||||
|
||||
def make_js_extract(ident_col: int, typ_col: int, obdobi_col: int) -> str:
    """Build the JavaScript function source that lists the documents on a page.

    The generated function visits every ``input[name="doc_id"]`` with a
    non-empty value and, from the cells of the enclosing table row, collects
    an object ``{docId, ident, typ, obdobi}``. ``typ`` is the lower-cased type
    cell when it is one of the known extensions and ``'pdf'`` otherwise.

    Args:
        ident_col: Index of the row cell holding the identifier.
        typ_col: Index of the row cell holding the document type.
        obdobi_col: Index of the row cell holding the period.

    Returns:
        Source text of an arrow function.
    """
    # %-substitution keeps the JavaScript braces readable (no doubling).
    template = """() => {
    const KNOWN_EXTS = new Set(['pdf', 'xml', 'txt', 'zip', 'doc', 'xls', 'csv']);
    const docs = [];
    for (const inp of document.querySelectorAll('input[name="doc_id"]')) {
        const docId = inp.value;
        if (!docId) continue;
        const td = inp.closest('td');
        const tr = td ? td.closest('tr') : null;
        let ident = '', typ = 'pdf', obdobi = '';
        if (tr) {
            const tds = Array.from(tr.querySelectorAll('td')).map(t => t.innerText.trim());
            ident = tds[%(ident)s] || '';
            const t = (tds[%(typ)s] || '').toLowerCase().trim();
            typ = KNOWN_EXTS.has(t) ? t : 'pdf';
            obdobi = tds[%(obdobi)s] || '';
        }
        docs.push({ docId, ident, typ, obdobi });
    }
    return docs;
}"""
    columns = {"ident": ident_col, "typ": typ_col, "obdobi": obdobi_col}
    return template % columns
|
||||
|
||||
|
||||
def click_next_page(page, page_num: int) -> bool:
    """Advance the listing to the page after *page_num*.

    Looks for an ``input[name='page']`` whose value is the next page number.
    When one exists, its first match is clicked and the call waits until the
    network is idle.

    Args:
        page: Page object showing the listing.
        page_num: Number of the page currently displayed.

    Returns:
        True when the next-page control was clicked, False when there is none.
    """
    selector = f"input[name='page'][value='{page_num + 1}']"
    candidates = page.locator(selector)
    if candidates.count() > 0:
        candidates.first.click()
        page.wait_for_load_state("networkidle", timeout=20_000)
        return True
    return False
|
||||
|
||||
|
||||
def uz_stazeno(prefix_check: str, already: set) -> bool:
    """Tell whether a document with this file-name prefix is already archived.

    Stored files are named either ``<prefix>.<ext>`` or
    ``<prefix> (<original name>).<ext>``, so the prefix must be followed by
    ``.`` or `` (``. A bare prefix comparison would let ``... id-12`` match
    ``... id-123.pdf`` and wrongly report a new document as downloaded.

    Args:
        prefix_check: ``"<period> <section> <identifier>"`` part of the name.
        already: File names known to be in ``DOWNLOAD_DIR``.

    Returns:
        True when a matching name is in *already* or on disk.
    """
    heads = (prefix_check + ".", prefix_check + " (")
    if any(known.startswith(heads) for known in already):
        return True
    # Escape the directory as well, so glob metacharacters anywhere in the
    # path are matched literally.
    base = glob.escape(os.path.join(DOWNLOAD_DIR, prefix_check))
    return any(glob.glob(base + tail) for tail in (".*", " (*"))
|
||||
|
||||
|
||||
def process_sekce(page, context, cfg: dict, already: set) -> int:
    """Download the not-yet-archived documents of one portal section.

    The listing is walked page by page. Each listed document is downloaded
    until one is found that is already archived; at that point the section is
    treated as up to date and processing stops.

    Args:
        page: Browser page used to navigate the listing.
        context: Browser context whose ``request`` client sends the download POST.
        cfg: One entry of ``SEKCE``.
        already: File names known to be in ``DOWNLOAD_DIR``. Every file
            written here is added to it.

    Returns:
        Number of files written.
    """
    name = cfg["name"]
    url = cfg["url"]
    ident_pfx = cfg["ident_prefix"]
    js_extract = make_js_extract(cfg["ident_col"], cfg["typ_col"], cfg["obdobi_col"])
    downloaded = 0

    print(f"\n=== {name} ===")
    page.goto(url, wait_until="domcontentloaded", timeout=30_000)
    page.wait_for_load_state("networkidle", timeout=15_000)

    if cfg["need_history"]:
        hist = page.get_by_text("Zobrazit historii dokumentů za poslední 3 roky")
        if hist.count() > 0:
            hist.first.click()
            page.wait_for_load_state("networkidle", timeout=20_000)

    page_num = 1
    # Document ids already handled; a page that only repeats them ends the section.
    seen_ids: set = set()

    while True:
        print(f" Strana {page_num}...")
        docs = page.evaluate(js_extract)
        docs = [d for d in docs if d["docId"] not in seen_ids]

        if not docs:
            print(f" Strana {page_num} — žádné nové dokumenty, konec sekce.")
            break

        seen_ids.update(d["docId"] for d in docs)
        print(f" Nalezeno {len(docs)} dokumentů")

        stop = False
        for doc in docs:
            # parse_period's fallback forwards raw cell text, so scrub it like
            # every other file-name component.
            period = safe_filename(parse_period(doc["obdobi"]))
            ext = doc["typ"] or "pdf"
            ident = safe_filename(doc["ident"])
            label = safe_filename(name)
            prefix = ident_pfx + ident if ident else f"id-{doc['docId']}"
            prefix_check = f"{period} {label} {prefix}"

            if uz_stazeno(prefix_check, already):
                print(f" [stop] Nalezen již stažený dokument: {prefix_check}*")
                stop = True
                break

            r = context.request.post(url, form={
                "bin": "1", "dl": "1", "historie": cfg["dl_historie"],
                "doc_id": doc["docId"], "save": "Stáhnout",
            }, timeout=60_000)
            try:
                if not r.ok:
                    print(f" HTTP {r.status} doc_id={doc['docId']}")
                    time.sleep(1.0)
                    continue

                filename = f"{prefix_check}.{ext}"
                cd = r.headers.get("content-disposition", "")
                m = re.search(r'filename=["\']?([^"\';\r\n]+)', cd)
                if m:
                    # Keep the server-provided name in parentheses and prefer
                    # its extension over the one from the listing.
                    orig = m.group(1).strip()
                    stem, suf = os.path.splitext(orig)
                    ext2 = suf.lstrip(".").lower() or ext
                    filename = f"{prefix_check} ({safe_filename(stem)}).{ext2}"

                target = os.path.join(DOWNLOAD_DIR, filename)
                with open(target, "wb") as f:
                    f.write(r.body())
            finally:
                # Without this the response body stays in memory until the
                # context is closed.
                r.dispose()

            print(f" OK: {filename}")
            already.add(filename)
            downloaded += 1
            time.sleep(1.0)

        if stop:
            break

        if not click_next_page(page, page_num):
            break
        page_num += 1

    return downloaded
|
||||
|
||||
|
||||
def main() -> None:
    """Restore the saved session and download new documents from every section.

    Loads the cookies from ``COOKIES_FILE``, opens Chrome with a persistent
    profile, checks that the session is still valid and then runs
    ``process_sekce`` for each entry of ``SEKCE``.

    Raises:
        SystemExit: With status 1 when Playwright is not installed, the
            cookie file is missing, or the saved session has expired.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        print("Chybi playwright: pip install playwright && playwright install chrome")
        sys.exit(1)

    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    if not os.path.exists(COOKIES_FILE):
        print(f"Soubor {COOKIES_FILE} nenalezen — spust 01_prihlaseni.py")
        sys.exit(1)

    with open(COOKIES_FILE, encoding="utf-8") as f:
        cookies = json.load(f)

    with sync_playwright() as p:
        # NOTE(review): certificate errors are ignored for this context —
        # confirm that this is intentional.
        context = p.chromium.launch_persistent_context(
            user_data_dir=os.path.join(os.path.dirname(__file__), "chrome_profile"),
            channel="chrome",
            headless=False,
            slow_mo=100,
            ignore_https_errors=True,
        )
        try:
            context.add_cookies(cookies)
            page = context.new_page()

            page.goto(ZPRAVY_URL, wait_until="domcontentloaded", timeout=30_000)
            page.wait_for_load_state("networkidle", timeout=15_000)
            # A PIN field on the page means the portal served the login form.
            if page.locator("input[name='pin']").count() > 0:
                print("Cookies expirovala — spust 01_prihlaseni.py")
                # Exit non-zero like the other failure paths so a calling
                # script does not mistake this for a successful run; the
                # finally clause still closes the context.
                sys.exit(1)
            print("Prihlaseni OK\n")

            already = set(os.listdir(DOWNLOAD_DIR))
            print(f"V archivu: {len(already)} souboru.")

            celkem = 0
            for cfg in SEKCE:
                dl = process_sekce(page, context, cfg, already)
                print(f" {cfg['name']}: stazeno {dl}")
                celkem += dl

            print(f"\nHotovo. Celkem stazeno: {celkem}")
        finally:
            context.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,35 @@
|
||||
"""
|
||||
Přihlásí se na portál ZPMVČR a stáhne nové zprávy.
|
||||
|
||||
Kombinuje 01_prihlaseni.py + 03_stahuj_nove.py do jednoho spuštění.
|
||||
|
||||
POUŽITÍ:
|
||||
python 04_prihlaseni_a_stahuj_nove.py
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Directory containing this script; the scripts it launches sit beside it.
DIR = os.path.dirname(os.path.abspath(__file__))


def run(script: str) -> None:
    """Execute a sibling script with the current interpreter.

    Args:
        script: File name of the script, relative to ``DIR``.

    Raises:
        SystemExit: When the script finishes with a non-zero return code.
    """
    command = [sys.executable, os.path.join(DIR, script)]
    exit_code = subprocess.run(command, check=False).returncode
    if exit_code == 0:
        return
    raise SystemExit(f"Skript {script} skončil s chybou (kód {exit_code})")
|
||||
|
||||
|
||||
def main() -> None:
    """Run the login script and then the new-documents download script.

    Each step prints its heading first. ``run`` aborts the sequence when a
    script fails, so the download step only starts after a successful login.
    """
    steps = (
        ("=== Přihlášení ===", "01_prihlaseni.py"),
        ("\n=== Stahování nových zpráv ===", "03_stahuj_nove.py"),
    )
    for heading, script in steps:
        print(heading)
        run(script)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,7 +1,7 @@
|
||||
[
|
||||
{
|
||||
"name": "JSESSIONID",
|
||||
"value": "36137BBDC47B5AA6EBACEA28257BB821",
|
||||
"value": "C487947972DEE36DF5C80FA2F0A328CD",
|
||||
"domain": ".eforms.zpmvcr.cz",
|
||||
"path": "/eforms",
|
||||
"expires": -1,
|
||||
|
||||
Reference in New Issue
Block a user