""" Stahování detailů zdravotních výkonů ze szv.mzd.gov.cz. Čte cislo_vykonu z kolekce `vykony`, stahuje detailní stránky a ukládá do kolekce `detaily` (upsert podle cislo_vykonu). Požadavky: pip install requests beautifulsoup4 pymongo lxml """ # ── Nastavení skriptu ────────────────────────────────────────────────────────── KOLIK = 0 # 0 = vše; jinak maximální počet výkonů ke stažení (např. 50) FORCE = True # True = přestáhni i už stažené záznamy WORKERS = 5 # počet paralelních vláken # ────────────────────────────────────────────────────────────────────────────── import logging import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone import requests from bs4 import BeautifulSoup from pymongo import MongoClient, UpdateOne, InsertOne logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) MONGO_URI = "mongodb://192.168.1.76:27017/" MONGO_DB = "zdravotni_vykony" COL_VYKONY = "vykony" COL_DETAILY = "detaily" COL_DETAILY_HISTORIE = "detaily_historie" # Pole porovnávaná pro detekci změn (metadata vynecháme) COMPARE_FIELDS = [ "nazev", "poznamka", "kategorie", "typ_formulare", "omezeni_mistem", "omezeni_frekvenci", "doba_trvani", "nepocitat_rezii", "popis", "cim_zacina", "obsah_rozsah", "cim_konci", "podminky", "autorska_odbornost", "dalsi_odbornost", "nositele", "materialy", "pripravky", "pristroje", "zum", "zulp", "body_prime", "body_osobni", "body_rezijni", "body_celkem", ] BASE_URL = "https://szv.mzd.gov.cz" HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ), "Accept-Language": "cs,en;q=0.9", } # Mapování label → klíč pro skalární pole SCALAR_MAP = { "Číslo výkonu": "cislo_vykonu", "Název": "nazev", "Poznámka": "poznamka", "Kategorie": "kategorie", "Typ formuláře": "typ_formulare", "Omezení místem": "omezeni_mistem", "Omezení frekvencí": "omezeni_frekvenci", "Doba trvání": "doba_trvani", "Popis": "popis", "Čím výkon začíná": "cim_zacina", "Obsah a rozsah výkonu": "obsah_rozsah", "Čím výkon končí": "cim_konci", "Podmínky": "podminky", } # Mapování label → klíč pro vnořené sub-tabulky SUB_MAP = { "Autorská odbornost": "autorska_odbornost", "Další odbornost": "dalsi_odbornost", "Nositelé": "nositele", "Materiály": "materialy", "Přípravky": "pripravky", "Přístroje": "pristroje", "ZUM": "zum", "ZULP": "zulp", "Bodová hodnota": "bodova_hodnota", } def _to_float(s: str) -> float | None: s = s.strip().replace("\xa0", "").replace(" ", "").replace(" ", "").replace(",", ".") try: return float(s) except (ValueError, AttributeError): return None def _parse_subtable(table_el) -> list[dict]: """Parsuje vnořenou tabulku → seznam diktů. Přeskočí řádek Celkem.""" rows = table_el.find_all("tr") if not rows: return [] headers = [th.get_text(strip=True) for th in rows[0].find_all(["th", "td"])] if not headers: return [] records = [] for row in rows[1:]: cells = [td.get_text(strip=True) for td in row.find_all("td")] if not cells: continue if cells[0].lower().startswith("celkem"): continue records.append(dict(zip(headers, cells))) return records def parse_detail(html: str, cislo: str) -> dict: soup = BeautifulSoup(html, "lxml") doc: dict = { "cislo_vykonu": cislo, "detail_url": f"{BASE_URL}/Vykon/Detail/{cislo}/", } main_table = soup.select_one("table.detailTabulka") if not main_table: return doc # Řádky: labelhodnota for tr in main_table.find_all("tr", recursive=False): th = tr.find("th", recursive=False) value_td = tr.find("td", recursive=False) if not th or not value_td: continue label = th.get_text(strip=True) if label == "Nepočítat režii": chk = value_td.find("input", {"type": "checkbox"}) doc["nepocitat_rezii"] = bool(chk and chk.has_attr("checked")) elif label in SCALAR_MAP: key = SCALAR_MAP[label] doc[key] = value_td.get_text(strip=True) elif label in SUB_MAP: key = SUB_MAP[label] nested = value_td.find("table") doc[key] = _parse_subtable(nested) if nested else [] # Záloha: checkbox mimo párový řádek if "nepocitat_rezii" not in doc: chk = soup.find("input", {"type": "checkbox"}) doc["nepocitat_rezii"] = bool(chk and chk.has_attr("checked")) # Číselné přetypování skalárů if "doba_trvani" in doc: doc["doba_trvani"] = _to_float(doc["doba_trvani"]) # Bodová hodnota: tabulka s 1 datovým řádkem → flatten na skalární pole bh = doc.get("bodova_hodnota", []) if bh and isinstance(bh, list) and len(bh) == 1: row = bh[0] doc["body_prime"] = _to_float(row.get("Přímé", "")) doc["body_osobni"] = _to_float(row.get("Osobní", "")) doc["body_rezijni"] = _to_float(row.get("Režijní", "")) doc["body_celkem"] = _to_float(row.get("Celkem", "")) return doc def fetch_detail(cislo: str, session: requests.Session) -> dict | None: url = f"{BASE_URL}/Vykon/Detail/{cislo}/" try: resp = session.get(url, headers=HEADERS, timeout=30) resp.raise_for_status() resp.encoding = resp.apparent_encoding or "utf-8" return parse_detail(resp.text, cislo) except Exception as exc: logger.warning(f"Chyba při stahování {cislo}: {exc}") return None def worker(cislo: str) -> dict | None: """Každý worker má vlastní session (thread-safe).""" s = requests.Session() result = fetch_detail(cislo, s) time.sleep(0.1) return result def process_changes( new_detaily: list[dict], col_detaily, col_historie, run_at: datetime, ) -> dict: existing = {d["cislo_vykonu"]: d for d in col_detaily.find({}, {"_id": 0})} ops_detaily = [] ops_historie = [] stats = {"novy": 0, "zmenen": 0, "nezmenen": 0} for detail in new_detaily: cid = detail["cislo_vykonu"] detail["_scraped_at"] = run_at if cid not in existing: detail["_platny_od"] = run_at ops_detaily.append(UpdateOne({"cislo_vykonu": cid}, {"$set": detail}, upsert=True)) stats["novy"] += 1 else: old = existing[cid] changed = [f for f in COMPARE_FIELDS if old.get(f) != detail.get(f)] if changed: archive = dict(old) archive["_platny_do"] = run_at archive["_zmenena_pole"] = changed ops_historie.append(InsertOne(archive)) detail["_platny_od"] = run_at ops_detaily.append(UpdateOne({"cislo_vykonu": cid}, {"$set": detail})) stats["zmenen"] += 1 else: ops_detaily.append(UpdateOne( {"cislo_vykonu": cid}, {"$set": {"_scraped_at": run_at}}, )) stats["nezmenen"] += 1 if ops_detaily: col_detaily.bulk_write(ops_detaily, ordered=False) if ops_historie: col_historie.bulk_write(ops_historie, ordered=False) return stats def create_indexes(col_detaily, col_historie): col_detaily.create_index("cislo_vykonu", unique=True) col_detaily.create_index("_scraped_at") col_detaily.create_index("_platny_od") col_historie.create_index("cislo_vykonu") col_historie.create_index("_platny_do") col_historie.create_index("_zmenena_pole") logger.info("Indexy vytvořeny") def main(): logger.info("=== Spouštím stahování detailů výkonů ===") logger.info(f"Nastavení: KOLIK={KOLIK or 'vše'}, FORCE={FORCE}, WORKERS={WORKERS}") run_at = datetime.now(timezone.utc) client = MongoClient(MONGO_URI) col_vykony = client[MONGO_DB][COL_VYKONY] col_detaily = client[MONGO_DB][COL_DETAILY] col_historie = client[MONGO_DB][COL_DETAILY_HISTORIE] create_indexes(col_detaily, col_historie) vsechna_cisla = [ d["cislo_vykonu"] for d in col_vykony.find({"_aktivni": True}, {"cislo_vykonu": 1, "_id": 0}) ] logger.info(f"Aktivních výkonů v DB: {len(vsechna_cisla)}") if not FORCE: uz_stazeno = { d["cislo_vykonu"] for d in col_detaily.find({}, {"cislo_vykonu": 1, "_id": 0}) } cisla = [c for c in vsechna_cisla if c not in uz_stazeno] logger.info(f"Již staženo: {len(uz_stazeno)}, zbývá: {len(cisla)}") else: cisla = vsechna_cisla logger.info("FORCE=True: přestahuju vše") if KOLIK: cisla = cisla[:KOLIK] logger.info(f"KOLIK={KOLIK}: omezuji na {len(cisla)} výkonů") if not cisla: logger.info("Nic ke stahování.") client.close() return # Stahování fetch_chyby = 0 batch: list[dict] = [] BATCH_SIZE = 100 def flush_batch(): if batch: stats = process_changes(batch, col_detaily, col_historie, run_at) batch.clear() return stats return {"novy": 0, "zmenen": 0, "nezmenen": 0} total_stats = {"novy": 0, "zmenen": 0, "nezmenen": 0} with ThreadPoolExecutor(max_workers=WORKERS) as pool: futures = {pool.submit(worker, c): c for c in cisla} done = 0 for future in as_completed(futures): done += 1 result = future.result() if result: batch.append(result) else: fetch_chyby += 1 if len(batch) >= BATCH_SIZE: s = flush_batch() for k in total_stats: total_stats[k] += s[k] if done % 200 == 0 or done == len(cisla): logger.info( f"Průběh: {done}/{len(cisla)} " f"(chyby stahování: {fetch_chyby})" ) s = flush_batch() for k in total_stats: total_stats[k] += s[k] client.close() logger.info( f"Výsledek: nové={total_stats['novy']}, změněné={total_stats['zmenen']}, " f"nezměněné={total_stats['nezmenen']}, chyby={fetch_chyby}" ) logger.info("=== Hotovo ===") if __name__ == "__main__": main()