"""
Stahování detailů zdravotních výkonů ze szv.mzd.gov.cz.
Čte cislo_vykonu z kolekce `vykony`, stahuje detailní stránky
a ukládá do kolekce `detaily` (upsert podle cislo_vykonu).
Požadavky:
pip install requests beautifulsoup4 pymongo lxml
"""
# ── Nastavení skriptu ──────────────────────────────────────────────────────────
KOLIK = 0 # 0 = vše; jinak maximální počet výkonů ke stažení (např. 50)
FORCE = True # True = přestáhni i už stažené záznamy
WORKERS = 5 # počet paralelních vláken
# ──────────────────────────────────────────────────────────────────────────────
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient, UpdateOne, InsertOne
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
MONGO_URI = "mongodb://192.168.1.76:27017/"
MONGO_DB = "zdravotni_vykony"
COL_VYKONY = "vykony"
COL_DETAILY = "detaily"
COL_DETAILY_HISTORIE = "detaily_historie"
# Pole porovnávaná pro detekci změn (metadata vynecháme)
COMPARE_FIELDS = [
"nazev", "poznamka", "kategorie", "typ_formulare",
"omezeni_mistem", "omezeni_frekvenci", "doba_trvani",
"nepocitat_rezii", "popis", "cim_zacina", "obsah_rozsah",
"cim_konci", "podminky",
"autorska_odbornost", "dalsi_odbornost", "nositele",
"materialy", "pripravky", "pristroje", "zum", "zulp",
"body_prime", "body_osobni", "body_rezijni", "body_celkem",
]
BASE_URL = "https://szv.mzd.gov.cz"
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Accept-Language": "cs,en;q=0.9",
}
# Mapování label → klíč pro skalární pole
SCALAR_MAP = {
"Číslo výkonu": "cislo_vykonu",
"Název": "nazev",
"Poznámka": "poznamka",
"Kategorie": "kategorie",
"Typ formuláře": "typ_formulare",
"Omezení místem": "omezeni_mistem",
"Omezení frekvencí": "omezeni_frekvenci",
"Doba trvání": "doba_trvani",
"Popis": "popis",
"Čím výkon začíná": "cim_zacina",
"Obsah a rozsah výkonu": "obsah_rozsah",
"Čím výkon končí": "cim_konci",
"Podmínky": "podminky",
}
# Mapování label → klíč pro vnořené sub-tabulky
SUB_MAP = {
"Autorská odbornost": "autorska_odbornost",
"Další odbornost": "dalsi_odbornost",
"Nositelé": "nositele",
"Materiály": "materialy",
"Přípravky": "pripravky",
"Přístroje": "pristroje",
"ZUM": "zum",
"ZULP": "zulp",
"Bodová hodnota": "bodova_hodnota",
}
def _to_float(s: str) -> float | None:
s = s.strip().replace("\xa0", "").replace(" ", "").replace(" ", "").replace(",", ".")
try:
return float(s)
except (ValueError, AttributeError):
return None
def _parse_subtable(table_el) -> list[dict]:
"""Parsuje vnořenou tabulku → seznam diktů. Přeskočí řádek Celkem."""
rows = table_el.find_all("tr")
if not rows:
return []
headers = [th.get_text(strip=True) for th in rows[0].find_all(["th", "td"])]
if not headers:
return []
records = []
for row in rows[1:]:
cells = [td.get_text(strip=True) for td in row.find_all("td")]
if not cells:
continue
if cells[0].lower().startswith("celkem"):
continue
records.append(dict(zip(headers, cells)))
return records
def parse_detail(html: str, cislo: str) -> dict:
soup = BeautifulSoup(html, "lxml")
doc: dict = {
"cislo_vykonu": cislo,
"detail_url": f"{BASE_URL}/Vykon/Detail/{cislo}/",
}
main_table = soup.select_one("table.detailTabulka")
if not main_table:
return doc
# Řádky:
label | hodnota |
for tr in main_table.find_all("tr", recursive=False):
th = tr.find("th", recursive=False)
value_td = tr.find("td", recursive=False)
if not th or not value_td:
continue
label = th.get_text(strip=True)
if label == "Nepočítat režii":
chk = value_td.find("input", {"type": "checkbox"})
doc["nepocitat_rezii"] = bool(chk and chk.has_attr("checked"))
elif label in SCALAR_MAP:
key = SCALAR_MAP[label]
doc[key] = value_td.get_text(strip=True)
elif label in SUB_MAP:
key = SUB_MAP[label]
nested = value_td.find("table")
doc[key] = _parse_subtable(nested) if nested else []
# Záloha: checkbox mimo párový řádek
if "nepocitat_rezii" not in doc:
chk = soup.find("input", {"type": "checkbox"})
doc["nepocitat_rezii"] = bool(chk and chk.has_attr("checked"))
# Číselné přetypování skalárů
if "doba_trvani" in doc:
doc["doba_trvani"] = _to_float(doc["doba_trvani"])
# Bodová hodnota: tabulka s 1 datovým řádkem → flatten na skalární pole
bh = doc.get("bodova_hodnota", [])
if bh and isinstance(bh, list) and len(bh) == 1:
row = bh[0]
doc["body_prime"] = _to_float(row.get("Přímé", ""))
doc["body_osobni"] = _to_float(row.get("Osobní", ""))
doc["body_rezijni"] = _to_float(row.get("Režijní", ""))
doc["body_celkem"] = _to_float(row.get("Celkem", ""))
return doc
def fetch_detail(cislo: str, session: requests.Session) -> dict | None:
url = f"{BASE_URL}/Vykon/Detail/{cislo}/"
try:
resp = session.get(url, headers=HEADERS, timeout=30)
resp.raise_for_status()
resp.encoding = resp.apparent_encoding or "utf-8"
return parse_detail(resp.text, cislo)
except Exception as exc:
logger.warning(f"Chyba při stahování {cislo}: {exc}")
return None
def worker(cislo: str) -> dict | None:
"""Každý worker má vlastní session (thread-safe)."""
s = requests.Session()
result = fetch_detail(cislo, s)
time.sleep(0.1)
return result
def process_changes(
new_detaily: list[dict],
col_detaily,
col_historie,
run_at: datetime,
) -> dict:
existing = {d["cislo_vykonu"]: d for d in col_detaily.find({}, {"_id": 0})}
ops_detaily = []
ops_historie = []
stats = {"novy": 0, "zmenen": 0, "nezmenen": 0}
for detail in new_detaily:
cid = detail["cislo_vykonu"]
detail["_scraped_at"] = run_at
if cid not in existing:
detail["_platny_od"] = run_at
ops_detaily.append(UpdateOne({"cislo_vykonu": cid}, {"$set": detail}, upsert=True))
stats["novy"] += 1
else:
old = existing[cid]
changed = [f for f in COMPARE_FIELDS if old.get(f) != detail.get(f)]
if changed:
archive = dict(old)
archive["_platny_do"] = run_at
archive["_zmenena_pole"] = changed
ops_historie.append(InsertOne(archive))
detail["_platny_od"] = run_at
ops_detaily.append(UpdateOne({"cislo_vykonu": cid}, {"$set": detail}))
stats["zmenen"] += 1
else:
ops_detaily.append(UpdateOne(
{"cislo_vykonu": cid},
{"$set": {"_scraped_at": run_at}},
))
stats["nezmenen"] += 1
if ops_detaily:
col_detaily.bulk_write(ops_detaily, ordered=False)
if ops_historie:
col_historie.bulk_write(ops_historie, ordered=False)
return stats
def create_indexes(col_detaily, col_historie):
col_detaily.create_index("cislo_vykonu", unique=True)
col_detaily.create_index("_scraped_at")
col_detaily.create_index("_platny_od")
col_historie.create_index("cislo_vykonu")
col_historie.create_index("_platny_do")
col_historie.create_index("_zmenena_pole")
logger.info("Indexy vytvořeny")
def main():
logger.info("=== Spouštím stahování detailů výkonů ===")
logger.info(f"Nastavení: KOLIK={KOLIK or 'vše'}, FORCE={FORCE}, WORKERS={WORKERS}")
run_at = datetime.now(timezone.utc)
client = MongoClient(MONGO_URI)
col_vykony = client[MONGO_DB][COL_VYKONY]
col_detaily = client[MONGO_DB][COL_DETAILY]
col_historie = client[MONGO_DB][COL_DETAILY_HISTORIE]
create_indexes(col_detaily, col_historie)
vsechna_cisla = [
d["cislo_vykonu"]
for d in col_vykony.find({"_aktivni": True}, {"cislo_vykonu": 1, "_id": 0})
]
logger.info(f"Aktivních výkonů v DB: {len(vsechna_cisla)}")
if not FORCE:
uz_stazeno = {
d["cislo_vykonu"]
for d in col_detaily.find({}, {"cislo_vykonu": 1, "_id": 0})
}
cisla = [c for c in vsechna_cisla if c not in uz_stazeno]
logger.info(f"Již staženo: {len(uz_stazeno)}, zbývá: {len(cisla)}")
else:
cisla = vsechna_cisla
logger.info("FORCE=True: přestahuju vše")
if KOLIK:
cisla = cisla[:KOLIK]
logger.info(f"KOLIK={KOLIK}: omezuji na {len(cisla)} výkonů")
if not cisla:
logger.info("Nic ke stahování.")
client.close()
return
# Stahování
fetch_chyby = 0
batch: list[dict] = []
BATCH_SIZE = 100
def flush_batch():
if batch:
stats = process_changes(batch, col_detaily, col_historie, run_at)
batch.clear()
return stats
return {"novy": 0, "zmenen": 0, "nezmenen": 0}
total_stats = {"novy": 0, "zmenen": 0, "nezmenen": 0}
with ThreadPoolExecutor(max_workers=WORKERS) as pool:
futures = {pool.submit(worker, c): c for c in cisla}
done = 0
for future in as_completed(futures):
done += 1
result = future.result()
if result:
batch.append(result)
else:
fetch_chyby += 1
if len(batch) >= BATCH_SIZE:
s = flush_batch()
for k in total_stats:
total_stats[k] += s[k]
if done % 200 == 0 or done == len(cisla):
logger.info(
f"Průběh: {done}/{len(cisla)} "
f"(chyby stahování: {fetch_chyby})"
)
s = flush_batch()
for k in total_stats:
total_stats[k] += s[k]
client.close()
logger.info(
f"Výsledek: nové={total_stats['novy']}, změněné={total_stats['zmenen']}, "
f"nezměněné={total_stats['nezmenen']}, chyby={fetch_chyby}"
)
logger.info("=== Hotovo ===")
if __name__ == "__main__":
main()