335 lines
11 KiB
Python
335 lines
11 KiB
Python
"""
|
||
Stahování detailů zdravotních výkonů ze szv.mzd.gov.cz.
|
||
|
||
Čte cislo_vykonu z kolekce `vykony`, stahuje detailní stránky
|
||
a ukládá do kolekce `detaily` (upsert podle cislo_vykonu).
|
||
|
||
Požadavky:
|
||
pip install requests beautifulsoup4 pymongo lxml
|
||
"""
|
||
|
||
# ── Script settings ────────────────────────────────────────────────────────────
# Maximum number of procedures to download (e.g. 50); 0 means "all of them".
KOLIK = 0
# True = download again even the records that were already downloaded.
FORCE = True
# Number of parallel threads.
WORKERS = 5
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
import logging
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime, timezone
|
||
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from pymongo import MongoClient, UpdateOne, InsertOne
|
||
|
||
# Root logging configuration: INFO level, "timestamp LEVEL message" lines.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
|
||
|
||
# MongoDB connection string and database name.
MONGO_URI = "mongodb://192.168.1.76:27017/"
MONGO_DB = "zdravotni_vykony"

# Collection names: source list of procedures, current details, archived
# previous versions of details.
COL_VYKONY = "vykony"
COL_DETAILY = "detaily"
COL_DETAILY_HISTORIE = "detaily_historie"
|
||
|
||
# Fields compared to detect changes (metadata fields are left out).
COMPARE_FIELDS = [
    # scalar fields
    "nazev",
    "poznamka",
    "kategorie",
    "typ_formulare",
    "omezeni_mistem",
    "omezeni_frekvenci",
    "doba_trvani",
    "nepocitat_rezii",
    "popis",
    "cim_zacina",
    "obsah_rozsah",
    "cim_konci",
    "podminky",
    # nested sub-tables
    "autorska_odbornost",
    "dalsi_odbornost",
    "nositele",
    "materialy",
    "pripravky",
    "pristroje",
    "zum",
    "zulp",
    # flattened point values
    "body_prime",
    "body_osobni",
    "body_rezijni",
    "body_celkem",
]
|
||
|
||
# Root of the scraped site; detail URLs are built on top of it.
BASE_URL = "https://szv.mzd.gov.cz"

# Browser-like User-Agent string sent with every request.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)

# HTTP headers passed to every GET request.
HEADERS = {
    "User-Agent": _USER_AGENT,
    "Accept-Language": "cs,en;q=0.9",
}
|
||
|
||
# Mapping of page label → document key for scalar (plain text) fields.
SCALAR_MAP = {
    # identification
    "Číslo výkonu": "cislo_vykonu",
    "Název": "nazev",
    "Poznámka": "poznamka",
    "Kategorie": "kategorie",
    "Typ formuláře": "typ_formulare",
    # restrictions and duration
    "Omezení místem": "omezeni_mistem",
    "Omezení frekvencí": "omezeni_frekvenci",
    "Doba trvání": "doba_trvani",
    # free-text description
    "Popis": "popis",
    "Čím výkon začíná": "cim_zacina",
    "Obsah a rozsah výkonu": "obsah_rozsah",
    "Čím výkon končí": "cim_konci",
    "Podmínky": "podminky",
}
|
||
|
||
# Mapping of page label → document key for fields rendered as nested sub-tables.
SUB_MAP = {
    # specialisations and performers
    "Autorská odbornost": "autorska_odbornost",
    "Další odbornost": "dalsi_odbornost",
    "Nositelé": "nositele",
    # consumed resources
    "Materiály": "materialy",
    "Přípravky": "pripravky",
    "Přístroje": "pristroje",
    "ZUM": "zum",
    "ZULP": "zulp",
    # point value table
    "Bodová hodnota": "bodova_hodnota",
}
|
||
|
||
|
||
def _to_float(s: str) -> float | None:
|
||
s = s.strip().replace("\xa0", "").replace(" ", "").replace(" ", "").replace(",", ".")
|
||
try:
|
||
return float(s)
|
||
except (ValueError, AttributeError):
|
||
return None
|
||
|
||
|
||
def _parse_subtable(table_el) -> list[dict]:
|
||
"""Parsuje vnořenou tabulku → seznam diktů. Přeskočí řádek Celkem."""
|
||
rows = table_el.find_all("tr")
|
||
if not rows:
|
||
return []
|
||
headers = [th.get_text(strip=True) for th in rows[0].find_all(["th", "td"])]
|
||
if not headers:
|
||
return []
|
||
records = []
|
||
for row in rows[1:]:
|
||
cells = [td.get_text(strip=True) for td in row.find_all("td")]
|
||
if not cells:
|
||
continue
|
||
if cells[0].lower().startswith("celkem"):
|
||
continue
|
||
records.append(dict(zip(headers, cells)))
|
||
return records
|
||
|
||
|
||
def parse_detail(html: str, cislo: str) -> dict:
    """Parse the HTML of one detail page into a document dictionary.

    The result always contains ``cislo_vykonu`` and ``detail_url``. When the
    page has a ``table.detailTabulka`` element, each of its direct
    ``<tr>`` children holding a ``<th>`` label and a ``<td>`` value is mapped
    through ``SCALAR_MAP`` (text value) or ``SUB_MAP`` (nested table parsed
    into a list of dicts); the "Nepočítat režii" row becomes a boolean.
    ``doba_trvani`` is converted to a float and a single-row
    ``bodova_hodnota`` table is additionally flattened into the
    ``body_*`` fields.

    Args:
        html: Page markup.
        cislo: Procedure number the page was requested for.

    Returns:
        The parsed document; only the two base keys when the main table is
        missing.
    """

    def _is_checked(box) -> bool:
        # True only when a checkbox element exists and carries "checked".
        return bool(box and box.has_attr("checked"))

    soup = BeautifulSoup(html, "lxml")
    result: dict = {
        "cislo_vykonu": cislo,
        "detail_url": f"{BASE_URL}/Vykon/Detail/{cislo}/",
    }

    detail_table = soup.select_one("table.detailTabulka")
    if not detail_table:
        return result

    # Rows have the shape: <th>label</th><td>value</td>
    for row in detail_table.find_all("tr", recursive=False):
        label_cell = row.find("th", recursive=False)
        value_cell = row.find("td", recursive=False)
        if not (label_cell and value_cell):
            continue

        label = label_cell.get_text(strip=True)

        if label == "Nepočítat režii":
            result["nepocitat_rezii"] = _is_checked(
                value_cell.find("input", {"type": "checkbox"})
            )
            continue

        scalar_key = SCALAR_MAP.get(label)
        if scalar_key is not None:
            result[scalar_key] = value_cell.get_text(strip=True)
            continue

        table_key = SUB_MAP.get(label)
        if table_key is not None:
            inner = value_cell.find("table")
            result[table_key] = _parse_subtable(inner) if inner else []

    # Fallback: the checkbox was not found in a label/value row, so take the
    # first checkbox anywhere on the page.
    if "nepocitat_rezii" not in result:
        result["nepocitat_rezii"] = _is_checked(
            soup.find("input", {"type": "checkbox"})
        )

    # Numeric conversion of scalar fields.
    if "doba_trvani" in result:
        result["doba_trvani"] = _to_float(result["doba_trvani"])

    # Point value: a table with exactly one data row is flattened into
    # scalar fields.
    points = result.get("bodova_hodnota", [])
    if isinstance(points, list) and len(points) == 1:
        only_row = points[0]
        for target, column in (
            ("body_prime", "Přímé"),
            ("body_osobni", "Osobní"),
            ("body_rezijni", "Režijní"),
            ("body_celkem", "Celkem"),
        ):
            result[target] = _to_float(only_row.get(column, ""))

    return result
|
||
|
||
|
||
def fetch_detail(cislo: str, session: requests.Session) -> dict | None:
    """Download the detail page of one procedure and parse it.

    Args:
        cislo: Procedure number used to build the detail URL.
        session: Session used to issue the GET request.

    Returns:
        The document produced by ``parse_detail``, or ``None`` when the
        request, the HTTP status check or the parsing raised an exception
        (the error is logged as a warning).
    """
    detail_url = f"{BASE_URL}/Vykon/Detail/{cislo}/"
    try:
        response = session.get(detail_url, headers=HEADERS, timeout=30)
        response.raise_for_status()
        # Decode with the encoding detected from the body, falling back to
        # UTF-8 when detection yields nothing.
        response.encoding = response.apparent_encoding or "utf-8"
        parsed = parse_detail(response.text, cislo)
    except Exception as exc:
        logger.warning(f"Chyba při stahování {cislo}: {exc}")
        parsed = None
    return parsed
|
||
|
||
|
||
def worker(cislo: str) -> dict | None:
    """Fetch one procedure detail as a thread-pool task.

    A fresh ``requests.Session`` is created for every call, so no session is
    ever shared between threads. The session is used as a context manager so
    its pooled connections are closed as soon as the fetch finishes instead
    of lingering until garbage collection.

    Args:
        cislo: Procedure number to download.

    Returns:
        Whatever ``fetch_detail`` returns: the parsed document, or ``None``
        on failure.
    """
    with requests.Session() as session:
        result = fetch_detail(cislo, session)
    # Short pause after each request to throttle the load on the server.
    time.sleep(0.1)
    return result
|
||
|
||
|
||
def process_changes(
    new_detaily: list[dict],
    col_detaily,
    col_historie,
    run_at: datetime,
) -> dict:
    """Store freshly scraped details and archive the versions they replace.

    Each new detail is compared with the stored document of the same
    ``cislo_vykonu`` on the fields listed in ``COMPARE_FIELDS``:

    * not stored yet → upserted, counted as ``novy``;
    * at least one compared field differs → the stored document is copied to
      the history collection (with ``_platny_do`` and ``_zmenena_pole``) and
      the current document is updated, counted as ``zmenen``;
    * otherwise only ``_scraped_at`` is refreshed, counted as ``nezmenen``.

    The dictionaries in ``new_detaily`` are modified in place
    (``_scraped_at`` and, for new/changed ones, ``_platny_od`` are added).

    Args:
        new_detaily: Parsed detail documents, each with ``cislo_vykonu``.
        col_detaily: Collection holding the current details.
        col_historie: Collection receiving archived previous versions.
        run_at: Timestamp of this run, written into the metadata fields.

    Returns:
        Counters ``{"novy": ..., "zmenen": ..., "nezmenen": ...}``.
    """
    stats = {"novy": 0, "zmenen": 0, "nezmenen": 0}
    if not new_detaily:
        return stats

    # Load only the stored documents that belong to this batch; reading the
    # whole collection for every batch grows with collection size × batches.
    batch_ids = [d["cislo_vykonu"] for d in new_detaily]
    existing = {
        d["cislo_vykonu"]: d
        for d in col_detaily.find({"cislo_vykonu": {"$in": batch_ids}}, {"_id": 0})
    }

    ops_detaily = []
    ops_historie = []

    for detail in new_detaily:
        cid = detail["cislo_vykonu"]
        detail["_scraped_at"] = run_at

        old = existing.get(cid)
        if old is None:
            detail["_platny_od"] = run_at
            ops_detaily.append(
                UpdateOne({"cislo_vykonu": cid}, {"$set": detail}, upsert=True)
            )
            stats["novy"] += 1
            continue

        changed = [f for f in COMPARE_FIELDS if old.get(f) != detail.get(f)]
        if not changed:
            ops_detaily.append(
                UpdateOne({"cislo_vykonu": cid}, {"$set": {"_scraped_at": run_at}})
            )
            stats["nezmenen"] += 1
            continue

        # Archive the previous version together with the list of changed fields.
        archive = dict(old)
        archive["_platny_do"] = run_at
        archive["_zmenena_pole"] = changed
        ops_historie.append(InsertOne(archive))

        detail["_platny_od"] = run_at
        # NOTE(review): $set never removes a field that is present in the
        # stored document but absent from the new one, so such a field looks
        # "changed" again on every run — confirm whether it should be unset.
        ops_detaily.append(UpdateOne({"cislo_vykonu": cid}, {"$set": detail}))
        stats["zmenen"] += 1

    # Write the archive first: if the history write fails, the current
    # documents have not been overwritten yet and no old version is lost.
    if ops_historie:
        col_historie.bulk_write(ops_historie, ordered=False)
    if ops_detaily:
        col_detaily.bulk_write(ops_detaily, ordered=False)

    return stats
|
||
|
||
|
||
def create_indexes(col_detaily, col_historie):
    """Create the indexes used on the details and history collections.

    ``cislo_vykonu`` is unique in the details collection and a plain index in
    the history collection; the metadata fields get plain indexes.
    """
    # Current details: one document per procedure number.
    col_detaily.create_index("cislo_vykonu", unique=True)
    for field in ("_scraped_at", "_platny_od"):
        col_detaily.create_index(field)

    # History: several archived versions per procedure number are allowed.
    for field in ("cislo_vykonu", "_platny_do", "_zmenena_pole"):
        col_historie.create_index(field)

    logger.info("Indexy vytvořeny")
|
||
|
||
|
||
def main():
    """Run one scraping pass over the active procedures.

    Steps: connect to MongoDB and ensure indexes, read the procedure numbers
    flagged ``_aktivni`` from the ``vykony`` collection, optionally skip the
    ones already stored (``FORCE`` false) and cap the count (``KOLIK``),
    download the details in a thread pool of ``WORKERS`` threads, and write
    the results in batches through ``process_changes``. The MongoDB client
    is closed even when the run fails part-way.
    """
    logger.info("=== Spouštím stahování detailů výkonů ===")
    logger.info(f"Nastavení: KOLIK={KOLIK or 'vše'}, FORCE={FORCE}, WORKERS={WORKERS}")
    run_at = datetime.now(timezone.utc)

    client = MongoClient(MONGO_URI)
    try:
        col_vykony = client[MONGO_DB][COL_VYKONY]
        col_detaily = client[MONGO_DB][COL_DETAILY]
        col_historie = client[MONGO_DB][COL_DETAILY_HISTORIE]
        create_indexes(col_detaily, col_historie)

        vsechna_cisla = [
            d["cislo_vykonu"]
            for d in col_vykony.find({"_aktivni": True}, {"cislo_vykonu": 1, "_id": 0})
        ]
        logger.info(f"Aktivních výkonů v DB: {len(vsechna_cisla)}")

        if not FORCE:
            # Skip procedure numbers that already have a stored detail.
            uz_stazeno = {
                d["cislo_vykonu"]
                for d in col_detaily.find({}, {"cislo_vykonu": 1, "_id": 0})
            }
            cisla = [c for c in vsechna_cisla if c not in uz_stazeno]
            logger.info(f"Již staženo: {len(uz_stazeno)}, zbývá: {len(cisla)}")
        else:
            cisla = vsechna_cisla
            logger.info("FORCE=True: přestahuju vše")

        if KOLIK:
            cisla = cisla[:KOLIK]
            logger.info(f"KOLIK={KOLIK}: omezuji na {len(cisla)} výkonů")

        if not cisla:
            logger.info("Nic ke stahování.")
            return

        # Downloading
        fetch_chyby = 0
        batch: list[dict] = []
        BATCH_SIZE = 100
        total_stats = {"novy": 0, "zmenen": 0, "nezmenen": 0}

        def flush_batch() -> None:
            """Write the pending batch to the database and add its counters."""
            if not batch:
                return
            stats = process_changes(batch, col_detaily, col_historie, run_at)
            batch.clear()
            for key, value in stats.items():
                total_stats[key] += value

        with ThreadPoolExecutor(max_workers=WORKERS) as pool:
            futures = {pool.submit(worker, c): c for c in cisla}
            done = 0
            for future in as_completed(futures):
                done += 1
                result = future.result()

                if result:
                    batch.append(result)
                else:
                    fetch_chyby += 1

                if len(batch) >= BATCH_SIZE:
                    flush_batch()

                if done % 200 == 0 or done == len(cisla):
                    logger.info(
                        f"Průběh: {done}/{len(cisla)} "
                        f"(chyby stahování: {fetch_chyby})"
                    )

        # Write whatever is left from the last, incomplete batch.
        flush_batch()

        logger.info(
            f"Výsledek: nové={total_stats['novy']}, změněné={total_stats['zmenen']}, "
            f"nezměněné={total_stats['nezmenen']}, chyby={fetch_chyby}"
        )
        logger.info("=== Hotovo ===")
    finally:
        # Always release the MongoDB connection, including on errors and on
        # the early return above.
        client.close()


if __name__ == "__main__":
    main()
|