Files
ordinaceprojekt/Euni/euni_stahni.py
T
2026-06-17 15:06:06 +02:00

648 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
euni_stahni.py — přihlásí se na euni.cz, projde kurzy a stáhne, co se stáhnout dá
(dokumenty: PDF/DOCX/PPTX/XLSX/ZIP a videa: Vimeo/YouTube).
Postup:
1) login přes /sign/ (formulář se parsuje, kopírují se i skrytá Nette pole)
2) sběr kurzů přes signál studyAreaList-nextPage (stránkování, dokud přibývají)
3) z každého kurzu se vytáhnou <iframe> videa a odkazy na dokumenty
(vč. /redirect/<base64>)
4) vše se stáhne do stazeno/<id>-<slug>/ (dokumenty/ a videa/)
Soukromá / nedostupná videa se samo přeskočí (nepadá).
Závislosti:
python -m pip install -U requests beautifulsoup4 python-dotenv yt-dlp static-ffmpeg
Údaje: Euni/.env -> EUNI_USERNAME=... EUNI_PASSWORD=...
Příklady:
python euni_stahni.py # vše: scrape + dokumenty + videa (profese Lékař)
python euni_stahni.py --scrape-only # jen inventura do euni_kurzy.json
python euni_stahni.py --from-json # přeskočí scrape, použije euni_kurzy.json
python euni_stahni.py --no-videos # jen dokumenty
python euni_stahni.py --professions 2,4 # více profesí (2=Lékař,4=Farmaceut,7=NLZP)
python euni_stahni.py --limit 3 # jen první 3 kurzy (test)
"""
import argparse
import base64
import hashlib
import json
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin, unquote, urlparse
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# Make sure printing never crashes on a character the console encoding cannot
# represent: such characters are emitted as backslash escapes instead.
for _stream in (sys.stdout, sys.stderr):
    try:
        _stream.reconfigure(errors="backslashreplace")
    except Exception:
        # Best effort: a stream that cannot be reconfigured is left unchanged.
        pass
# Directory containing this script; .env and the default output paths live here.
SKRIPT_DIR = Path(__file__).resolve().parent
load_dotenv(SKRIPT_DIR / ".env")
# Reuse the video downloader from ../Video/stahni_video.py.
sys.path.insert(0, str(SKRIPT_DIR.parent / "Video"))
# Optional helper modules: when an import fails the name is bound to None and
# the code that needs it checks for None before use.
try:
    import stahni_video as sv
except Exception:
    sv = None
try:
    import euni_db as edb
except Exception:
    edb = None
try:
    import euni_seaweed as sw
except Exception:
    sw = None
# Site root and the URLs derived from it.
BASE = "https://www.euni.cz"
LOGIN_URL = f"{BASE}/sign/?bid=1"
LIST_URL = f"{BASE}/seznam-kurzu?bid=1"
# "Next page" signal of the course list; {prof} is filled in with the profession id.
NEXTPAGE = f"{BASE}/seznam-kurzu?studyAreaList-professionId={{prof}}&bid=1&do=studyAreaList-nextPage"
# Link targets ending in a document extension (optionally followed by a query string).
DOC_RE = re.compile(r"\.(pdf|docx?|pptx?|xlsx?|zip)(\?|$)", re.I)
# Path fragments that are also treated as document downloads.
FILE_PATH_RE = re.compile(r"fileUploader/download|files/resources", re.I)
# Iframe sources recognised as embedded video.
VIDEO_RE = re.compile(r"vimeo|youtube|youtu\.be", re.I)
# User-Agent sent by the HTTP session and passed to yt-dlp.
UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/120 Safari/537.36")
# ---------------------------------------------------------------- pomocné -----
def bezpecny_nazev(s: str, max_len: int = 120) -> str:
    """Sanitize a string into a file/folder name that is safe on Windows.

    Characters Windows forbids in names (``<>:"/\\|?*`` and control
    characters) are replaced with ``_``, runs of whitespace collapse to a
    single space, and the result is cut to ``max_len`` characters.

    Args:
        s: Raw name (for example a course slug or a server-supplied file name).
        max_len: Maximum length of the returned name.

    Returns:
        The cleaned name, or ``"bez_nazvu"`` when nothing usable remains.
    """
    s = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", s).strip(" .")
    s = re.sub(r"\s+", " ", s)
    # Strip dots as well as spaces *after* truncation: cutting at max_len can
    # expose a new trailing dot, which Windows does not keep in a name.
    return s[:max_len].strip(" .") or "bez_nazvu"
def make_session():
    """Create a requests session that sends the browser-like User-Agent ``UA``."""
    session = requests.Session()
    session.headers["User-Agent"] = UA
    return session
def _relpath(p):
    """Return ``p`` as a string relative to the script directory (for the DB).

    A falsy ``p`` yields ``None``. When the path cannot be expressed relative
    to ``SKRIPT_DIR`` it is returned unchanged as a string.
    """
    if not p:
        return None
    try:
        relative = Path(p).resolve().relative_to(SKRIPT_DIR)
    except Exception:
        return str(p)
    return str(relative)
def _seaweed_path(dest, out_root):
    """Build the SeaweedFS path that mirrors the local layout of ``dest``.

    The result is ``sw.PREFIX`` followed by the path of ``dest`` relative to
    ``out_root``, joined with ``/``. When ``dest`` does not lie under
    ``out_root`` only its file name is used.
    """
    local = Path(dest)
    try:
        mirrored = local.resolve().relative_to(Path(out_root).resolve())
    except Exception:
        mirrored = Path(local.name)
    return sw.PREFIX + "/" + "/".join(mirrored.parts)
def _zaloh_do_seaweed(db, dest, out_root, kurz_id, klic):
    """Upload a file to SeaweedFS and store its reference with the material.

    Returns the info dict (``path``, ``fids``, ``size``, ``md5``) on success,
    or ``None`` when SeaweedFS is not available, the file does not exist, or
    any step fails (the failure is printed, not raised).
    """
    if sw is None or not dest:
        return None
    local = Path(dest)
    if not local.exists():
        return None

    remote = _seaweed_path(dest, out_root)
    try:
        existing = sw.entry_meta(remote)
        if existing and existing.get("FileSize") == local.stat().st_size:
            # Already stored with the same size: skip the upload and only
            # record the reference.
            chunk_ids = []
            for chunk in existing.get("chunks") or []:
                fid = chunk.get("file_id")
                if fid:
                    chunk_ids.append(fid)
            info = {
                "path": remote,
                "fids": chunk_ids,
                "size": existing.get("FileSize"),
                "md5": existing.get("Md5"),
            }
        else:
            info = sw.upload(str(dest), remote)

        if db is not None:
            edb.set_seaweed(
                db, kurz_id, klic, info["path"],
                fids=info.get("fids"),
                md5=info.get("md5"),
                size=info.get("size"),
            )
    except Exception as err:
        print(f" [SEAWEED-CHYBA] {remote} ({str(err)[:60]})")
        return None
    return info
# ----------------------------------------------------------------- login ------
def login(s):
    """Log the given session in to euni.cz through the /sign/ form.

    The login page is fetched and the first form containing a password input
    is parsed. Every named input is posted back with its current value, so
    hidden fields are preserved; the user name and password come from the
    ``EUNI_USERNAME`` / ``EUNI_PASSWORD`` environment variables.

    Args:
        s: HTTP session used for the GET and the POST.

    Raises:
        SystemExit: The credentials are missing from the environment.
        RuntimeError: The login form or its user/password field cannot be
            found, or the response does not contain a logout link.
        Exception: Whatever ``raise_for_status()`` raises for an HTTP error.
    """
    # Check the configuration first so a missing .env fails without any
    # network traffic.
    user = os.environ.get("EUNI_USERNAME")
    pwd = os.environ.get("EUNI_PASSWORD")
    if not user or not pwd:
        sys.exit("Chybí EUNI_USERNAME / EUNI_PASSWORD. Vyplň je v Euni/.env "
                 "(vzor je v .env.example).")

    r = s.get(LOGIN_URL, timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    form = next((f for f in soup.find_all("form")
                 if f.find("input", {"type": "password"})), None)
    if not form:
        raise RuntimeError("Přihlašovací formulář nenalezen.")

    data, user_field, pass_field = {}, None, None
    for inp in form.find_all("input"):
        name = inp.get("name")
        if not name:
            continue
        itype = (inp.get("type") or "text").lower()
        data[name] = inp.get("value", "")  # keeps hidden fields (_do, _token...)
        if itype == "password":
            pass_field = name
        elif itype in ("text", "email") and user_field is None:
            user_field = name

    # Without this guard a form lacking a named field would be posted with
    # the credentials stored under the key None.
    if user_field is None or pass_field is None:
        raise RuntimeError(
            "V přihlašovacím formuláři chybí pole pro jméno nebo heslo.")

    data[user_field] = user
    data[pass_field] = pwd
    action = urljoin(LOGIN_URL, form.get("action") or LOGIN_URL)
    r = s.post(action, data=data, headers={"Referer": LOGIN_URL}, timeout=30)
    r.raise_for_status()
    # A logout link in the response is taken as proof of a successful login.
    if "Odhlásit" not in r.text and "odhlasit" not in r.text.lower():
        raise RuntimeError("Přihlášení se nezdařilo zkontroluj údaje v .env.")
    print("✓ Přihlášeno")
# ------------------------------------------------------------- seznam kurzů ----
def get_courses_for_profession(s, profession_id):
    """Collect all courses listed for one profession.

    The course list is paged through by repeatedly calling the
    ``studyAreaList-nextPage`` signal until a page adds no new course, the
    response has no JSON snippet, or 200 requests have been made.

    Returns:
        A list of dicts with ``id``, ``slug``, ``title``, ``url`` and
        ``profession``, one per distinct course id.
    """
    # Initialise pagination for the given profession.
    s.get(f"{BASE}/seznam-kurzu?studyAreaList-professionId={profession_id}&bid=1",
          timeout=30)

    courses = {}
    last_count = -1
    for _ in range(200):  # hard cap on the number of page requests
        resp = s.get(NEXTPAGE.format(prof=profession_id),
                     headers={"X-Requested-With": "XMLHttpRequest"}, timeout=30)
        resp.raise_for_status()
        try:
            snippets = resp.json().get("snippets", {})
            snippet = snippets.get("snippet-studyAreaList-areaList", "")
        except ValueError:
            # Not JSON: stop paging.
            break
        if not snippet:
            break

        page = BeautifulSoup(snippet, "html.parser")
        for link in page.select("a.workshop"):
            href = (link.get("href") or "").split("?")[0]
            match = re.match(r"/lecture/(\d+)-(.+)", href)
            if not match:
                continue
            course_id, slug = match.group(1), match.group(2)
            heading = link.find("h3")
            courses[course_id] = {
                "id": course_id,
                "slug": slug,
                "title": heading.get_text(strip=True) if heading else slug,
                "url": urljoin(BASE, href),
                "profession": profession_id,
            }

        # Nothing new on this page: the list is complete.
        if len(courses) == last_count:
            break
        last_count = len(courses)
        time.sleep(0.25)
    return list(courses.values())
def get_all_courses(s, professions):
    """Collect courses for several professions, keeping one entry per course id.

    A course found under more than one profession keeps the entry from the
    profession that listed it first.
    """
    merged = {}
    for prof in professions:
        found = get_courses_for_profession(s, prof)
        print(f" profese {prof}: {len(found)} kurzů")
        for course in found:
            if course["id"] not in merged:
                merged[course["id"]] = course
    return list(merged.values())
# --------------------------------------------------------- extrakce odkazů ----
def decode_redirect(href):
    """Decode the target hidden in a ``/redirect/<base64>`` link.

    Both the standard and the URL-safe Base64 alphabet are accepted, and
    missing ``=`` padding is restored before decoding.

    Args:
        href: Link value to inspect.

    Returns:
        The decoded target as text (undecodable bytes are dropped), or
        ``None`` when ``href`` is not a redirect link or the token is not
        valid Base64.
    """
    m = re.search(r"/redirect/([A-Za-z0-9+/=_-]+)", href)
    if not m:
        return None
    # Map the URL-safe alphabet onto the standard one so a single decoder
    # handles both variants.
    token = m.group(1).translate(str.maketrans("-_", "+/"))
    # Padding is often stripped from tokens embedded in URLs; restore it.
    token += "=" * (-len(token) % 4)
    try:
        return base64.b64decode(token).decode("utf-8", "ignore")
    except Exception:
        return None
def watch_url(embed):
    """Turn a Vimeo/YouTube embed URL into the matching watch-page URL.

    URLs that match neither pattern are returned unchanged.
    """
    rewrites = (
        (r"player\.vimeo\.com/video/(\d+)", "https://vimeo.com/{}"),
        (r"youtube\.com/embed/([\w-]+)", "https://www.youtube.com/watch?v={}"),
    )
    for pattern, template in rewrites:
        found = re.search(pattern, embed)
        if found:
            return template.format(found.group(1))
    return embed
def _text(el):
return " ".join(el.get_text(" ", strip=True).split()) if el else None
def _parse_date(s):
m = re.search(r"(\d{1,2})\.\s*(\d{1,2})\.\s*(\d{4})", s or "")
if m:
try:
return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1)))
except ValueError:
return None
return None
def _mark_for_label(soup, label_text):
    """Find the value shown next to a label on the course page.

    Looks at every ``.lecture-info-label`` whose text contains ``label_text``
    (case-insensitive) and returns the text of the first
    ``.lecture-info-mark`` (or, failing that, ``.lecture-info-bold``) found in
    the label's parent element. Returns ``None`` when nothing matches.
    """
    for label in soup.select(".lecture-info-label"):
        if label_text.lower() not in label.get_text(strip=True).lower():
            continue
        container = label.parent
        value = container.select_one(".lecture-info-mark")
        if not value:
            value = container.select_one(".lecture-info-bold")
        if value:
            return _text(value)
    return None
def extract_course_meta(soup):
    """Extract course metadata from a parsed course page.

    Returns a dict with ``autor``, optionally ``autor_medailonek_url``,
    ``datum_publikace``, ``revidovano``, ``akreditace`` and ``kredity``;
    values that cannot be found are ``None``.
    """
    meta = {}

    author_box = soup.select_one(".lecture-info-column-author")
    if author_box:
        meta["autor"] = _text(author_box.select_one(".lecture-info-mark"))
        link = author_box.get("href") or ""
        if "vimeo" in link or "youtube" in link:
            meta["autor_medailonek_url"] = link
    if not meta.get("autor"):
        # Fall back to the labelled rows when the author column gave nothing.
        by_label = _mark_for_label(soup, "Autor kurzu")
        if not by_label:
            by_label = _mark_for_label(soup, "Autorka kurzu")
        meta["autor"] = by_label

    published = _mark_for_label(soup, "Datum publikace")
    meta["datum_publikace"] = _parse_date(published)
    revised = _mark_for_label(soup, "Revidováno")
    meta["revidovano"] = _parse_date(revised)
    meta["akreditace"] = _mark_for_label(soup, "Akreditace")

    credits = re.search(r"(\d+)\s*kredit", soup.get_text(" "), re.I)
    if credits:
        meta["kredity"] = int(credits.group(1))
    else:
        meta["kredity"] = None
    return meta
def material_klic(druh, item):
    """Return ``(key, platform)`` used to deduplicate a material.

    Videos are keyed by their Vimeo or YouTube id when one can be recognised
    in ``item["embed"]``; otherwise by a SHA-1 prefix of the embed URL.
    Everything else is keyed by a SHA-1 prefix of ``item["url"]``. The
    platform is ``"vimeo"``, ``"youtube"`` or ``None``.
    """
    if druh != "video":
        digest = hashlib.sha1(item["url"].encode()).hexdigest()
        return "doc:" + digest[:16], None

    embed = item["embed"]
    vimeo = re.search(r"vimeo\.com/(?:video/)?(\d+)", embed)
    if vimeo:
        return f"vimeo:{vimeo.group(1)}", "vimeo"

    youtube_patterns = (
        r"youtube\.com/embed/([\w-]+)",
        r"youtu\.be/([\w-]+)",
        r"[?&]v=([\w-]+)",
    )
    for pattern in youtube_patterns:
        youtube = re.search(pattern, embed)
        if youtube:
            return f"youtube:{youtube.group(1)}", "youtube"

    digest = hashlib.sha1(embed.encode()).hexdigest()
    return "video:" + digest[:16], None
def _pripona(url):
m = re.search(r"\.([a-z0-9]{2,4})(\?|$)", url, re.I)
return m.group(1).lower() if m else None
def extract_course_links(s, course_url):
    """Fetch a course page and collect its embedded videos and document links.

    Returns:
        A dict with ``videos`` (``embed`` / ``watch`` URL pairs taken from
        iframes), ``documents`` (``label`` / ``url`` pairs taken from
        anchors) and ``meta`` (see ``extract_course_meta``). Duplicate
        embeds and document URLs are listed once.
    """
    resp = s.get(course_url, timeout=30)
    resp.raise_for_status()
    page = BeautifulSoup(resp.text, "html.parser")

    videos, known_embeds = [], set()
    for frame in page.find_all("iframe"):
        src = frame.get("src") or frame.get("data-src") or ""
        if src.startswith("//"):
            # Protocol-relative source: make it absolute.
            src = "https:" + src
        if not VIDEO_RE.search(src) or src in known_embeds:
            continue
        known_embeds.add(src)
        videos.append({"embed": src, "watch": watch_url(src)})

    documents, known_urls = [], set()
    for anchor in page.find_all("a", href=True):
        href = anchor["href"]
        # /redirect/<base64> links hide the real target; decode it when present.
        target = decode_redirect(href) or urljoin(BASE, href)
        if not (DOC_RE.search(target) or FILE_PATH_RE.search(target)):
            continue
        url = unquote(target)
        if url in known_urls:
            continue
        known_urls.add(url)
        label = " ".join(anchor.get_text(" ", strip=True).split())[:70]
        documents.append({"label": label, "url": url})

    return {"videos": videos, "documents": documents,
            "meta": extract_course_meta(page)}
# ------------------------------------------------------------- stahování ------
def stahni_dokument(s, url, out_dir: Path, label=""):
    """Download one document into ``out_dir``.

    The file name comes from the ``Content-Disposition`` header when present,
    otherwise from the URL path; if that name has no dot and a ``label`` is
    given, the label is used instead. Data is written to a ``.part`` file
    that is renamed once the download completes.

    Args:
        s: HTTP session used for the request.
        url: Document URL.
        out_dir: Target directory (created if missing).
        label: Link text used as a fallback file name.

    Returns:
        ``("existuje", name)`` when a non-empty file of that name is already
        present, otherwise ``("staženo", name)``.

    Raises:
        Exception: Request, HTTP-status and file-system errors propagate.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    # The response is streamed, so it must be closed explicitly; the context
    # manager releases the connection on every exit path, including the
    # early "existuje" return and any exception.
    with s.get(url, stream=True, timeout=120) as r:
        r.raise_for_status()
        # File name from Content-Disposition, otherwise from the URL.
        fname = None
        cd = r.headers.get("Content-Disposition", "")
        m = re.search(r"filename\*?=(?:UTF-8'')?\"?([^\";]+)", cd)
        if m:
            fname = unquote(m.group(1))
        if not fname:
            fname = os.path.basename(urlparse(url).path) or "soubor"
        fname = bezpecny_nazev(fname)
        if "." not in fname and label:
            fname = bezpecny_nazev(label)
        dest = out_dir / fname
        if dest.exists() and dest.stat().st_size > 0:
            return ("existuje", dest.name)
        tmp = dest.with_suffix(dest.suffix + ".part")
        try:
            with open(tmp, "wb") as fp:
                for chunk in r.iter_content(chunk_size=65536):
                    if chunk:
                        fp.write(chunk)
            tmp.replace(dest)
        except BaseException:
            # Do not leave a truncated .part file behind after a failure.
            if tmp.exists():
                tmp.unlink()
            raise
    return ("staženo", dest.name)
def stahni_video(embed, out_dir: Path, referer, fmt="bestvideo*+bestaudio/best",
                 frags=10):
    """Download a video with yt-dlp; returns ``(status, info, filepath)``.

    ``status`` is ``"staženo"`` (``info`` is the title), ``"přeskočeno"``
    (``info`` is the reason returned by ``sv.klasifikuj_chybu``) or
    ``"chyba"`` (``info`` is the error text). ``filepath`` is only set for a
    successful download that reports one. Nothing is raised for download
    failures.
    """
    if sv is None:
        return ("chyba", "modul stahni_video není dostupný", None)
    try:
        import yt_dlp
        from yt_dlp.utils import DownloadError
    except ImportError:
        return ("chyba", "yt-dlp není nainstalován", None)

    out_dir.mkdir(parents=True, exist_ok=True)
    ffmpeg_dir = sv.priprav_ffmpeg()
    ydl_opts = dict(
        outtmpl=str(out_dir / "%(title)s [%(id)s].%(ext)s"),
        format=fmt,
        # Parallel HLS fragments make the download faster.
        concurrent_fragment_downloads=frags,
        merge_output_format="mp4",
        logger=sv._TichyLogger(),
        progress_hooks=[sv._progress_hook],
        noprogress=True,
        noplaylist=True,
        http_headers={"Referer": referer, "User-Agent": UA},
    )
    if ffmpeg_dir:
        ydl_opts["ffmpeg_location"] = ffmpeg_dir

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(embed, download=True)
        downloads = (info or {}).get("requested_downloads")
        filepath = downloads[0].get("filepath") if downloads else None
        title = info.get("title", embed) if info else embed
        return ("staženo", title, filepath)
    except DownloadError as err:
        reason = sv.klasifikuj_chybu(str(err))
        if not reason:
            return ("chyba", str(err).split("\n")[0], None)
        return ("přeskočeno", reason, None)
    except Exception as err:
        return ("chyba", str(err), None)
def _ingest_course(db, c):
    """Write a course and its materials to Mongo through the ``edb`` upsert helpers.

    ``c`` is a course dict as produced by the scrape (course fields merged
    with ``videos``, ``documents`` and ``meta``).
    """
    meta = c.get("meta") or {}
    nazev = c.get("nazev") or c.get("title")
    course_id = c["id"]
    if c.get("profession"):
        profese = [c["profession"]]
    else:
        profese = c.get("profese", [])

    kurz = {
        "id": course_id,
        "slug": c.get("slug"),
        "nazev": nazev,
        "url": c.get("url"),
        "profese": profese,
        "pocet_videi": len(c.get("videos", [])),
        "pocet_dokumentu": len(c.get("documents", [])),
    }
    meta_fields = ("autor", "autor_medailonek_url", "datum_publikace",
                   "revidovano", "akreditace", "kredity")
    kurz.update({field: meta.get(field) for field in meta_fields})
    edb.upsert_kurz(db, kurz)

    for video in c.get("videos", []):
        klic, platforma = material_klic("video", video)
        edb.upsert_material(db, {
            "kurz_id": course_id,
            "kurz_nazev": nazev,
            "druh": "video",
            "platforma": platforma,
            "klic": klic,
            "zdroj_url": video["embed"],
            "watch_url": video.get("watch"),
            "popis": None,
            "pripona": "mp4",
        })

    for doc in c.get("documents", []):
        klic = material_klic("dokument", doc)[0]
        edb.upsert_material(db, {
            "kurz_id": course_id,
            "kurz_nazev": nazev,
            "druh": "dokument",
            "platforma": None,
            "klic": klic,
            "zdroj_url": doc["url"],
            "watch_url": None,
            "popis": doc.get("label"),
            "pripona": _pripona(doc["url"]),
        })
# ---------------------------------------------------------------- hlavní ------
def main():
    """Command-line entry point: build the course inventory and download it.

    Steps, in order:
      1. Parse arguments, create the HTTP session and, unless disabled,
         connect to MongoDB and probe the SeaweedFS filer. Both are optional:
         on failure a warning is printed and the run continues without them.
      2. With ``--seaweed-backfill``: upload the files returned by
         ``edb.materialy_bez_seaweed`` and return.
      3. Build the inventory, either from the JSON file (``--from-json``) or
         by logging in and scraping the selected professions (the result is
         written to the JSON file), and print the totals.
      4. Unless ``--scrape-only``: download documents and videos of every
         course, recording each result in Mongo and backing files up to
         SeaweedFS when those are enabled, then print a summary.
    """
    p = argparse.ArgumentParser(description="Stáhne obsah kurzů z euni.cz.")
    p.add_argument("--professions", default="2",
                   help="ID profesí oddělené čárkou (2=Lékař,4=Farmaceut,7=NLZP), nebo 'all'")
    p.add_argument("--scrape-only", action="store_true", help="jen inventura do JSON")
    p.add_argument("--from-json", action="store_true",
                   help="přeskočí scrape, použije existující euni_kurzy.json")
    p.add_argument("--no-videos", action="store_true", help="nestahovat videa")
    p.add_argument("--no-docs", action="store_true", help="nestahovat dokumenty")
    p.add_argument("--video-format", default="bestvideo*+bestaudio/best",
                   help="yt-dlp formát videa (např. \"bestvideo[height<=720]+bestaudio/best\")")
    p.add_argument("--frags", type=int, default=10,
                   help="počet paralelně stahovaných HLS fragmentů videa (default 10)")
    p.add_argument("--limit", type=int, default=0, help="jen prvních N kurzů (test)")
    p.add_argument("--out", default=str(SKRIPT_DIR / "stazeno"), help="výstupní adresář")
    p.add_argument("--json", default=str(SKRIPT_DIR / "euni_kurzy.json"),
                   help="cesta k inventurnímu JSON")
    p.add_argument("--no-mongo", action="store_true",
                   help="nezapisovat do MongoDB (jen JSON / stahování)")
    p.add_argument("--no-seaweed", action="store_true",
                   help="nenahrávat kopie do SeaweedFS")
    p.add_argument("--seaweed-backfill", action="store_true",
                   help="jen dohraje do SeaweedFS stažené soubory, které tam chybí")
    a = p.parse_args()
    json_path = Path(a.json)
    out_root = Path(a.out)
    s = make_session()

    # MongoDB is optional: db stays None when it is disabled or unavailable.
    db = None
    if not a.no_mongo:
        if edb is None:
            print("UPOZORNĚNÍ: modul euni_db nedostupný — pokračuji bez Mongo.")
        else:
            try:
                db = edb.ensure_indexes()
                print(f"✓ Mongo EUNI připojeno ({edb.MONGO_URI})")
            except Exception as e:
                print(f"UPOZORNĚNÍ: Mongo nedostupné ({e}) — pokračuji bez něj.")

    # SeaweedFS is optional too; it is switched off when the filer does not answer.
    use_seaweed = not a.no_seaweed and sw is not None
    if use_seaweed:
        if sw.ping():
            print(f"✓ SeaweedFS filer dostupný ({sw.FILER})")
        else:
            print(f"UPOZORNĚNÍ: SeaweedFS filer nedostupný ({sw.FILER}) — "
                  f"pokračuji bez záloh.")
            use_seaweed = False

    # Mode: only upload already downloaded files that are missing in SeaweedFS.
    if a.seaweed_backfill:
        if db is None or not use_seaweed:
            sys.exit("Backfill potřebuje Mongo i SeaweedFS.")
        chybi = edb.materialy_bez_seaweed(db)
        print(f"Backfill do SeaweedFS: {len(chybi)} souborů")
        ok = 0
        for m in chybi:
            dest = SKRIPT_DIR / m["soubor"]
            if not dest.exists():
                continue
            remote = _seaweed_path(dest, out_root)
            info = _zaloh_do_seaweed(db, dest, out_root, m["kurz_id"], m["klic"])
            if info:
                ok += 1
                print(f" [SEAWEED] {remote}")
        print(f"Hotovo: {ok}/{len(chybi)} nahráno.")
        return

    if a.from_json:
        if not json_path.exists():
            sys.exit(f"JSON {json_path} neexistuje — spusť nejdřív bez --from-json.")
        results = json.loads(json_path.read_text(encoding="utf-8"))
        print(f"✓ Načteno z JSON: {len(results)} kurzů")
        login(s)  # a logged-in session is needed to download the documents
    else:
        login(s)
        if a.professions.lower() == "all":
            profs = [2, 4, 5, 6, 7]
        else:
            profs = [int(x) for x in a.professions.split(",") if x.strip()]
        print(f"Sbírám kurzy (profese {profs})…")
        courses = get_all_courses(s, profs)
        print(f"✓ Nalezeno kurzů: {len(courses)}")
        if a.limit:
            courses = courses[: a.limit]
            print(f" (--limit: zpracuji jen prvních {len(courses)})")
        results = []
        for i, c in enumerate(courses, 1):
            try:
                links = extract_course_links(s, c["url"])
            except Exception as e:
                # Keep the course in the inventory and record why it failed.
                links = {"videos": [], "documents": [], "error": str(e)}
            course = {**c, **links}
            results.append(course)
            if db is not None and "error" not in links:
                try:
                    _ingest_course(db, course)
                except Exception as e:
                    print(f" [MONGO-CHYBA] {c['id']}: {e}")
            # The separator keeps the title from running into the video count.
            print(f"[{i}/{len(courses)}] {c['title']} — "
                  f"{len(links.get('videos', []))} videí, "
                  f"{len(links.get('documents', []))} dokumentů")
            time.sleep(0.35)
        # default=str serialises values json cannot encode (e.g. datetime) as text.
        json_path.write_text(
            json.dumps(results, ensure_ascii=False, indent=2, default=str),
            encoding="utf-8")
        print(f"✓ Inventura uložena: {json_path}")

    # Inventory summary.
    n_vid = sum(len(c.get("videos", [])) for c in results)
    n_doc = sum(len(c.get("documents", [])) for c in results)
    print(f"\nCelkem: {len(results)} kurzů, {n_vid} videí, {n_doc} dokumentů")
    if a.scrape_only:
        return

    # Downloading.
    if a.limit:
        results = results[: a.limit]
    stat = {"doc_ok": 0, "doc_skip": 0, "doc_err": 0,
            "vid_ok": 0, "vid_skip": 0, "vid_err": 0, "sw_ok": 0}
    for i, c in enumerate(results, 1):
        folder = out_root / bezpecny_nazev(f"{c['id']}-{c.get('slug', '')}", 80)
        print(f"\n[{i}/{len(results)}] {c.get('title', c['id'])}")
        if not a.no_docs:
            for d in c.get("documents", []):
                klic = material_klic("dokument", d)[0]
                try:
                    stav, name = stahni_dokument(s, d["url"], folder / "dokumenty",
                                                 d.get("label", ""))
                    dest = folder / "dokumenty" / name
                    if stav == "staženo":
                        stat["doc_ok"] += 1
                        print(f" [DOK] {name}")
                    else:
                        # Already present from an earlier run.
                        stat["doc_skip"] += 1
                    if db is not None:
                        sz = dest.stat().st_size if dest.exists() else None
                        edb.set_status(db, c["id"], klic, edb.STAZENO,
                                       soubor=_relpath(dest), velikost_b=sz)
                    if use_seaweed and dest.exists():
                        if _zaloh_do_seaweed(db, dest, out_root, c["id"], klic):
                            stat["sw_ok"] += 1
                except Exception as e:
                    stat["doc_err"] += 1
                    print(f" [DOK-CHYBA] {d['url']} ({e})")
                    if db is not None:
                        edb.set_status(db, c["id"], klic, edb.CHYBA, chyba=str(e))
        if not a.no_videos:
            for v in c.get("videos", []):
                klic = material_klic("video", v)[0]
                stav, info, fp = stahni_video(v["embed"], folder / "videa", c["url"],
                                              fmt=a.video_format, frags=a.frags)
                if stav == "staženo":
                    stat["vid_ok"] += 1
                    print(f" [VIDEO] {info}")
                    if db is not None:
                        sz = (Path(fp).stat().st_size
                              if fp and Path(fp).exists() else None)
                        edb.set_status(db, c["id"], klic, edb.STAZENO,
                                       soubor=_relpath(fp) if fp else None,
                                       velikost_b=sz)
                    if use_seaweed and fp and Path(fp).exists():
                        if _zaloh_do_seaweed(db, fp, out_root, c["id"], klic):
                            stat["sw_ok"] += 1
                elif stav == "přeskočeno":
                    stat["vid_skip"] += 1
                    print(f" [VIDEO-PŘESKOČENO] {info}")
                    if db is not None:
                        edb.set_status(db, c["id"], klic, edb.PRESKOCENO, duvod=info)
                else:
                    stat["vid_err"] += 1
                    print(f" [VIDEO-CHYBA] {info}")
                    if db is not None:
                        edb.set_status(db, c["id"], klic, edb.CHYBA, chyba=info)

    print("\n=== SOUHRN STAHOVÁNÍ ===")
    print(f" dokumenty: {stat['doc_ok']} staženo, {stat['doc_skip']} přeskočeno, "
          f"{stat['doc_err']} chyb")
    print(f" videa: {stat['vid_ok']} staženo, {stat['vid_skip']} přeskočeno "
          f"(soukromá/nedostupná), {stat['vid_err']} chyb")
    if not a.no_seaweed:
        print(f" SeaweedFS: {stat['sw_ok']} souborů zazálohováno")
    print(f" výstup: {out_root}")
if __name__ == "__main__":
main()