Files
2026-06-01 12:17:41 +02:00

335 lines
11 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Stahování detailů zdravotních výkonů ze szv.mzd.gov.cz.
Čte cislo_vykonu z kolekce `vykony`, stahuje detailní stránky
a ukládá do kolekce `detaily` (upsert podle cislo_vykonu).
Požadavky:
pip install requests beautifulsoup4 pymongo lxml
"""
# ── Script settings ───────────────────────────────────────────────────────────
KOLIK = 0 # 0 = everything; otherwise the maximum number of procedures to download (e.g. 50)
FORCE = True # True = re-download records that have already been downloaded
WORKERS = 5 # number of parallel threads
# ──────────────────────────────────────────────────────────────────────────────
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient, UpdateOne, InsertOne
# Logging: one root configuration plus a module-level logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# MongoDB connection string, database name and collection names.
MONGO_URI = "mongodb://192.168.1.76:27017/"
MONGO_DB = "zdravotni_vykony"
COL_VYKONY = "vykony"
COL_DETAILY = "detaily"
COL_DETAILY_HISTORIE = "detaily_historie"

# Fields compared when detecting a change between two scrapes
# (metadata fields are intentionally left out).
COMPARE_FIELDS = [
    # scalar fields
    "nazev",
    "poznamka",
    "kategorie",
    "typ_formulare",
    "omezeni_mistem",
    "omezeni_frekvenci",
    "doba_trvani",
    "nepocitat_rezii",
    "popis",
    "cim_zacina",
    "obsah_rozsah",
    "cim_konci",
    "podminky",
    # nested sub-tables
    "autorska_odbornost",
    "dalsi_odbornost",
    "nositele",
    "materialy",
    "pripravky",
    "pristroje",
    "zum",
    "zulp",
    # flattened point values
    "body_prime",
    "body_osobni",
    "body_rezijni",
    "body_celkem",
]

# Site root and the HTTP headers sent with every request.
BASE_URL = "https://szv.mzd.gov.cz"
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "cs,en;q=0.9",
}

# Row label on the detail page -> document key, for scalar (text) values.
SCALAR_MAP = {
    "Číslo výkonu": "cislo_vykonu",
    "Název": "nazev",
    "Poznámka": "poznamka",
    "Kategorie": "kategorie",
    "Typ formuláře": "typ_formulare",
    "Omezení místem": "omezeni_mistem",
    "Omezení frekvencí": "omezeni_frekvenci",
    "Doba trvání": "doba_trvani",
    "Popis": "popis",
    "Čím výkon začíná": "cim_zacina",
    "Obsah a rozsah výkonu": "obsah_rozsah",
    "Čím výkon končí": "cim_konci",
    "Podmínky": "podminky",
}

# Row label on the detail page -> document key, for nested sub-tables.
SUB_MAP = {
    "Autorská odbornost": "autorska_odbornost",
    "Další odbornost": "dalsi_odbornost",
    "Nositelé": "nositele",
    "Materiály": "materialy",
    "Přípravky": "pripravky",
    "Přístroje": "pristroje",
    "ZUM": "zum",
    "ZULP": "zulp",
    "Bodová hodnota": "bodova_hodnota",
}
def _to_float(s: str) -> float | None:
s = s.strip().replace("\xa0", "").replace(" ", "").replace(" ", "").replace(",", ".")
try:
return float(s)
except (ValueError, AttributeError):
return None
def _parse_subtable(table_el) -> list[dict]:
    """Turn a nested table into a list of dicts keyed by the header texts.

    The first ``tr`` supplies the column names (from its ``th``/``td``
    cells); every following ``tr`` becomes one dict built from its ``td``
    cells. Rows without ``td`` cells and rows whose first cell starts with
    "celkem" (case-insensitive, i.e. the totals row) are skipped.

    Args:
        table_el: Element exposing ``find_all``; its rows' cells must expose
            ``get_text(strip=True)``.

    Returns:
        One dict per data row; an empty list when the table has no rows or
        no header cells.
    """
    all_rows = table_el.find_all("tr")
    if not all_rows:
        return []

    header_row, *data_rows = all_rows
    column_names = [
        cell.get_text(strip=True) for cell in header_row.find_all(["th", "td"])
    ]
    if not column_names:
        return []

    parsed = []
    for data_row in data_rows:
        values = [cell.get_text(strip=True) for cell in data_row.find_all("td")]
        # Keep only non-empty rows that are not the "Celkem" totals line.
        if values and not values[0].lower().startswith("celkem"):
            parsed.append(dict(zip(column_names, values)))
    return parsed
def _is_checked(scope) -> bool:
    """Return True when the first checkbox input inside ``scope`` is checked."""
    box = scope.find("input", {"type": "checkbox"})
    return bool(box and box.has_attr("checked"))


def parse_detail(html: str, cislo: str) -> dict:
    """Parse one procedure detail page into a flat document.

    The page is parsed with the ``lxml`` parser and the table matching
    ``table.detailTabulka`` is walked row by row (direct ``tr`` children
    with a direct ``th`` label and ``td`` value). Labels found in
    ``SCALAR_MAP`` are stored as stripped text, labels found in ``SUB_MAP``
    as lists produced by ``_parse_subtable``.

    Args:
        html: HTML source of the detail page.
        cislo: Procedure number; stored as ``cislo_vykonu`` and used to build
            ``detail_url``.

    Returns:
        The parsed document. When the main table is missing, only
        ``cislo_vykonu`` and ``detail_url`` are present.
    """
    soup = BeautifulSoup(html, "lxml")
    doc: dict = {
        "cislo_vykonu": cislo,
        "detail_url": f"{BASE_URL}/Vykon/Detail/{cislo}/",
    }

    main_table = soup.select_one("table.detailTabulka")
    if not main_table:
        return doc

    # Rows have the shape <th>label</th><td>value</td>.
    for tr in main_table.find_all("tr", recursive=False):
        label_cell = tr.find("th", recursive=False)
        value_cell = tr.find("td", recursive=False)
        if not (label_cell and value_cell):
            continue

        label = label_cell.get_text(strip=True)
        if label == "Nepočítat režii":
            doc["nepocitat_rezii"] = _is_checked(value_cell)
            continue

        scalar_key = SCALAR_MAP.get(label)
        if scalar_key is not None:
            doc[scalar_key] = value_cell.get_text(strip=True)
            continue

        sub_key = SUB_MAP.get(label)
        if sub_key is not None:
            nested = value_cell.find("table")
            doc[sub_key] = _parse_subtable(nested) if nested else []

    # Fallback: the checkbox was not found in a label/value row, so take the
    # first checkbox anywhere on the page.
    if "nepocitat_rezii" not in doc:
        doc["nepocitat_rezii"] = _is_checked(soup)

    # Numeric conversion of scalar fields.
    if "doba_trvani" in doc:
        doc["doba_trvani"] = _to_float(doc["doba_trvani"])

    # Point value: a table with exactly one data row is flattened into
    # separate scalar fields.
    points = doc.get("bodova_hodnota", [])
    if isinstance(points, list) and len(points) == 1:
        (only_row,) = points
        for target, column in (
            ("body_prime", "Přímé"),
            ("body_osobni", "Osobní"),
            ("body_rezijni", "Režijní"),
            ("body_celkem", "Celkem"),
        ):
            doc[target] = _to_float(only_row.get(column, ""))

    return doc
def fetch_detail(cislo: str, session: requests.Session) -> dict | None:
    """Download the detail page of one procedure and parse it.

    Args:
        cislo: Procedure number inserted into the detail URL.
        session: Session used to issue the GET request (30 s timeout,
            ``HEADERS`` sent along).

    Returns:
        The document produced by ``parse_detail``, or ``None`` when the
        request, the HTTP status check or the parsing raised; the failure is
        logged as a warning.
    """
    detail_url = f"{BASE_URL}/Vykon/Detail/{cislo}/"
    try:
        response = session.get(detail_url, headers=HEADERS, timeout=30)
        response.raise_for_status()
        # Prefer the encoding detected from the body; fall back to UTF-8.
        response.encoding = response.apparent_encoding or "utf-8"
        parsed = parse_detail(response.text, cislo)
    except Exception as exc:
        logger.warning(f"Chyba při stahování {cislo}: {exc}")
        return None
    return parsed
def worker(cislo: str) -> dict | None:
    """Download and parse one procedure detail with its own HTTP session.

    A fresh ``requests.Session`` is created for every call, so no session is
    ever shared between threads. The session is used as a context manager so
    that its connection pool is closed as soon as the download finishes
    instead of lingering until garbage collection.

    Args:
        cislo: Procedure number to download.

    Returns:
        The parsed detail document, or ``None`` when ``fetch_detail`` failed.
    """
    with requests.Session() as session:
        result = fetch_detail(cislo, session)
    # Short pause after each download (presumably to throttle the load on
    # the remote server).
    time.sleep(0.1)
    return result
def process_changes(
    new_detaily: list[dict],
    col_detaily,
    col_historie,
    run_at: datetime,
) -> dict:
    """Store freshly scraped details and archive the versions they replace.

    Each new detail is compared with the stored document of the same
    ``cislo_vykonu`` on the fields listed in ``COMPARE_FIELDS``:

    * not stored yet -> upserted with ``_platny_od = run_at`` ("novy");
    * at least one field differs -> the stored version is copied to
      ``col_historie`` with ``_platny_do`` and ``_zmenena_pole`` and the
      stored document is updated ("zmenen"); compared fields that are no
      longer present in the new detail are removed, otherwise the same
      difference would be detected and archived again on every run;
    * identical -> only ``_scraped_at`` is refreshed ("nezmenen").

    Every dict in ``new_detaily`` gets ``_scraped_at`` (and ``_platny_od``
    where applicable) set in place.

    Args:
        new_detaily: Parsed detail documents, each with ``cislo_vykonu``.
        col_detaily: Collection with the current details.
        col_historie: Collection receiving the archived versions.
        run_at: Timestamp of the current run.

    Returns:
        Counters ``{"novy": ..., "zmenen": ..., "nezmenen": ...}``.
    """
    stats = {"novy": 0, "zmenen": 0, "nezmenen": 0}
    if not new_detaily:
        return stats

    # Fetch only the documents this batch can touch instead of loading the
    # whole collection for every batch.
    batch_ids = [d["cislo_vykonu"] for d in new_detaily]
    existing = {
        d["cislo_vykonu"]: d
        for d in col_detaily.find({"cislo_vykonu": {"$in": batch_ids}}, {"_id": 0})
    }

    ops_detaily = []
    ops_historie = []
    for detail in new_detaily:
        cid = detail["cislo_vykonu"]
        detail["_scraped_at"] = run_at
        old = existing.get(cid)

        if old is None:
            detail["_platny_od"] = run_at
            ops_detaily.append(
                UpdateOne({"cislo_vykonu": cid}, {"$set": detail}, upsert=True)
            )
            stats["novy"] += 1
            continue

        changed = [f for f in COMPARE_FIELDS if old.get(f) != detail.get(f)]
        if not changed:
            ops_detaily.append(
                UpdateOne({"cislo_vykonu": cid}, {"$set": {"_scraped_at": run_at}})
            )
            stats["nezmenen"] += 1
            continue

        # `old` was read without _id, so the archive copy gets a new one.
        archive = dict(old)
        archive["_platny_do"] = run_at
        archive["_zmenena_pole"] = changed
        ops_historie.append(InsertOne(archive))

        detail["_platny_od"] = run_at
        update = {"$set": detail}
        # $set alone never removes a field: drop compared fields that vanished
        # from the page so the stored document matches the new scrape.
        vanished = {f: "" for f in changed if f in old and f not in detail}
        if vanished:
            update["$unset"] = vanished
        ops_detaily.append(UpdateOne({"cislo_vykonu": cid}, update))
        stats["zmenen"] += 1

    # Archive first: if the history write fails, the current documents are
    # still untouched and the change is detected again on the next run.
    if ops_historie:
        col_historie.bulk_write(ops_historie, ordered=False)
    if ops_detaily:
        col_detaily.bulk_write(ops_detaily, ordered=False)
    return stats
def create_indexes(col_detaily, col_historie):
    """Create the indexes used by the detail and history collections.

    ``col_detaily`` gets a unique index on ``cislo_vykonu`` plus plain
    indexes on ``_scraped_at`` and ``_platny_od``; ``col_historie`` gets
    plain indexes on ``cislo_vykonu``, ``_platny_do`` and ``_zmenena_pole``.

    Args:
        col_detaily: Collection with the current details.
        col_historie: Collection with the archived versions.
    """
    # The procedure number identifies a current detail, hence unique.
    col_detaily.create_index("cislo_vykonu", unique=True)
    for field in ("_scraped_at", "_platny_od"):
        col_detaily.create_index(field)

    # History may hold several versions per procedure, so no unique index.
    for field in ("cislo_vykonu", "_platny_do", "_zmenena_pole"):
        col_historie.create_index(field)

    logger.info("Indexy vytvořeny")
def _select_cisla(col_vykony, col_detaily) -> list:
    """Return the procedure numbers to download, honouring FORCE and KOLIK."""
    vsechna_cisla = [
        d["cislo_vykonu"]
        for d in col_vykony.find({"_aktivni": True}, {"cislo_vykonu": 1, "_id": 0})
    ]
    logger.info(f"Aktivních výkonů v DB: {len(vsechna_cisla)}")

    if FORCE:
        cisla = vsechna_cisla
        logger.info("FORCE=True: přestahuju vše")
    else:
        uz_stazeno = {
            d["cislo_vykonu"]
            for d in col_detaily.find({}, {"cislo_vykonu": 1, "_id": 0})
        }
        cisla = [c for c in vsechna_cisla if c not in uz_stazeno]
        logger.info(f"Již staženo: {len(uz_stazeno)}, zbývá: {len(cisla)}")

    if KOLIK:
        cisla = cisla[:KOLIK]
        logger.info(f"KOLIK={KOLIK}: omezuji na {len(cisla)} výkonů")
    return cisla


def _download_and_store(cisla, col_detaily, col_historie, run_at, batch_size=100):
    """Download all details in parallel and store them in batches.

    Returns a tuple ``(total_stats, fetch_chyby)``: the summed counters
    returned by ``process_changes`` and the number of failed downloads.
    """
    total_stats = {"novy": 0, "zmenen": 0, "nezmenen": 0}
    fetch_chyby = 0
    batch: list[dict] = []

    def flush_batch():
        # Write the collected results and add their counters to the totals.
        if not batch:
            return
        stats = process_changes(batch, col_detaily, col_historie, run_at)
        batch.clear()
        for key in total_stats:
            total_stats[key] += stats[key]

    with ThreadPoolExecutor(max_workers=WORKERS) as pool:
        futures = {pool.submit(worker, c): c for c in cisla}
        for done, future in enumerate(as_completed(futures), start=1):
            result = future.result()
            if result:
                batch.append(result)
            else:
                fetch_chyby += 1
            if len(batch) >= batch_size:
                flush_batch()
            if done % 200 == 0 or done == len(cisla):
                logger.info(
                    f"Průběh: {done}/{len(cisla)} "
                    f"(chyby stahování: {fetch_chyby})"
                )
    # Store whatever is left in the last, partially filled batch.
    flush_batch()
    return total_stats, fetch_chyby


def main():
    """Run the whole job: pick procedures, download details, store changes.

    Connects to MongoDB, makes sure the indexes exist, selects the procedure
    numbers to process (active ones, optionally skipping those already
    downloaded and limited to ``KOLIK``), downloads them with ``WORKERS``
    threads and writes the results through ``process_changes``. The MongoDB
    client is used as a context manager so the connection is closed even
    when a step raises.
    """
    logger.info("=== Spouštím stahování detailů výkonů ===")
    logger.info(f"Nastavení: KOLIK={KOLIK or 'vše'}, FORCE={FORCE}, WORKERS={WORKERS}")
    run_at = datetime.now(timezone.utc)

    with MongoClient(MONGO_URI) as client:
        db = client[MONGO_DB]
        col_vykony = db[COL_VYKONY]
        col_detaily = db[COL_DETAILY]
        col_historie = db[COL_DETAILY_HISTORIE]
        create_indexes(col_detaily, col_historie)

        cisla = _select_cisla(col_vykony, col_detaily)
        if not cisla:
            logger.info("Nic ke stahování.")
            return

        total_stats, fetch_chyby = _download_and_store(
            cisla, col_detaily, col_historie, run_at
        )

    logger.info(
        f"Výsledek: nové={total_stats['novy']}, změněné={total_stats['zmenen']}, "
        f"nezměněné={total_stats['nezmenen']}, chyby={fetch_chyby}"
    )
    logger.info("=== Hotovo ===")


if __name__ == "__main__":
    main()