This commit is contained in:
2026-06-17 11:53:54 +02:00
parent 9edfddae95
commit dc07e19179
20 changed files with 2294 additions and 0 deletions
+3
View File
@@ -50,3 +50,6 @@ Import vždy přes `sys.path` na kořen projektu nebo přímou cestou.
|--------|---------|-------|
| `stahni_str8ts.py` | `SběrDatRůzné/DailyStr8ts/` | Stahuje daily Str8ts puzzle jako PDF, odesílá emailem — viz [NOTES.md](SběrDatRůzné/DailyStr8ts/NOTES.md) |
| `10_StahnoutXML.py`, `11_ParseXML.py` | `Recepty/NačteníPředpisuWithClaude/` | Pipeline pro stahování detailů receptů z eRecept SÚKL — viz [NacistPredpis_DOKUMENTACE.md](Recepty/NačteníPředpisuWithClaude/NacistPredpis_DOKUMENTACE.md) |
| `watcher.py` | `Webináře/` | Hlídá nové webináře na praktickylekar.online, přes Telegram potvrdí a přihlásí Buzalkovi — viz [NOTES.md](Webináře/NOTES.md) |
| `stahni_video.py` | `Video/` | Stahuje videa (Vimeo, YouTube…) přes yt-dlp; soukromá/nedostupná sám přeskočí — viz [NOTES.md](Video/NOTES.md) |
| `euni_stahni.py`, `euni_db.py`, `euni_report.py` | `Euni/` | Stahování kurzů z euni.cz (PDF + videa) s trackingem v MongoDB EUNI (idempotentní) — viz [NOTES.md](Euni/NOTES.md) |
+4
View File
@@ -0,0 +1,4 @@
# Přihlašovací údaje k euni.cz — zkopíruj do souboru .env a vyplň.
# (.env je v .gitignore, do gitu se nedostane.)
EUNI_USERNAME=tvoje_prihlasovaci_jmeno
EUNI_PASSWORD=tvoje_heslo
+3
View File
@@ -0,0 +1,3 @@
# stažený obsah a inventura — do gitu nepatří
stazeno/
euni_kurzy.json
+112
View File
@@ -0,0 +1,112 @@
# Euni — stahování a tracking kurzů z euni.cz
Přihlásí se na euni.cz, projde kurzy, vytěží odkazy + metadata a stahuje obsah
(PDF/prezentace a videa Vimeo/YouTube). Vše se trackuje v **MongoDB EUNI**, takže
stahování je idempotentní — skript ví, co už má, a netahá dvakrát.
## Soubory
| Soubor | Popis |
|--------|-------|
| `euni_stahni.py` | hlavní pipeline: login → scrape → ingest do Mongo → stahování → záloha do SeaweedFS |
| `euni_db.py` | připojení a operace nad MongoDB EUNI (kolekce, indexy, upserty) |
| `euni_seaweed.py` | nahrávání/stahování souborů do SeaweedFS (filer HTTP API) |
| `euni_restore.py` | obnova všech souborů ze SeaweedFS na disk (na jakémkoli PC) |
| `euni_report.py` | dashboard: přehled stavu (kolik staženo/čeká/přeskočeno) |
| `.env` | `EUNI_USERNAME`, `EUNI_PASSWORD` (v .gitignore) |
| `euni_kurzy.json` | poslední inventura (záloha; primární zdroj je Mongo) |
| `stazeno/` | stažený obsah, `stazeno/<id>-<slug>/{dokumenty,videa}/` |
## Závislosti
```bat
python -m pip install -U requests beautifulsoup4 python-dotenv yt-dlp static-ffmpeg pymongo
```
Video stahuje sdílený modul `../Video/stahni_video.py` (yt-dlp + static-ffmpeg,
soukromá videa sám přeskočí).
## MongoDB EUNI
Server `mongodb://192.168.1.76:27017` (bez hesla), DB `EUNI`. Lze přepsat env
proměnnou `EUNI_MONGO_URI`.
### Kolekce `kurzy` (1 dokument na kurz)
`_id` = euni ID kurzu. Pole: `slug, nazev, url, profese[], autor,
autor_medailonek_url, datum_publikace, revidovano, akreditace, kredity,
pocet_videi, pocet_dokumentu, first_seen, updated_at`.
### Kolekce `materialy` (1 dokument na soubor)
Unikátní index `{kurz_id, klic}`. Pole: `kurz_id, kurz_nazev, druh
(video|dokument), platforma (vimeo|youtube), klic (vimeo:ID / youtube:ID /
doc:hash), zdroj_url, watch_url, popis, pripona, stav, duvod, soubor,
velikost_b, pokusy, posledni_chyba, first_seen, updated_at, stazeno_at`.
**Stavy:** `ceka``stazeno` / `preskoceno` (soukromé video) / `chyba`.
**SeaweedFS reference** (po nahrání kopie): `seaweed_path` (cesta ve filer =
identifikátor pro vyžádání, např. `euni/5618-.../dokumenty/x.pdf`),
`seaweed_fids` (fid chunků = čísla souborů v SeaweedFS), `seaweed_md5`,
`seaweed_size`, `seaweed_at`.
## SeaweedFS záloha + obnova
Každý stažený soubor se nahraje do **SeaweedFS** (filer na Unraidu,
default `http://192.168.1.50:8888`, přepíše env `EUNI_FILER`). Do Mongo se k
materiálu uloží `seaweed_path` + `seaweed_fids`, takže soubor lze kdykoli vyžádat.
- Strukturu na disku zrcadlí cesta: `euni/<id>-<slug>/<typ>/<soubor>`.
- Filer metadata (mapa cesta→chunky) jsou v Mongo DB `seaweedfs` na 192.168.1.76;
bloby na poli Unraidu. (Setup: `U:\\PythonProject\\Janssen\\SeaweedFS\\`.)
- Pozn.: přímý přístup přes raw fid/volume zvenčí nefunguje (volume se uvnitř
Dockeru jmenuje `seaweed-volume`); proto se čte/zapisuje přes filer.
**Obnova kdekoliv** (stačí síť na Mongo + filer):
```bat
python euni_restore.py # vše → ./obnoveno
python euni_restore.py --out D:\Euni # jiný cíl
python euni_restore.py --kurz 5618 # jen jeden kurz
python euni_restore.py --dry-run # jen výpis
```
**Backfill** (dohrát do SeaweedFS soubory stažené dřív):
```bat
python euni_stahni.py --seaweed-backfill --from-json
```
### Idempotence
- Scrape dělá *upsert*: nový materiál → `ceka`; existující si **drží stav**
(nepřepíše stažené). Lze tedy bez obav scrapovat opakovaně.
- Stahování bere jen `stav: ceka` (a volitelně `chyba` pro retry).
## Použití
```bat
python euni_stahni.py --scrape-only # jen inventura → Mongo + JSON
python euni_stahni.py --no-videos # scrape + stáhne jen dokumenty
python euni_stahni.py # scrape + dokumenty + videa
python euni_stahni.py --from-json --no-videos # přeskočí scrape, stáhne z Mongo/JSON
python euni_stahni.py --professions all # všechny profese (2,4,5,6,7)
python euni_stahni.py --limit 3 # jen prvních 3 kurzy (test)
python euni_stahni.py --no-mongo # bez zápisu do Mongo
python euni_report.py # přehled stavu
python euni_report.py --soukroma # seznam přeskočených videí
```
## Jak to funguje (ověřeno)
- **Login** `/sign/` — formulář se parsuje (kopírují se skrytá Nette pole `_do`).
- **Seznam kurzů** — signál `studyAreaList-nextPage` vrací JSON snippet, stránkuje
se dokud přibývají kurzy (profese: 2=Lékař, 4=Farmaceut, 5/6=studenti, 7=NLZP).
- **Detail kurzu** — server-rendered HTML; videa z `<iframe>` (u Vimea se zachová
`?h=` hash), dokumenty z přímých odkazů i `/redirect/<base64>`.
- Metadata z bloků `lecture-info-label``lecture-info-mark`.
## Úskalí
- **Vimeo** dává oddělené video/audio HLS → nutný ffmpeg (řeší static-ffmpeg).
Domain-restricted videa se stahují s referer `https://www.euni.cz/`.
- **Soukromá videa** (autor je zamkl) nejdou stáhnout — skript je označí
`preskoceno` s důvodem, nepadá.
- Anotace kurzu na stránce není (jen obecný text webu) → neukládá se.
- Diakritika v názvech: v konzoli cp1250 OK; výpis má pojistku proti pádu.
+190
View File
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""
euni_db.py — připojení a operace nad MongoDB databází EUNI.
Server: mongodb://192.168.1.76:27017 (bez hesla), databáze "EUNI".
Kolekce:
kurzy — 1 dokument na kurz (metadata + počty)
materialy — 1 dokument na stahovatelný soubor (video/dokument) + stav stahování
Idempotence: materialy mají unikátní index {kurz_id, klic}. Upsert nový soubor
založí jako "ceka"; u existujícího NEPŘEPÍŠE stav stahování (jen popisná pole).
"""
import os
from datetime import datetime, timezone
import pymongo
MONGO_URI = os.environ.get("EUNI_MONGO_URI", "mongodb://192.168.1.76:27017")
DB_NAME = "EUNI"
# stavy materiálu
CEKA = "ceka"
STAZENO = "stazeno"
PRESKOCENO = "preskoceno"
CHYBA = "chyba"
def now():
return datetime.now(timezone.utc)
def get_db():
client = pymongo.MongoClient(MONGO_URI, serverSelectionTimeoutMS=4000)
client.admin.command("ping")
return client[DB_NAME]
def ensure_indexes(db=None):
if db is None:
db = get_db()
db.materialy.create_index([("kurz_id", 1), ("klic", 1)], unique=True,
name="uniq_kurz_klic")
db.materialy.create_index("stav", name="stav")
db.materialy.create_index([("druh", 1), ("stav", 1)], name="druh_stav")
db.kurzy.create_index("profese", name="profese")
return db
# ----------------------------------------------------------------- kurzy ------
def upsert_kurz(db, kurz: dict):
"""Vloží/aktualizuje kurz. Zachová first_seen, profese sjednotí."""
_id = kurz["id"]
sets = {
"slug": kurz.get("slug"),
"nazev": kurz.get("nazev") or kurz.get("title"),
"url": kurz.get("url"),
"autor": kurz.get("autor"),
"autor_medailonek_url": kurz.get("autor_medailonek_url"),
"datum_publikace": kurz.get("datum_publikace"),
"revidovano": kurz.get("revidovano"),
"akreditace": kurz.get("akreditace"),
"kredity": kurz.get("kredity"),
"pocet_videi": kurz.get("pocet_videi"),
"pocet_dokumentu": kurz.get("pocet_dokumentu"),
"updated_at": now(),
}
profese = kurz.get("profese") or []
db.kurzy.update_one(
{"_id": _id},
{
"$set": sets,
"$setOnInsert": {"first_seen": now()},
"$addToSet": {"profese": {"$each": profese}} if profese else {},
} if profese else {
"$set": sets,
"$setOnInsert": {"first_seen": now()},
},
upsert=True,
)
# -------------------------------------------------------------- materialy -----
def upsert_material(db, mat: dict):
"""Idempotentní upsert souboru. Nepřepíše stav existujícího záznamu."""
klic_filter = {"kurz_id": mat["kurz_id"], "klic": mat["klic"]}
popisne = {
"kurz_nazev": mat.get("kurz_nazev"),
"druh": mat.get("druh"),
"platforma": mat.get("platforma"),
"zdroj_url": mat.get("zdroj_url"),
"watch_url": mat.get("watch_url"),
"popis": mat.get("popis"),
"pripona": mat.get("pripona"),
"updated_at": now(),
}
db.materialy.update_one(
klic_filter,
{
"$set": popisne,
"$setOnInsert": {
"stav": CEKA,
"duvod": None,
"soubor": None,
"velikost_b": None,
"pokusy": 0,
"posledni_chyba": None,
"stazeno_at": None,
"first_seen": now(),
},
},
upsert=True,
)
def set_status(db, kurz_id, klic, stav, soubor=None, velikost_b=None,
duvod=None, chyba=None):
"""Nastaví výsledek stahování jednoho materiálu."""
sets = {"stav": stav, "updated_at": now()}
if stav == STAZENO:
sets.update({"soubor": soubor, "velikost_b": velikost_b,
"duvod": None, "posledni_chyba": None, "stazeno_at": now()})
elif stav == PRESKOCENO:
sets.update({"duvod": duvod})
elif stav == CHYBA:
sets.update({"posledni_chyba": chyba})
upd = {"$set": sets}
if stav in (STAZENO, CHYBA):
upd["$inc"] = {"pokusy": 1}
db.materialy.update_one({"kurz_id": kurz_id, "klic": klic}, upd)
def set_seaweed(db, kurz_id, klic, path, fids=None, md5=None, size=None):
"""Uloží referenci na kopii v SeaweedFS (cesta + fid chunků)."""
db.materialy.update_one(
{"kurz_id": kurz_id, "klic": klic},
{"$set": {
"seaweed_path": path,
"seaweed_fids": fids or [],
"seaweed_md5": md5,
"seaweed_size": size,
"seaweed_at": now(),
"updated_at": now(),
}},
)
def materialy_bez_seaweed(db):
"""Stažené materiály, které ještě nemají kopii v SeaweedFS (pro backfill)."""
return list(db.materialy.find({
"stav": STAZENO,
"soubor": {"$ne": None},
"$or": [{"seaweed_path": {"$exists": False}}, {"seaweed_path": None}],
}))
def materialy_v_seaweed(db):
"""Materiály s kopií v SeaweedFS (pro restore)."""
return list(db.materialy.find({"seaweed_path": {"$exists": True, "$ne": None}}))
def cekajici_materialy(db, druh=None, vcetne_chyb=False):
"""Vrátí materiály ke stažení (stav 'ceka', volitelně i 'chyba')."""
stavy = [CEKA] + ([CHYBA] if vcetne_chyb else [])
q = {"stav": {"$in": stavy}}
if druh:
q["druh"] = druh
return list(db.materialy.find(q))
# ----------------------------------------------------------------- stats ------
def stats(db=None):
if db is None:
db = get_db()
out = {"kurzy": db.kurzy.count_documents({})}
pipe = [{"$group": {"_id": {"druh": "$druh", "stav": "$stav"},
"n": {"$sum": 1}}}]
for row in db.materialy.aggregate(pipe):
d = row["_id"]["druh"]
st = row["_id"]["stav"]
out.setdefault(d, {})[st] = row["n"]
return out
if __name__ == "__main__":
import json
db = ensure_indexes()
print("Připojeno k EUNI na", MONGO_URI)
print(json.dumps(stats(db), ensure_ascii=False, indent=2))
+75
View File
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
euni_report.py — přehled stavu stahování z databáze EUNI.
python euni_report.py # souhrnný přehled
python euni_report.py --chyby # vypíše materiály ve stavu chyba
python euni_report.py --soukroma # vypíše přeskočená (soukromá) videa
"""
import argparse
import sys
for _s in (sys.stdout, sys.stderr):
try:
_s.reconfigure(errors="backslashreplace")
except Exception:
pass
import euni_db as edb
CARA = "" * 56
def lidsky(n):
for j, u in [(1e9, "GB"), (1e6, "MB"), (1e3, "kB")]:
if n >= j:
return f"{n/j:.1f} {u}"
return f"{n} B"
def main():
p = argparse.ArgumentParser()
p.add_argument("--chyby", action="store_true", help="vypiš materiály ve stavu chyba")
p.add_argument("--soukroma", action="store_true", help="vypiš přeskočená videa")
a = p.parse_args()
db = edb.get_db()
print(CARA)
print(f" EUNI — přehled ({edb.MONGO_URI})")
print(CARA)
print(f" Kurzů: {db.kurzy.count_documents({})}")
kr = db.kurzy.aggregate([{"$group": {"_id": None, "k": {"$sum": "$kredity"}}}])
kr = next(kr, {}).get("k") or 0
print(f" Kreditů celkem (akreditované kurzy): {kr}")
print(CARA)
for druh in ("video", "dokument"):
print(f" {druh.upper()}:")
pipe = [{"$match": {"druh": druh}},
{"$group": {"_id": "$stav", "n": {"$sum": 1},
"b": {"$sum": {"$ifNull": ["$velikost_b", 0]}}}}]
celkem = 0
for row in sorted(db.materialy.aggregate(pipe), key=lambda r: r["_id"]):
vel = f" ({lidsky(row['b'])})" if row["b"] else ""
print(f" {row['_id']:<11} {row['n']:>5}{vel}")
celkem += row["n"]
print(f" {'celkem':<11} {celkem:>5}")
print(CARA)
if a.chyby:
print(" CHYBY:")
for m in db.materialy.find({"stav": edb.CHYBA}):
print(f" - [{m['druh']}] {m.get('kurz_nazev','')[:40]} | "
f"{m.get('posledni_chyba','')[:60]}")
print(f" {m['zdroj_url']}")
if a.soukroma:
print(" PŘESKOČENÁ VIDEA (soukromá/nedostupná):")
for m in db.materialy.find({"stav": edb.PRESKOCENO}):
print(f" - {m.get('kurz_nazev','')[:45]} | {m.get('duvod','')}")
print(f" {m.get('watch_url') or m['zdroj_url']}")
if __name__ == "__main__":
main()
+89
View File
@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""
euni_restore.py — obnoví všechny stažené soubory ze SeaweedFS na disk.
Funguje na libovolném počítači: čte reference (cesty/fid) z MongoDB EUNI a každý
soubor stáhne z filer SeaweedFS zpět do souborového systému se stejnou strukturou
jako stazeno/<id>-<slug>/<typ>/<soubor>.
Potřebuje jen síťový přístup k Mongu (192.168.1.76) a filer (192.168.1.50) a:
python -m pip install pymongo requests
Použití:
python euni_restore.py # obnoví do ./obnoveno
python euni_restore.py --out D:\\Euni # jiný cílový adresář
python euni_restore.py --kurz 5618 # jen jeden kurz
python euni_restore.py --dry-run # jen vypíše, co by stáhl
"""
import argparse
import sys
from pathlib import Path
for _s in (sys.stdout, sys.stderr):
try:
_s.reconfigure(errors="backslashreplace")
except Exception:
pass
import euni_db as edb
import euni_seaweed as sw
def lidsky(n):
n = n or 0
for j, u in [(1e9, "GB"), (1e6, "MB"), (1e3, "kB")]:
if n >= j:
return f"{n/j:.1f} {u}"
return f"{n} B"
def main():
p = argparse.ArgumentParser(description="Obnoví soubory ze SeaweedFS na disk.")
p.add_argument("--out", default="obnoveno", help="cílový adresář (výchozí ./obnoveno)")
p.add_argument("--kurz", help="obnovit jen tento kurz_id")
p.add_argument("--dry-run", action="store_true", help="jen vypsat, nestahovat")
a = p.parse_args()
out = Path(a.out)
db = edb.get_db()
if not sw.ping():
sys.exit(f"SeaweedFS filer nedostupný ({sw.FILER}).")
mats = edb.materialy_v_seaweed(db)
if a.kurz:
mats = [m for m in mats if m.get("kurz_id") == a.kurz]
print(f"Obnovuji {len(mats)} souborů z {sw.FILER} -> {out.resolve()}")
ok = preskoc = chyb = 0
bajtu = 0
for m in mats:
remote = m["seaweed_path"]
# lokální cesta: zrcadlí seaweed cestu bez prefixu 'euni/'
parts = remote.split("/")
rel = Path(*parts[1:]) if parts and parts[0] == sw.PREFIX else Path(*parts)
dest = out / rel
want = m.get("seaweed_size")
if dest.exists() and (want is None or dest.stat().st_size == want):
preskoc += 1
continue
if a.dry_run:
print(f" [BY STÁHL] {rel} ({lidsky(want)})")
ok += 1
continue
try:
n = sw.download(remote, dest)
bajtu += n
ok += 1
print(f" [OK] {rel} ({lidsky(n)})")
except Exception as e:
chyb += 1
print(f" [CHYBA] {rel} ({str(e)[:60]})")
print(f"\nHotovo: {ok} obnoveno, {preskoc} přeskočeno (už je), {chyb} chyb. "
f"Staženo {lidsky(bajtu)}.")
if __name__ == "__main__":
main()
+85
View File
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""
euni_seaweed.py — nahrávání/stahování souborů do SeaweedFS přes filer HTTP API.
Filer běží na Unraidu (default http://192.168.1.50:8888). Soubory se ukládají
podle cesty, která zrcadlí lokální strukturu: euni/<id>-<slug>/<typ>/<soubor>.
Filer metadata jdou do Mongo "seaweedfs" (na 192.168.1.76) — viz README v
U:\\PythonProject\\Janssen\\SeaweedFS\\.
Identifikátor pro vyžádání souboru = cesta (filer). Navíc se ukládají fid(y)
jednotlivých chunků (číslo souboru v SeaweedFS).
Přepsání endpointu: env EUNI_FILER.
"""
import os
from urllib.parse import quote
import requests
FILER = os.environ.get("EUNI_FILER", "http://192.168.1.50:8888")
PREFIX = "euni" # kořenová složka v SeaweedFS
def _url(remote_path):
return f"{FILER}/" + quote(remote_path.lstrip("/"), safe="/")
def entry_meta(remote_path, timeout=30):
"""Detailní metadata souboru (vč. chunků s fid), nebo None když neexistuje."""
try:
r = requests.get(_url(remote_path) + "?metadata=true", timeout=timeout)
if r.status_code == 200:
return r.json()
except requests.RequestException:
pass
return None
def exists(remote_path):
return entry_meta(remote_path) is not None
def upload(local_path, remote_path, timeout=900):
"""Nahraje soubor na filer. Vrátí dict: path, fids, size, md5."""
fname = os.path.basename(remote_path)
with open(local_path, "rb") as f:
r = requests.post(_url(remote_path), files={"file": (fname, f)},
timeout=timeout)
r.raise_for_status()
meta = entry_meta(remote_path) or {}
fids = [c.get("file_id") for c in (meta.get("chunks") or []) if c.get("file_id")]
return {
"path": remote_path,
"fids": fids,
"size": meta.get("FileSize"),
"md5": meta.get("Md5"),
}
def download(remote_path, local_path, timeout=900):
"""Stáhne soubor z fileru na lokální cestu. Vrátí velikost v bajtech."""
r = requests.get(_url(remote_path), stream=True, timeout=timeout)
r.raise_for_status()
os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True)
tmp = str(local_path) + ".part"
with open(tmp, "wb") as f:
for chunk in r.iter_content(chunk_size=65536):
if chunk:
f.write(chunk)
os.replace(tmp, local_path)
return os.path.getsize(local_path)
def ping():
try:
r = requests.get(f"{FILER}/?limit=1", headers={"Accept": "application/json"},
timeout=5)
return r.status_code == 200
except requests.RequestException:
return False
if __name__ == "__main__":
print("Filer:", FILER, "dostupný:" , ping())
+640
View File
@@ -0,0 +1,640 @@
#!/usr/bin/env python3
"""
euni_stahni.py — přihlásí se na euni.cz, projde kurzy a stáhne, co se stáhnout dá
(dokumenty: PDF/DOCX/PPTX/XLSX/ZIP a videa: Vimeo/YouTube).
Postup:
1) login přes /sign/ (formulář se parsuje, kopírují se i skrytá Nette pole)
2) sběr kurzů přes signál studyAreaList-nextPage (stránkování, dokud přibývají)
3) z každého kurzu se vytáhnou <iframe> videa a odkazy na dokumenty
(vč. /redirect/<base64>)
4) vše se stáhne do stazeno/<id>-<slug>/ (dokumenty/ a videa/)
Soukromá / nedostupná videa se samo přeskočí (nepadá).
Závislosti:
python -m pip install -U requests beautifulsoup4 python-dotenv yt-dlp static-ffmpeg
Údaje: Euni/.env -> EUNI_USERNAME=... EUNI_PASSWORD=...
Příklady:
python euni_stahni.py # vše: scrape + dokumenty + videa (profese Lékař)
python euni_stahni.py --scrape-only # jen inventura do euni_kurzy.json
python euni_stahni.py --from-json # přeskočí scrape, použije euni_kurzy.json
python euni_stahni.py --no-videos # jen dokumenty
python euni_stahni.py --professions 2,4 # více profesí (2=Lékař,4=Farmaceut,7=NLZP)
python euni_stahni.py --limit 3 # jen první 3 kurzy (test)
"""
import argparse
import base64
import hashlib
import json
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin, unquote, urlparse
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# výpis ať nikdy nespadne na znaku mimo kódování konzole
for _stream in (sys.stdout, sys.stderr):
try:
_stream.reconfigure(errors="backslashreplace")
except Exception:
pass
SKRIPT_DIR = Path(__file__).resolve().parent
load_dotenv(SKRIPT_DIR / ".env")
# reuse stahovače videí z ../Video/stahni_video.py
sys.path.insert(0, str(SKRIPT_DIR.parent / "Video"))
try:
import stahni_video as sv
except Exception:
sv = None
try:
import euni_db as edb
except Exception:
edb = None
try:
import euni_seaweed as sw
except Exception:
sw = None
BASE = "https://www.euni.cz"
LOGIN_URL = f"{BASE}/sign/?bid=1"
LIST_URL = f"{BASE}/seznam-kurzu?bid=1"
NEXTPAGE = f"{BASE}/seznam-kurzu?studyAreaList-professionId={{prof}}&bid=1&do=studyAreaList-nextPage"
DOC_RE = re.compile(r"\.(pdf|docx?|pptx?|xlsx?|zip)(\?|$)", re.I)
FILE_PATH_RE = re.compile(r"fileUploader/download|files/resources", re.I)
VIDEO_RE = re.compile(r"vimeo|youtube|youtu\.be", re.I)
UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120 Safari/537.36")
# ---------------------------------------------------------------- pomocné -----
def bezpecny_nazev(s: str, max_len: int = 120) -> str:
"""Očistí řetězec na bezpečný název souboru/složky pro Windows."""
s = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", s).strip(" .")
s = re.sub(r"\s+", " ", s)
return (s[:max_len].strip() or "bez_nazvu")
def make_session():
s = requests.Session()
s.headers.update({"User-Agent": UA})
return s
def _relpath(p):
"""Cesta k souboru relativně k adresáři Euni (pro uložení do DB)."""
if not p:
return None
try:
return str(Path(p).resolve().relative_to(SKRIPT_DIR))
except Exception:
return str(p)
def _seaweed_path(dest, out_root):
"""Cesta v SeaweedFS zrcadlící lokální strukturu: euni/<id-slug>/<typ>/<soubor>."""
try:
rel = Path(dest).resolve().relative_to(Path(out_root).resolve())
except Exception:
rel = Path(dest).name
return sw.PREFIX + "/" + "/".join(Path(rel).parts)
def _zaloh_do_seaweed(db, dest, out_root, kurz_id, klic):
"""Nahraje soubor do SeaweedFS a uloží referenci (fid) k materiálu do Mongo."""
if sw is None or not dest or not Path(dest).exists():
return None
remote = _seaweed_path(dest, out_root)
try:
meta = sw.entry_meta(remote)
if meta and meta.get("FileSize") == Path(dest).stat().st_size:
# už tam je se stejnou velikostí — jen zaznamenat referenci
info = {"path": remote,
"fids": [c.get("file_id") for c in (meta.get("chunks") or [])
if c.get("file_id")],
"size": meta.get("FileSize"), "md5": meta.get("Md5")}
else:
info = sw.upload(str(dest), remote)
if db is not None:
edb.set_seaweed(db, kurz_id, klic, info["path"],
fids=info.get("fids"), md5=info.get("md5"),
size=info.get("size"))
return info
except Exception as e:
print(f" [SEAWEED-CHYBA] {remote} ({str(e)[:60]})")
return None
# ----------------------------------------------------------------- login ------
def login(s):
r = s.get(LOGIN_URL, timeout=30)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
form = next((f for f in soup.find_all("form")
if f.find("input", {"type": "password"})), None)
if not form:
raise RuntimeError("Přihlašovací formulář nenalezen.")
data, user_field, pass_field = {}, None, None
for inp in form.find_all("input"):
name = inp.get("name")
if not name:
continue
itype = (inp.get("type") or "text").lower()
data[name] = inp.get("value", "") # zachová skrytá pole (_do, _token...)
if itype == "password":
pass_field = name
elif itype in ("text", "email") and user_field is None:
user_field = name
user = os.environ.get("EUNI_USERNAME")
pwd = os.environ.get("EUNI_PASSWORD")
if not user or not pwd:
sys.exit("Chybí EUNI_USERNAME / EUNI_PASSWORD. Vyplň je v Euni/.env "
"(vzor je v .env.example).")
data[user_field] = user
data[pass_field] = pwd
action = urljoin(LOGIN_URL, form.get("action") or LOGIN_URL)
r = s.post(action, data=data, headers={"Referer": LOGIN_URL}, timeout=30)
r.raise_for_status()
if "Odhlásit" not in r.text and "odhlasit" not in r.text.lower():
raise RuntimeError("Přihlášení se nezdařilo zkontroluj údaje v .env.")
print("✓ Přihlášeno")
# ------------------------------------------------------------- seznam kurzů ----
def get_courses_for_profession(s, profession_id):
# inicializace stránkování pro danou profesi
s.get(f"{BASE}/seznam-kurzu?studyAreaList-professionId={profession_id}&bid=1",
timeout=30)
seen, prev, guard = {}, -1, 0
while guard < 200:
guard += 1
r = s.get(NEXTPAGE.format(prof=profession_id),
headers={"X-Requested-With": "XMLHttpRequest"}, timeout=30)
r.raise_for_status()
try:
snippet = r.json().get("snippets", {}).get(
"snippet-studyAreaList-areaList", "")
except ValueError:
break
if not snippet:
break
soup = BeautifulSoup(snippet, "html.parser")
for a in soup.select("a.workshop"):
href = (a.get("href") or "").split("?")[0]
m = re.match(r"/lecture/(\d+)-(.+)", href)
if m:
seen[m.group(1)] = {
"id": m.group(1),
"slug": m.group(2),
"title": (a.find("h3").get_text(strip=True)
if a.find("h3") else m.group(2)),
"url": urljoin(BASE, href),
"profession": profession_id,
}
if len(seen) == prev:
break
prev = len(seen)
time.sleep(0.25)
return list(seen.values())
def get_all_courses(s, professions):
vse = {}
for prof in professions:
kurzy = get_courses_for_profession(s, prof)
print(f" profese {prof}: {len(kurzy)} kurzů")
for k in kurzy:
vse.setdefault(k["id"], k)
return list(vse.values())
# --------------------------------------------------------- extrakce odkazů ----
def decode_redirect(href):
m = re.search(r"/redirect/([A-Za-z0-9+/=]+)", href)
if m:
try:
return base64.b64decode(m.group(1)).decode("utf-8", "ignore")
except Exception:
pass
return None
def watch_url(embed):
m = re.search(r"player\.vimeo\.com/video/(\d+)", embed)
if m:
return f"https://vimeo.com/{m.group(1)}"
m = re.search(r"youtube\.com/embed/([\w-]+)", embed)
if m:
return f"https://www.youtube.com/watch?v={m.group(1)}"
return embed
def _text(el):
return " ".join(el.get_text(" ", strip=True).split()) if el else None
def _parse_date(s):
m = re.search(r"(\d{1,2})\.\s*(\d{1,2})\.\s*(\d{4})", s or "")
if m:
try:
return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1)))
except ValueError:
return None
return None
def _mark_for_label(soup, label_text):
"""Najde hodnotu (lecture-info-mark/bold) ve stejném containeru jako daný label."""
for lab in soup.select(".lecture-info-label"):
if label_text.lower() in lab.get_text(strip=True).lower():
par = lab.parent
mark = (par.select_one(".lecture-info-mark")
or par.select_one(".lecture-info-bold"))
if mark:
return _text(mark)
return None
def extract_course_meta(soup):
meta = {}
autor_el = soup.select_one(".lecture-info-column-author")
if autor_el:
meta["autor"] = _text(autor_el.select_one(".lecture-info-mark"))
href = autor_el.get("href") or ""
if "vimeo" in href or "youtube" in href:
meta["autor_medailonek_url"] = href
if not meta.get("autor"):
meta["autor"] = (_mark_for_label(soup, "Autor kurzu")
or _mark_for_label(soup, "Autorka kurzu"))
meta["datum_publikace"] = _parse_date(_mark_for_label(soup, "Datum publikace"))
meta["revidovano"] = _parse_date(_mark_for_label(soup, "Revidováno"))
meta["akreditace"] = _mark_for_label(soup, "Akreditace")
m = re.search(r"(\d+)\s*kredit", soup.get_text(" "), re.I)
meta["kredity"] = int(m.group(1)) if m else None
return meta
def material_klic(druh, item):
"""Vrátí (klic, platforma) pro deduplikaci materiálu."""
if druh == "video":
e = item["embed"]
m = re.search(r"vimeo\.com/(?:video/)?(\d+)", e)
if m:
return f"vimeo:{m.group(1)}", "vimeo"
m = (re.search(r"youtube\.com/embed/([\w-]+)", e)
or re.search(r"youtu\.be/([\w-]+)", e)
or re.search(r"[?&]v=([\w-]+)", e))
if m:
return f"youtube:{m.group(1)}", "youtube"
return "video:" + hashlib.sha1(e.encode()).hexdigest()[:16], None
return "doc:" + hashlib.sha1(item["url"].encode()).hexdigest()[:16], None
def _pripona(url):
m = re.search(r"\.([a-z0-9]{2,4})(\?|$)", url, re.I)
return m.group(1).lower() if m else None
def extract_course_links(s, course_url):
r = s.get(course_url, timeout=30)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
videos, vseen = [], set()
for f in soup.find_all("iframe"):
src = f.get("src") or f.get("data-src") or ""
if src.startswith("//"):
src = "https:" + src
if VIDEO_RE.search(src) and src not in vseen:
vseen.add(src)
videos.append({"embed": src, "watch": watch_url(src)})
docs, seen = [], set()
for a in soup.find_all("a", href=True):
target = decode_redirect(a["href"]) or urljoin(BASE, a["href"])
if DOC_RE.search(target) or FILE_PATH_RE.search(target):
url = unquote(target)
if url in seen:
continue
seen.add(url)
docs.append({
"label": " ".join(a.get_text(" ", strip=True).split())[:70],
"url": url,
})
return {"videos": videos, "documents": docs, "meta": extract_course_meta(soup)}
# ------------------------------------------------------------- stahování ------
def stahni_dokument(s, url, out_dir: Path, label=""):
out_dir.mkdir(parents=True, exist_ok=True)
r = s.get(url, stream=True, timeout=120)
r.raise_for_status()
# jméno souboru z Content-Disposition, jinak z URL
fname = None
cd = r.headers.get("Content-Disposition", "")
m = re.search(r"filename\*?=(?:UTF-8'')?\"?([^\";]+)", cd)
if m:
fname = unquote(m.group(1))
if not fname:
fname = os.path.basename(urlparse(url).path) or "soubor"
fname = bezpecny_nazev(fname)
if "." not in fname and label:
fname = bezpecny_nazev(label)
dest = out_dir / fname
if dest.exists() and dest.stat().st_size > 0:
return ("existuje", dest.name)
tmp = dest.with_suffix(dest.suffix + ".part")
with open(tmp, "wb") as fp:
for chunk in r.iter_content(chunk_size=65536):
if chunk:
fp.write(chunk)
tmp.replace(dest)
return ("staženo", dest.name)
def stahni_video(embed, out_dir: Path, referer):
"""Stáhne video přes yt-dlp; soukromé/nedostupné přeskočí. Vrací (stav, info)."""
if sv is None:
return ("chyba", "modul stahni_video není dostupný")
try:
import yt_dlp
from yt_dlp.utils import DownloadError
except ImportError:
return ("chyba", "yt-dlp není nainstalován")
out_dir.mkdir(parents=True, exist_ok=True)
ff_dir = sv.priprav_ffmpeg()
opts = {
"outtmpl": str(out_dir / "%(title)s [%(id)s].%(ext)s"),
"format": "bestvideo*+bestaudio/best",
"merge_output_format": "mp4",
"logger": sv._TichyLogger(),
"progress_hooks": [sv._progress_hook],
"noprogress": True,
"noplaylist": True,
"http_headers": {"Referer": referer, "User-Agent": UA},
}
if ff_dir:
opts["ffmpeg_location"] = ff_dir
try:
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(embed, download=True)
fp = None
rd = (info or {}).get("requested_downloads")
if rd:
fp = rd[0].get("filepath")
return ("staženo", info.get("title", embed) if info else embed, fp)
except DownloadError as e:
duvod = sv.klasifikuj_chybu(str(e))
if duvod:
return ("přeskočeno", duvod, None)
return ("chyba", str(e).split("\n")[0], None)
except Exception as e:
return ("chyba", str(e), None)
def _ingest_course(db, c):
"""Zapíše kurz + jeho materiály do Mongo (idempotentně)."""
meta = c.get("meta") or {}
nazev = c.get("nazev") or c.get("title")
kurz = {
"id": c["id"], "slug": c.get("slug"), "nazev": nazev, "url": c.get("url"),
"profese": [c["profession"]] if c.get("profession") else c.get("profese", []),
"pocet_videi": len(c.get("videos", [])),
"pocet_dokumentu": len(c.get("documents", [])),
}
for k in ("autor", "autor_medailonek_url", "datum_publikace", "revidovano",
"akreditace", "kredity"):
kurz[k] = meta.get(k)
edb.upsert_kurz(db, kurz)
for v in c.get("videos", []):
klic, plat = material_klic("video", v)
edb.upsert_material(db, {
"kurz_id": c["id"], "kurz_nazev": nazev, "druh": "video",
"platforma": plat, "klic": klic, "zdroj_url": v["embed"],
"watch_url": v.get("watch"), "popis": None, "pripona": "mp4",
})
for d in c.get("documents", []):
klic, _ = material_klic("dokument", d)
edb.upsert_material(db, {
"kurz_id": c["id"], "kurz_nazev": nazev, "druh": "dokument",
"platforma": None, "klic": klic, "zdroj_url": d["url"],
"watch_url": None, "popis": d.get("label"), "pripona": _pripona(d["url"]),
})
# ---------------------------------------------------------------- hlavní ------
def main():
p = argparse.ArgumentParser(description="Stáhne obsah kurzů z euni.cz.")
p.add_argument("--professions", default="2",
help="ID profesí oddělené čárkou (2=Lékař,4=Farmaceut,7=NLZP), nebo 'all'")
p.add_argument("--scrape-only", action="store_true", help="jen inventura do JSON")
p.add_argument("--from-json", action="store_true",
help="přeskočí scrape, použije existující euni_kurzy.json")
p.add_argument("--no-videos", action="store_true", help="nestahovat videa")
p.add_argument("--no-docs", action="store_true", help="nestahovat dokumenty")
p.add_argument("--limit", type=int, default=0, help="jen prvních N kurzů (test)")
p.add_argument("--out", default=str(SKRIPT_DIR / "stazeno"), help="výstupní adresář")
p.add_argument("--json", default=str(SKRIPT_DIR / "euni_kurzy.json"),
help="cesta k inventurnímu JSON")
p.add_argument("--no-mongo", action="store_true",
help="nezapisovat do MongoDB (jen JSON / stahování)")
p.add_argument("--no-seaweed", action="store_true",
help="nenahrávat kopie do SeaweedFS")
p.add_argument("--seaweed-backfill", action="store_true",
help="jen dohraje do SeaweedFS stažené soubory, které tam chybí")
a = p.parse_args()
json_path = Path(a.json)
out_root = Path(a.out)
s = make_session()
db = None
if not a.no_mongo:
if edb is None:
print("UPOZORNĚNÍ: modul euni_db nedostupný — pokračuji bez Mongo.")
else:
try:
db = edb.ensure_indexes()
print(f"✓ Mongo EUNI připojeno ({edb.MONGO_URI})")
except Exception as e:
print(f"UPOZORNĚNÍ: Mongo nedostupné ({e}) — pokračuji bez něj.")
use_seaweed = not a.no_seaweed and sw is not None
if use_seaweed:
if sw.ping():
print(f"✓ SeaweedFS filer dostupný ({sw.FILER})")
else:
print(f"UPOZORNĚNÍ: SeaweedFS filer nedostupný ({sw.FILER}) — "
f"pokračuji bez záloh.")
use_seaweed = False
# režim: jen dohrát do SeaweedFS chybějící stažené soubory
if a.seaweed_backfill:
if db is None or not use_seaweed:
sys.exit("Backfill potřebuje Mongo i SeaweedFS.")
chybi = edb.materialy_bez_seaweed(db)
print(f"Backfill do SeaweedFS: {len(chybi)} souborů")
ok = 0
for m in chybi:
dest = SKRIPT_DIR / m["soubor"]
if not dest.exists():
continue
remote = _seaweed_path(dest, out_root)
info = _zaloh_do_seaweed(db, dest, out_root, m["kurz_id"], m["klic"])
if info:
ok += 1
print(f" [SEAWEED] {remote}")
print(f"Hotovo: {ok}/{len(chybi)} nahráno.")
return
if a.from_json:
if not json_path.exists():
sys.exit(f"JSON {json_path} neexistuje — spusť nejdřív bez --from-json.")
results = json.loads(json_path.read_text(encoding="utf-8"))
print(f"✓ Načteno z JSON: {len(results)} kurzů")
login(s) # přihlášení potřeba pro stahování dokumentů
else:
login(s)
if a.professions.lower() == "all":
profs = [2, 4, 5, 6, 7]
else:
profs = [int(x) for x in a.professions.split(",") if x.strip()]
print(f"Sbírám kurzy (profese {profs})…")
courses = get_all_courses(s, profs)
print(f"✓ Nalezeno kurzů: {len(courses)}")
if a.limit:
courses = courses[: a.limit]
print(f" (--limit: zpracuji jen prvních {len(courses)})")
results = []
for i, c in enumerate(courses, 1):
try:
links = extract_course_links(s, c["url"])
except Exception as e:
links = {"videos": [], "documents": [], "error": str(e)}
course = {**c, **links}
results.append(course)
if db is not None and "error" not in links:
try:
_ingest_course(db, course)
except Exception as e:
print(f" [MONGO-CHYBA] {c['id']}: {e}")
print(f"[{i}/{len(courses)}] {c['title']}"
f"{len(links.get('videos', []))} videí, "
f"{len(links.get('documents', []))} dokumentů")
time.sleep(0.35)
json_path.write_text(
json.dumps(results, ensure_ascii=False, indent=2, default=str),
encoding="utf-8")
print(f"✓ Inventura uložena: {json_path}")
# souhrn inventury
n_vid = sum(len(c.get("videos", [])) for c in results)
n_doc = sum(len(c.get("documents", [])) for c in results)
print(f"\nCelkem: {len(results)} kurzů, {n_vid} videí, {n_doc} dokumentů")
if a.scrape_only:
return
# stahování
if a.limit:
results = results[: a.limit]
stat = {"doc_ok": 0, "doc_skip": 0, "doc_err": 0,
"vid_ok": 0, "vid_skip": 0, "vid_err": 0, "sw_ok": 0}
for i, c in enumerate(results, 1):
folder = out_root / bezpecny_nazev(f"{c['id']}-{c.get('slug', '')}", 80)
print(f"\n[{i}/{len(results)}] {c.get('title', c['id'])}")
if not a.no_docs:
for d in c.get("documents", []):
klic = material_klic("dokument", d)[0]
try:
stav, name = stahni_dokument(s, d["url"], folder / "dokumenty",
d.get("label", ""))
dest = folder / "dokumenty" / name
if stav == "staženo":
stat["doc_ok"] += 1
print(f" [DOK] {name}")
else:
stat["doc_skip"] += 1
if db is not None:
sz = dest.stat().st_size if dest.exists() else None
edb.set_status(db, c["id"], klic, edb.STAZENO,
soubor=_relpath(dest), velikost_b=sz)
if use_seaweed and dest.exists():
if _zaloh_do_seaweed(db, dest, out_root, c["id"], klic):
stat["sw_ok"] += 1
except Exception as e:
stat["doc_err"] += 1
print(f" [DOK-CHYBA] {d['url']} ({e})")
if db is not None:
edb.set_status(db, c["id"], klic, edb.CHYBA, chyba=str(e))
if not a.no_videos:
for v in c.get("videos", []):
klic = material_klic("video", v)[0]
stav, info, fp = stahni_video(v["embed"], folder / "videa", c["url"])
if stav == "staženo":
stat["vid_ok"] += 1
print(f" [VIDEO] {info}")
if db is not None:
sz = (Path(fp).stat().st_size
if fp and Path(fp).exists() else None)
edb.set_status(db, c["id"], klic, edb.STAZENO,
soubor=_relpath(fp) if fp else None,
velikost_b=sz)
if use_seaweed and fp and Path(fp).exists():
if _zaloh_do_seaweed(db, fp, out_root, c["id"], klic):
stat["sw_ok"] += 1
elif stav == "přeskočeno":
stat["vid_skip"] += 1
print(f" [VIDEO-PŘESKOČENO] {info}")
if db is not None:
edb.set_status(db, c["id"], klic, edb.PRESKOCENO, duvod=info)
else:
stat["vid_err"] += 1
print(f" [VIDEO-CHYBA] {info}")
if db is not None:
edb.set_status(db, c["id"], klic, edb.CHYBA, chyba=info)
print("\n=== SOUHRN STAHOVÁNÍ ===")
print(f" dokumenty: {stat['doc_ok']} staženo, {stat['doc_skip']} přeskočeno, "
f"{stat['doc_err']} chyb")
print(f" videa: {stat['vid_ok']} staženo, {stat['vid_skip']} přeskočeno "
f"(soukromá/nedostupná), {stat['vid_err']} chyb")
if not a.no_seaweed:
print(f" SeaweedFS: {stat['sw_ok']} souborů zazálohováno")
print(f" výstup: {out_root}")
if __name__ == "__main__":
main()
+47
View File
@@ -0,0 +1,47 @@
# Video — stahování videí
## stahni_video.py
Stahuje videa z Vimea, YouTube a dalších webů přes **yt-dlp**. Nejlepší dostupná
kvalita, sloučení video+audio do `.mp4`. Soukromá / nedostupná videa sám pozná
a přeskočí (nespadne).
### Závislosti (jednorázově)
```bat
python -m pip install -U yt-dlp static-ffmpeg
```
- **yt-dlp** — vlastní downloader.
- **static-ffmpeg** — dodá `ffmpeg.exe` + `ffprobe.exe` (v systému ffmpeg není).
Skript si přes `static_ffmpeg.add_paths()` cestu nastaví sám; binárky se
stáhnou při prvním běhu do `site-packages\static_ffmpeg\bin\`.
### Použití
```bat
python stahni_video.py URL [URL2 ...]
python stahni_video.py # vezme URL z urls.txt (1 na řádek)
python stahni_video.py --cookies-from-browser firefox URL # video za přihlášením
python stahni_video.py -o D:\nekam URL # jiný výstupní adresář
```
Výchozí výstupní adresář je tento (`Video/`). Soubory: `%(title)s [%(id)s].mp4`.
### Jak pozná soukromé/nedostupné video
yt-dlp vyhodí `DownloadError` s textem chyby. Funkce `klasifikuj_chybu()` hledá
v textu známé fráze (`private video`, `video unavailable`, `removed`,
`members-only`, …) a vrátí český popis → video se přeskočí. Jiné chyby (síť,
chybí ffmpeg) se vypíšou jako `[CHYBA]`, ale běh pokračuje na další URL.
Na konci se vypíše souhrn (staženo / přeskočeno / chyby).
### Poznámky / úskalí
- **Soukromé YouTube video opravdu nejde stáhnout**, pokud k němu přihlášený
účet nemá udělený přístup — to je záměr, skript ho jen přeskočí.
- **Diakritika v názvech**: cesty se zkomolí, když se předávají Windows binárce
přes Bug Bash pipe; v běžné konzoli (cp1250) je vše v pořádku.
- **Vimeo** dává oddělené video/audio HLS streamy → ffmpeg je nutný pro sloučení.
- Při prvním běhu může yt-dlp varovat na chybějící JavaScript runtime (deno);
pro běžná veřejná videa to nevadí.
+201
View File
@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
stahni_video.py — stahování videí (Vimeo, YouTube, …) přes yt-dlp.
Co umí:
* Automaticky nastaví cestu k ffmpeg (přes balík static-ffmpeg) — netřeba ho
mít v systému.
* Stáhne nejlepší dostupnou kvalitu a sloučí video+audio do .mp4.
* Pokud je video SOUKROMÉ / nedostupné / odstraněné, sám to pozná, vypíše
srozumitelnou hlášku a přeskočí ho (nespadne, jede dál na další URL).
Použití:
python stahni_video.py URL [URL2 ...]
python stahni_video.py # vezme URL z urls.txt (1 na řádek)
python stahni_video.py --cookies-from-browser firefox URL # video za přihlášením
Instalace závislostí (jednorázově):
python -m pip install -U yt-dlp static-ffmpeg
"""
import argparse
import os
import shutil
import sys
from pathlib import Path
# Pojistka: ať výpis nikdy nespadne na znaku, který kódování konzole nezná
# (zachová kódování konzole, jen neznámý znak escapne místo pádu programu).
for _stream in (sys.stdout, sys.stderr):
try:
_stream.reconfigure(errors="backslashreplace")
except Exception:
pass
SKRIPT_DIR = Path(__file__).resolve().parent
# --- důvody, proč video NEJDE stáhnout (→ přeskočit, ne padat) ----------------
# klíč hledáme (case-insensitive) v textu chyby od yt-dlp
DUVODY_PRESKOCIT = [
("private video", "video je soukromé"),
("video is private", "video je soukromé"),
("this is a private video", "video je soukromé"),
("video unavailable", "video není dostupné"),
("this video is unavailable", "video není dostupné"),
("video has been removed", "video bylo odstraněno"),
("removed by the uploader", "video odstranil autor"),
("no longer available", "video už není dostupné"),
("members-only", "jen pro členy kanálu"),
("available to members", "jen pro členy kanálu"),
("account associated with this video has been terminated",
"účet autora byl zrušen"),
("has been terminated", "účet autora byl zrušen"),
("blocked it on copyright", "blokováno kvůli autorským právům"),
("not available in your country", "nedostupné ve tvé zemi"),
("not available on this app", "nedostupné pro tohoto klienta"),
("sign in to confirm your age", "věkově omezené (nutné přihlášení)"),
("requires payment", "placené video"),
("this live event will begin", "živý přenos zatím nezačal"),
("premieres in", "video bude teprve uvedeno (premiéra)"),
]
def klasifikuj_chybu(msg: str):
"""Vrátí český popis důvodu k přeskočení, nebo None pokud jde o jinou chybu."""
m = msg.lower()
for klic, popis in DUVODY_PRESKOCIT:
if klic in m:
return popis
return None
class _TichyLogger:
"""Potlačí ukecaný výpis yt-dlp; chyby si hlídáme sami přes výjimky."""
def debug(self, msg):
pass
def info(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
def _progress_hook(d):
if d.get("status") == "downloading":
pct = (d.get("_percent_str") or "").strip()
spd = (d.get("_speed_str") or "").strip()
print(f"\r stahuji {pct} {spd} ", end="", flush=True)
elif d.get("status") == "finished":
print(f"\r staženo, zpracovávám… ")
def priprav_ffmpeg():
"""Zajistí ffmpeg/ffprobe a vrátí adresář s binárkami (nebo None)."""
try:
import static_ffmpeg
static_ffmpeg.add_paths() # přidá ffmpeg/ffprobe do PATH (1. běh = stáhne)
except ImportError:
pass
ff = shutil.which("ffmpeg")
if ff:
return os.path.dirname(ff)
print("UPOZORNĚNÍ: ffmpeg nenalezen — sloučení video+audio nemusí fungovat.")
print(" Nainstaluj: python -m pip install -U static-ffmpeg")
return None
def nacti_urls(args_urls):
if args_urls:
return args_urls
soubor = SKRIPT_DIR / "urls.txt"
if soubor.exists():
radky = [r.strip() for r in soubor.read_text(encoding="utf-8").splitlines()]
return [r for r in radky if r and not r.startswith("#")]
return []
def stahni(urls, out_dir: Path, cookies_browser=None):
try:
import yt_dlp
from yt_dlp.utils import DownloadError
except ImportError:
sys.exit("Chybí yt-dlp. Nainstaluj: python -m pip install -U yt-dlp")
ff_dir = priprav_ffmpeg()
out_dir.mkdir(parents=True, exist_ok=True)
ydl_opts = {
"outtmpl": str(out_dir / "%(title)s [%(id)s].%(ext)s"),
"format": "bestvideo*+bestaudio/best",
"merge_output_format": "mp4",
"logger": _TichyLogger(),
"progress_hooks": [_progress_hook],
"noprogress": True, # vlastní progress řešíme hookem
"noplaylist": True,
}
if ff_dir:
ydl_opts["ffmpeg_location"] = ff_dir
if cookies_browser:
ydl_opts["cookiesfrombrowser"] = (cookies_browser,)
stazeno, preskoceno, chyby = 0, [], []
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
for i, url in enumerate(urls, 1):
print(f"\n[{i}/{len(urls)}] {url}")
try:
info = ydl.extract_info(url, download=True)
nazev = info.get("title", url) if info else url
print(f" [HOTOVO] {nazev}")
stazeno += 1
except DownloadError as e:
duvod = klasifikuj_chybu(str(e))
if duvod:
print(f" [PRESKOCENO] {duvod}")
preskoceno.append((url, duvod))
else:
strucne = str(e).split("\n")[0]
print(f" [CHYBA] {strucne}")
chyby.append((url, strucne))
except Exception as e: # nečekané — taky nezhasnout celý běh
print(f" [CHYBA] {e}")
chyby.append((url, str(e)))
print("\n=== SOUHRN ===")
print(f" staženo: {stazeno}")
print(f" přeskočeno: {len(preskoceno)}")
for url, duvod in preskoceno:
print(f" - {url} ({duvod})")
if chyby:
print(f" chyby: {len(chyby)}")
for url, msg in chyby:
print(f" - {url} ({msg})")
return stazeno, preskoceno, chyby
def main():
p = argparse.ArgumentParser(
description="Stáhne videa přes yt-dlp; soukromá/nedostupná sám přeskočí.")
p.add_argument("urls", nargs="*", help="URL videí (nebo nech prázdné a použij urls.txt)")
p.add_argument("-o", "--out-dir", default=str(SKRIPT_DIR),
help="výstupní adresář (výchozí: tento adresář)")
p.add_argument("--cookies-from-browser", dest="cookies",
help="prohlížeč pro cookies u videí za přihlášením (firefox/chrome/edge…)")
a = p.parse_args()
urls = nacti_urls(a.urls)
if not urls:
sys.exit("Nezadal jsi žádné URL. Předej je jako argumenty nebo do urls.txt.")
stahni(urls, Path(a.out_dir), cookies_browser=a.cookies)
if __name__ == "__main__":
main()
Binary file not shown.
+4
View File
@@ -0,0 +1,4 @@
.env
*.log
__pycache__/
_*.html
+90
View File
@@ -0,0 +1,90 @@
# Webináře — hlídač nových webinářů (praktickylekar.online)
## Účel
Jednou denně (8:00, Plánovač úloh) zkontroluje [praktickylekar.online](https://www.praktickylekar.online/),
zda přibyl nový webinář. Když ano → přes **Telegram** se zeptá, jestli má přihlásit
osoby z `config.json` (Michaela + Vladimír Buzalkovi), po potvrzení je přihlásí a
výsledek pošle zpět na Telegram. Po přihlášení chodí potvrzovací e-mail automaticky z webu.
## Soubory
| Soubor | Popis |
|--------|-------|
| `watcher.py` | hlavní skript |
| `config.json` | URL + údaje přihlašovaných osob |
| `state.json` | vytvoří se sám; pamatuje poslední zpracované `idwebinar` |
| `watcher.log` | log běhů |
## Přepínače v `watcher.py` (nahoře)
- `POSILATINFOPOKAZDEKONTROLE``True` = pošle Telegram zprávu po **každé** ranní
kontrole (i když nic nového; vhodné při zaběhávání). `True` je teď nastaveno.
Až bude vše ověřené → přepnout na `False` (ozve se jen při novém webináři).
- `DRY_RUN``True` = nic se reálně neodešle (registrace se jen simuluje), Telegram
dotaz proběhne. `False` = ostrý režim (reálné přihlášení po potvrzení „ano").
- `ASK_TIMEOUT` — kolik sekund ráno čekat na odpověď ano/ne (default 1800 = 30 min).
## CLI
```
python watcher.py # ostrý denní běh
python watcher.py --test # ignoruje state + VŽDY dry-run (otestuje plumbing)
python watcher.py --reset # smaže state.json
```
## Ověřená struktura webu (k 2026-06-17)
1. **Banner** na hlavní stránce: `<a href="/webinar.php?idwebinar=560">` → z něj se čte ID.
2. **Brána** `POST /check2.php` s `zdravotnicky-pracovnik=on` & `laicka-verejnost=on`
→ nastaví cookie `souhlas=1`. **Bez ní se registrační formulář vůbec nezobrazí.**
3. **Registrace** `POST /registrovat4.php`, pole:
- `email` (povinné)
- `clen` = `1` (člen SVL Ano) / `2` (Ne) → Buzalkovi `1`
- `prukaz` = číslo průkazu SVL (povinné když clen=1)
- `clk` = evidenční číslo ČLK, **přesně 10 znaků** (`pattern=.{10,10}`)
- `titul1, jmeno, prijmeni, pracoviste, mesto` — jen pro nečleny (clen=2)
- `souhlas` = `on` (souhlas se zpracováním OÚ, povinné)
- **skrytá** `webid` (= idwebinar) a `cislo` (= `PL` + DDMMRRRR, dle data webináře)
**čtou se živě z formuláře, nehádají se.**
> Pokud provozovatel změní názvy polí / strukturu, skript loguje, co našel
> (`watcher.log`) — podle toho se selektory upraví.
## Nasazení na tower (PRODUKCE) — Unraid, python-runner
Běží na **toweru** (Unraid, 192.168.1.76) v kontejneru **`python-runner`**,
plánováno přes **User Scripts plugin** na **8:00 denně**.
- Soubory: `/mnt/user/Scripts/Webinare/` → v kontejneru `/scripts/Webinare/`
- Telegram: na serveru **není** `Knihovny/` ani `Medevio/.env`, proto je přibalená
kopie `telegram_notify.py` + lokální `/scripts/Webinare/.env`
(jen `TELEGRAM_BOT_TOKEN` + `TELEGRAM_CHAT_ID`, práva 600).
- Wrapper: `/boot/config/plugins/user.scripts/scripts/WebinarWatcher/script`
(`flock` + `docker exec`, log `/mnt/user/Scripts/logs/webinar_watcher.log`).
- Rozvrh: záznam v `schedule.json` (`custom: 0 8 * * *`) + řádek v
`customSchedule.cron``update_cron``/etc/cron.d/root`.
- `state.json` na serveru seedován na `560` (na ten jste registrovaní).
### Nasazení / správa z Windows — `deploy_tower.py`
Heslo NIKDY v souboru, bere se z env `TOWER_PW`:
```bash
TOWER_PW=... python deploy_tower.py recon # zmapuje server (jen čte)
TOWER_PW=... python deploy_tower.py deploy # nahraje soubory (+ seed state.json)
TOWER_PW=... python deploy_tower.py env # naplní serverový .env z Medevio/.env
TOWER_PW=... python deploy_tower.py smoke # test: telegram .env + detekce (neodesílá)
TOWER_PW=... python deploy_tower.py schedule # založí/aktualizuje rozvrh 8:00
TOWER_PW=... python deploy_tower.py prodrun # ruční spuštění ostrého běhu
```
Po změně `watcher.py`/`config.json` lokálně → `deploy` znovu (idempotentní,
`state.json` ani `.env` nepřepisuje).
### Heartbeat → tichý režim
Server běží s `POSILATINFOPOKAZDEKONTROLE=True` (ranní „zkontrolováno"). Až bude
ověřeno, v lokálním `watcher.py` přepnout na `False` a `deploy` znovu.
## Alternativa — Plánovač úloh (Windows), pokud poběží lokálně
```powershell
schtasks /Create /TN "WebinarWatcher" /SC DAILY /ST 08:00 ^
/TR "python \"U:\ordinaceprojekt\Webináře\watcher.py\"" /F
```
## Notifikace
Přes sdílenou knihovnu `Knihovny/telegram_notify.py`
(`posli_telegram`, `zeptej_se_telegram`), bot **@Vlado_Claude_Bot**,
token/chat_id z `Medevio/.env`.
+28
View File
@@ -0,0 +1,28 @@
{
"watch_url": "https://www.praktickylekar.online/",
"base_url": "https://www.praktickylekar.online",
"registrants": [
{
"jmeno": "Michaela",
"prijmeni": "Buzalková",
"titul1": "",
"email": "michaela.buzalkova@buzalka.cz",
"clen": "1",
"prukaz": "761790",
"clk": "5141811171",
"pracoviste": "",
"mesto": ""
},
{
"jmeno": "Vladimír",
"prijmeni": "Buzalka",
"titul1": "",
"email": "vladimir.buzalka@buzalka.cz",
"clen": "1",
"prukaz": "761791",
"clk": "1143687173",
"pracoviste": "",
"mesto": ""
}
]
}
+280
View File
@@ -0,0 +1,280 @@
#!/usr/bin/env python3
"""
deploy_tower.py — nasazení webinar-watcheru na tower (Unraid, python-runner).
Heslo se NIKDY neukládá do souboru — bere se z proměnné prostředí TOWER_PW:
TOWER_PW=... python deploy_tower.py recon
TOWER_PW=... python deploy_tower.py deploy
TOWER_PW=... python deploy_tower.py schedule
TOWER_PW=... python deploy_tower.py smoke # rychlý test (neblokuje na Telegramu)
Vzor převzat z EmailAgent / MedicusFirebird:
- skripty v /mnt/user/Scripts/<Název>/ → v kontejneru /scripts/<Název>/
- spouští se: docker exec python-runner python3 /scripts/Webinare/watcher.py
- plánování přes Unraid User Scripts plugin (wrapper + schedule.json cron)
"""
import os
import sys
import json
import posixpath
import paramiko
for _s in (sys.stdout, sys.stderr):
try:
_s.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
HOST = "192.168.1.76"
USER = "root"
CONTAINER = "python-runner"
LOCAL_DIR = os.path.dirname(os.path.abspath(__file__))
HOST_DIR = "/mnt/user/Scripts/Webinare" # na hostiteli (Unraid)
CONT_DIR = "/scripts/Webinare" # uvnitř kontejneru
PLUGIN_DIR = "/boot/config/plugins/user.scripts"
USERSCRIPTS = PLUGIN_DIR + "/scripts"
SCHEDULE_JSON = PLUGIN_DIR + "/schedule.json"
CUSTOM_CRON = PLUGIN_DIR + "/customSchedule.cron"
US_NAME = "WebinarWatcher"
CRON_EXPR = "0 8 * * *"
# soubory, které kopírujeme na server (telegram_notify.py = přibalená kopie,
# protože /scripts/Knihovny na serveru není)
FILES = ["watcher.py", "telegram_notify.py", "config.json", "requirements.txt", "NOTES.md"]
def connect():
pw = os.environ.get("TOWER_PW")
if not pw:
sys.exit("Chybí TOWER_PW v prostředí.")
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect(HOST, username=USER, password=pw, timeout=20, allow_agent=False, look_for_keys=False)
return c
def run(c, cmd, timeout=180):
_in, out, err = c.exec_command(cmd, timeout=timeout)
o = out.read().decode("utf-8", "replace")
e = err.read().decode("utf-8", "replace")
rc = out.channel.recv_exit_status()
return rc, o, e
def show(c, cmd, timeout=180):
rc, o, e = run(c, cmd, timeout)
print(f"$ {cmd}")
body = (o + (("\n[stderr] " + e) if e.strip() else "")).rstrip()
print(body if body else "(prázdné)")
print(f" rc={rc}\n")
return rc, o, e
# ── RECON ────────────────────────────────────────────────────────────────────
def recon(c):
show(c, "hostname; uname -r")
show(c, "docker ps --format '{{.Names}}' | sort")
show(c, "ls -la /mnt/user/Scripts/ | head -50")
show(c, f"docker exec {CONTAINER} ls /scripts/ | head -50")
show(c, f"docker exec {CONTAINER} ls -la /scripts/Knihovny/telegram_notify.py")
show(c, f"docker exec {CONTAINER} sh -lc 'test -f /scripts/Medevio/.env && grep -oE \"^(TELEGRAM_BOT_TOKEN|TELEGRAM_CHAT_ID)=\" /scripts/Medevio/.env || echo NENI_ENV'")
show(c, f"docker exec {CONTAINER} python3 -c \"import requests,bs4;print('deps_ok requests',requests.__version__,'bs4',bs4.__version__)\"")
show(c, f"ls -la {USERSCRIPTS}/ | head -50")
# vzor existujícího wrapperu + rozvrhu (StahovaniFaktur)
show(c, f"cat {USERSCRIPTS}/StahovaniFaktur/script 2>/dev/null")
show(c, f"cat {USERSCRIPTS}/StahovaniFaktur/schedule.json 2>/dev/null")
show(c, "grep -n 'Scripts' /etc/cron.d/root 2>/dev/null | head")
# ── DEPLOY (kopie souborů) ───────────────────────────────────────────────────
def deploy(c):
run(c, f"mkdir -p {HOST_DIR} /mnt/user/Scripts/logs")
sftp = c.open_sftp()
for f in FILES:
lp = os.path.join(LOCAL_DIR, f)
if not os.path.exists(lp):
print(f" přeskakuji (není lokálně): {f}")
continue
rp = posixpath.join(HOST_DIR, f)
sftp.put(lp, rp)
print(f"{f}{rp}")
# seed state.json jen když na serveru ještě není (ať se nepřemazává běhový stav)
rp_state = posixpath.join(HOST_DIR, "state.json")
rc, _o, _e = run(c, f"test -f {rp_state}")
if rc != 0:
lp_state = os.path.join(LOCAL_DIR, "state.json")
if os.path.exists(lp_state):
sftp.put(lp_state, rp_state)
print(f" ↑ state.json (seed) → {rp_state}")
else:
with sftp.open(rp_state, "w") as fh:
fh.write('{"last_id": null}\n')
print(" ↑ state.json (prázdný)")
else:
print(" state.json na serveru už existuje — neměním.")
sftp.close()
show(c, f"ls -la {HOST_DIR}/")
# ── ENV (naplní /scripts/Webinare/.env Telegram klíči z lokálního Medevio/.env) ─
def env(c):
src = os.path.join(os.path.dirname(LOCAL_DIR), "Medevio", ".env")
if not os.path.exists(src):
sys.exit("Lokální Medevio/.env nenalezen.")
chteji = ("TELEGRAM_BOT_TOKEN", "TELEGRAM_CHAT_ID")
radky = []
with open(src, encoding="utf-8") as fh:
for line in fh:
s = line.strip()
if "=" in s and not s.startswith("#"):
k = s.split("=", 1)[0].strip()
if k in chteji:
radky.append(s)
keys = [r.split("=", 1)[0] for r in radky]
if not all(k in keys for k in chteji):
sys.exit(f"V Medevio/.env chybí některý z klíčů: {chteji}")
run(c, f"mkdir -p {HOST_DIR}")
sftp = c.open_sftp()
with sftp.open(posixpath.join(HOST_DIR, ".env"), "w") as fh:
fh.write("\n".join(radky) + "\n")
sftp.chmod(posixpath.join(HOST_DIR, ".env"), 0o600)
sftp.close()
print(f" .env zapsán na server ({', '.join(keys)}) — hodnoty se nevypisují.")
show(c, f"docker exec {CONTAINER} sh -lc 'grep -oE \"^(TELEGRAM_BOT_TOKEN|TELEGRAM_CHAT_ID)=\" {CONT_DIR}/.env'")
# ── CRON RECON (zjistí, jak User Scripts ukládá rozvrh) ──────────────────────
def cron(c):
show(c, "ls -la /boot/config/plugins/user.scripts/scripts/StahovaniFaktur/")
show(c, "ls -la /boot/config/plugins/user.scripts/scripts/MedicusFirebirdRestore/")
show(c, "cat /boot/config/plugins/user.scripts/scripts/MedicusFirebirdRestore/schedule.json 2>/dev/null || echo bez_schedule_json")
show(c, "ls -la /etc/cron.d/")
show(c, "cat /etc/cron.d/root 2>/dev/null")
show(c, "crontab -l 2>/dev/null | tail -40")
# ── CRONSTORE RECON (kam plugin persistuje rozvrh přes reboot) ───────────────
def cronstore(c):
show(c, "ls -la /boot/config/plugins/user.scripts/")
show(c, "find /boot/config/plugins/user.scripts/ -maxdepth 1 -type f -exec ls -la {} +")
show(c, "grep -rsl 'StahovaniFaktur' /boot/config/ 2>/dev/null | grep -v '/scripts/StahovaniFaktur/'")
show(c, "grep -rsn '6,18\\|cron\\|schedule' /boot/config/plugins/user.scripts/ --include='*.json' --include='*.cfg' --include='*.dat' --include='*.php' 2>/dev/null | head -40")
# ── CRONFILES (dump přesného formátu schedule.json + customSchedule.cron) ────
def cronfiles(c):
show(c, "sed -n '185,210p' /boot/config/plugins/user.scripts/schedule.json")
show(c, "head -8 /boot/config/plugins/user.scripts/schedule.json")
show(c, "tail -8 /boot/config/plugins/user.scripts/schedule.json")
show(c, "cat /boot/config/plugins/user.scripts/customSchedule.cron")
show(c, "ls -la /usr/local/sbin/update_cron /usr/local/emhttp/plugins/user.scripts/startCustom.php 2>&1")
# ── SMOKE TEST (neblokuje na Telegramu) ──────────────────────────────────────
def smoke(c):
# ověří přibalený telegram modul + načtení .env (jen délky, ne hodnoty)
# + detekci webináře na webu. NEodesílá Telegram ani registraci.
py = (
"import sys; sys.path.insert(0,'/scripts/Webinare');"
"import telegram_notify as t;"
"print('telegram .env OK: token_len',len(t._token()),'chat_id_set',bool(t._resolve_chat_id(None)));"
"import json,requests,re;"
"from bs4 import BeautifulSoup;"
"cfg=json.load(open('/scripts/Webinare/config.json',encoding='utf-8'));"
"s=requests.Session(); s.get(cfg['watch_url'],headers={'User-Agent':'Mozilla/5.0'},timeout=30);"
"r=s.get(cfg['watch_url'],headers={'User-Agent':'Mozilla/5.0'},timeout=30);"
"a=BeautifulSoup(r.text,'html.parser').select('a[href*=\\\"webinar.php?idwebinar=\\\"]')[0];"
"print('detekce OK webinar=',re.search(r'idwebinar=(\\\\d+)',a['href']).group(1))"
)
show(c, f"docker exec {CONTAINER} python3 -c \"{py}\"", timeout=90)
# ── SCHEDULE (User Scripts plugin, denně 8:00) ───────────────────────────────
def schedule(c):
d = f"{USERSCRIPTS}/{US_NAME}"
script_path = f"{d}/script"
cron_line = (f"{CRON_EXPR} /usr/local/emhttp/plugins/user.scripts/startCustom.php "
f"{script_path} > /dev/null 2>&1")
# wrapper (styl převzat z StahovaniFaktur: flock + docker exec + log s datem/rc)
wrapper = (
"#!/bin/bash\n"
"# WebinarWatcher - denne 8:00, hlidac webinaru praktickylekar.online. flock proti prekryvu.\n"
"LOG=/mnt/user/Scripts/logs/webinar_watcher.log\n"
"mkdir -p /mnt/user/Scripts/logs\n"
"exec 9>/tmp/webinar_watcher.lock\n"
"flock -n 9 || exit 0\n"
"OUT=$(docker exec -e PYTHONIOENCODING=utf-8 -e TZ=Europe/Prague " + CONTAINER + " python3 " + CONT_DIR + "/watcher.py 2>&1)\n"
"RC=$?\n"
"{ echo \"===== $(date '+%F %T') (rc=$RC) =====\"; echo \"$OUT\"; } >> \"$LOG\"\n"
)
run(c, f"mkdir -p {d}")
sftp = c.open_sftp()
with sftp.open(script_path, "w") as fh:
fh.write(wrapper)
with sftp.open(f"{d}/name", "w") as fh:
fh.write(US_NAME)
with sftp.open(f"{d}/description", "w") as fh:
fh.write("Hlidac webinaru praktickylekar.online, denne 8:00")
# ── schedule.json: přidej/aktualizuj záznam (se zálohou) ──
run(c, f"cp -a {SCHEDULE_JSON} {SCHEDULE_JSON}.bak_webinar")
with sftp.open(SCHEDULE_JSON, "r") as fh:
data = json.loads(fh.read().decode("utf-8"))
data[script_path] = {
"script": script_path,
"frequency": "custom",
"id": "schedule" + US_NAME,
"custom": CRON_EXPR,
}
with sftp.open(SCHEDULE_JSON, "w") as fh:
fh.write(json.dumps(data, indent=2))
# ── customSchedule.cron: přidej řádek (se zálohou), pokud chybí ──
with sftp.open(CUSTOM_CRON, "r") as fh:
cron_txt = fh.read().decode("utf-8")
if script_path not in cron_txt:
run(c, f"cp -a {CUSTOM_CRON} {CUSTOM_CRON}.bak_webinar")
with sftp.open(CUSTOM_CRON, "w") as fh:
fh.write(cron_txt.rstrip() + "\n\n" + cron_line + "\n")
sftp.close()
run(c, f"chmod +x {script_path}")
# ── regeneruj systémový cron + ověř ──
show(c, "/usr/local/sbin/update_cron")
print("── OVĚŘENÍ ──")
show(c, f"ls -la {d}/")
show(c, f"grep -n '{US_NAME}' {CUSTOM_CRON}")
show(c, f"grep -n '{US_NAME}' /etc/cron.d/root")
show(c, f"grep -n '{US_NAME}' {SCHEDULE_JSON}")
# ── PRODRUN (spustí přesně to, co pustí cron — pro ruční test/trigger) ────────
def prodrun(c):
show(c, f"docker exec -e PYTHONIOENCODING=utf-8 {CONTAINER} python3 {CONT_DIR}/watcher.py",
timeout=200)
MODES = {"recon": recon, "deploy": deploy, "env": env, "cron": cron,
"cronstore": cronstore, "cronfiles": cronfiles, "smoke": smoke,
"schedule": schedule, "prodrun": prodrun}
def main():
mode = sys.argv[1] if len(sys.argv) > 1 else "recon"
if mode not in MODES:
sys.exit(f"Neznámý režim '{mode}'. Použij: {', '.join(MODES)}")
c = connect()
try:
print(f"=== {mode.upper()} na {USER}@{HOST} ===\n")
MODES[mode](c)
finally:
c.close()
if __name__ == "__main__":
main()
+2
View File
@@ -0,0 +1,2 @@
requests
beautifulsoup4
+3
View File
@@ -0,0 +1,3 @@
{
"last_id": "560"
}
+115
View File
@@ -0,0 +1,115 @@
"""
telegram_notify.py — PŘIBALENÁ kopie pro běh na serveru (python-runner)
=======================================================================
Na toweru není balík `Knihovny/` ani `Medevio/.env`, proto má watcher tuto
soběstačnou kopii. Funkce jsou shodné s `Knihovny/telegram_notify.py`.
Token a chat_id se hledají v `.env` na víc místech (první nalezené vyhrává):
1) `.env` ve stejném adresáři jako tento soubor (server: /scripts/Webinare/.env)
2) `../Medevio/.env` (lokální vývoj)
3) `../../Medevio/.env` (kořen projektu)
TELEGRAM_BOT_TOKEN=123456789:AAE...
TELEGRAM_CHAT_ID=6639316354
"""
import os
import sys
import time
from pathlib import Path
import requests
def _load_env():
here = Path(__file__).resolve().parent
kandidati = [
here / ".env",
here.parent / "Medevio" / ".env",
here.parent.parent / "Medevio" / ".env",
]
for env_path in kandidati:
if env_path.exists():
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
_load_env()
API_BASE = "https://api.telegram.org/bot{token}/{method}"
def _token() -> str:
token = os.environ.get("TELEGRAM_BOT_TOKEN")
if not token:
raise RuntimeError("Chybí TELEGRAM_BOT_TOKEN (.env)")
return token
def _resolve_chat_id(chat_id):
chat_id = chat_id or os.environ.get("TELEGRAM_CHAT_ID")
if not chat_id:
raise RuntimeError("Chybí TELEGRAM_CHAT_ID (zadej argumentem nebo v .env)")
return str(chat_id)
def _call(method, *, http_timeout=15, **params):
url = API_BASE.format(token=_token(), method=method)
r = requests.post(url, json=params, timeout=http_timeout)
data = r.json()
if not data.get("ok"):
raise RuntimeError(f"Telegram {method} selhal [{r.status_code}]: {data}")
return data["result"]
def posli_telegram(text, *, chat_id=None, parse_mode=None, disable_notification=False):
params = {
"chat_id": _resolve_chat_id(chat_id),
"text": text,
"disable_notification": disable_notification,
}
if parse_mode:
params["parse_mode"] = parse_mode
return _call("sendMessage", **params)
def zeptej_se_telegram(otazka, *, chat_id=None, timeout=300, poll_timeout=30, parse_mode=None):
cid = _resolve_chat_id(chat_id)
existujici = _call("getUpdates", http_timeout=15)
offset = (existujici[-1]["update_id"] + 1) if existujici else 0
posli_telegram(otazka, chat_id=cid, parse_mode=parse_mode)
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
zbyva = int(deadline - time.monotonic())
if zbyva <= 0:
break
lp = max(1, min(poll_timeout, zbyva))
updates = _call("getUpdates", http_timeout=lp + 10, offset=offset, timeout=lp)
for u in updates:
offset = u["update_id"] + 1
msg = u.get("message") or {}
if str(msg.get("chat", {}).get("id")) != cid:
continue
text = msg.get("text")
if text:
return text
return None
if __name__ == "__main__":
try:
sys.stdout.reconfigure(encoding="utf-8")
except Exception:
pass
args = sys.argv[1:]
if args and args[0] == "--ask":
print(zeptej_se_telegram(" ".join(args[1:]) or "?", timeout=240) or "(bez odpovědi)")
elif args:
posli_telegram(" ".join(args))
print("Odesláno OK")
else:
print('Použití: python telegram_notify.py "text" | --ask "otázka?"')
+323
View File
@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
watcher.py — Hlídač nových webinářů na praktickylekar.online
============================================================
Co dělá při každém spuštění (cíleno na 1× denně v 8:00 přes Plánovač úloh):
1. Stáhne hlavní stránku a najde banner s nadcházejícím webinářem
(odkaz `webinar.php?idwebinar=<ID>`).
2. Porovná ID s posledním zpracovaným (uloženo ve `state.json`).
3. Pokud je webinář NOVÝ:
a) projde "bránu" (potvrzení zdravotnického odborníka, POST /check2.php) —
teprve potom se na stránce webináře objeví registrační formulář,
b) z formuláře ŽIVĚ přečte skrytá pole `webid` a `cislo`
(cislo = PL + DDMMRRRR, mění se podle data — NIKDY se nehádá),
c) přes Telegram se ZEPTÁ, jestli má osoby z config.json přihlásit,
d) po potvrzení ("ano") odešle registraci za každou osobu,
e) výsledek potvrdí přes Telegram.
4. Pokud nový webinář NENÍ a POSILATINFOPOKAZDEKONTROLE=True, pošle ráno
informaci "zkontrolováno, nic nového".
Po přihlášení chodí potvrzovací e-mail automaticky z webu — e-mail tedy
neřešíme, notifikace jdou jen přes Telegram.
CLI:
python watcher.py # ostrý denní běh
python watcher.py --test # test: ignoruje state, VŽDY dry-run (nic neodešle)
python watcher.py --reset # smaže state.json (zapomene poslední webinář)
"""
import json
import logging
import os
import re
import sys
from pathlib import Path
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
# ── Telegram: lokálně sdílená knihovna z kořene, na serveru přibalená kopie ──
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
try:
# lokálně (Windows): kořen projektu má balík Knihovny + Medevio/.env
from Knihovny.telegram_notify import posli_telegram, zeptej_se_telegram # noqa: E402
except ModuleNotFoundError:
# server (python-runner): Knihovny tu není → přibalená kopie + lokální .env
from telegram_notify import posli_telegram, zeptej_se_telegram # noqa: E402
# ════════════════════════════════════════════════════════════════════════════
# PŘEPÍNAČE
# ════════════════════════════════════════════════════════════════════════════
# True = po KAŽDÉ ranní kontrole pošli na Telegram zprávu "zkontrolováno"
# (i když není nic nového) — užitečné při zaběhávání, ať víš, že to jede.
# False = ozvi se jen když je NOVÝ webinář. (Nastav, až bude vše ověřené.)
POSILATINFOPOKAZDEKONTROLE = True
# True = NIC se reálně neodešle (registrace se jen "nasucho" simuluje a vypíše).
# Telegram dotaz/potvrzení proběhne normálně. Pro bezpečné otestování.
# False = ostrý režim — po potvrzení "ano" na Telegramu se reálně přihlásí.
DRY_RUN = False
# Jak dlouho (s) čekat ráno na odpověď ano/ne na Telegramu, než to vzdá.
ASK_TIMEOUT = 1800 # 30 minut
# ════════════════════════════════════════════════════════════════════════════
HERE = Path(__file__).resolve().parent
CONFIG_PATH = HERE / "config.json"
STATE_PATH = HERE / "state.json"
LOG_PATH = HERE / "watcher.log"
HEADERS = {"User-Agent": "Mozilla/5.0 (webinar-watcher; osobni pouziti)"}
TIMEOUT = 30
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(LOG_PATH, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
)
log = logging.getLogger("watcher")
# ── pomocné I/O ──────────────────────────────────────────────────────────────
def load_json(path: Path, default=None):
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
def save_json(path: Path, data):
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
# ── krok 1: najdi nadcházející webinář na hlavní stránce ─────────────────────
def find_upcoming_webinar(session, watch_url):
"""Vrátí (id, text_banneru, absolutni_url) nebo None."""
r = session.get(watch_url, headers=HEADERS, timeout=TIMEOUT)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
# Zakomentované bannery jsou HTML komentáře → BeautifulSoup je nebere jako <a>.
odkazy = soup.select('a[href*="webinar.php?idwebinar="]')
if not odkazy:
return None
if len(odkazy) > 1:
log.warning("Na stránce je víc odkazů na webinář (%d), beru první.", len(odkazy))
a = odkazy[0]
href = a.get("href", "")
m = re.search(r"idwebinar=(\d+)", href)
if not m:
return None
wid = m.group(1)
text = " ".join(a.get_text().split())
return wid, text, urljoin(watch_url, href)
# ── krok 2: projdi bránu (potvrzení zdravotnického odborníka) ────────────────
def projdi_branu(session, base_url, reg_url):
"""
POST /check2.php se dvěma checkboxy → nastaví cookie souhlas=1, díky které
se na stránce webináře objeví registrační formulář. Vrací True/False.
"""
data = {"zdravotnicky-pracovnik": "on", "laicka-verejnost": "on"}
r = session.post(
urljoin(base_url, "/check2.php"),
data=data,
headers={**HEADERS, "Referer": reg_url},
timeout=TIMEOUT,
)
r.raise_for_status()
ok = session.cookies.get("souhlas") == "1"
log.info("Brána check2.php: %s (cookies=%s)", "OK" if ok else "?", session.cookies.get_dict())
return ok
# ── krok 3: přečti registrační formulář a jeho skrytá pole ───────────────────
def parse_registration_form(session, reg_url):
"""
Načte stránku webináře (už po projití brány) a vrátí
(action_url, hidden_fields_dict). Skrytá pole (webid, cislo) se ČTOU,
nehádají. Hledá konkrétně formulář mířící na 'registrovat'.
"""
r = session.get(reg_url, headers={**HEADERS, "Referer": reg_url}, timeout=TIMEOUT)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
form = None
for f in soup.find_all("form"):
if "registrovat" in (f.get("action") or "").lower():
form = f
break
if form is None:
raise RuntimeError(
"Registrační formulář nenalezen (brána neprošla, nebo se změnila struktura webu)."
)
action = urljoin(reg_url, form.get("action", ""))
hidden = {}
for inp in form.find_all("input", attrs={"type": "hidden"}):
name = inp.get("name")
if name:
hidden[name] = inp.get("value", "")
return action, hidden
# ── krok 4: sestav a odešli registraci ───────────────────────────────────────
def build_payload(person, hidden):
payload = {
"email": person["email"],
"clen": person.get("clen", "1"),
"prukaz": person.get("prukaz", ""),
"clk": person.get("clk", ""),
"titul1": person.get("titul1", ""),
"jmeno": person.get("jmeno", ""),
"prijmeni": person.get("prijmeni", ""),
"pracoviste": person.get("pracoviste", ""),
"mesto": person.get("mesto", ""),
"souhlas": "on", # souhlas se zpracováním osobních údajů (nutné pro odeslání)
}
payload.update(hidden) # webid, cislo, … (živě z formuláře)
return payload
def register_person(session, action_url, reg_url, person, hidden):
"""Vrátí (ok: bool, info: str)."""
payload = build_payload(person, hidden)
cele_jmeno = f"{person['jmeno']} {person['prijmeni']}"
if DRY_RUN:
log.info("DRY_RUN NEodesílám. Payload pro %s: %s", cele_jmeno, payload)
return True, "DRY-RUN (nic neodesláno)"
r = session.post(
action_url,
data=payload,
headers={**HEADERS, "Referer": reg_url},
timeout=TIMEOUT,
)
r.raise_for_status()
txt_low = r.text.lower()
ok = any(k in txt_low for k in ("úspěš", "uspes", "zaregistr", "děkuj", "dekuj"))
# snippet pro případnou ruční kontrolu
snippet = " ".join(BeautifulSoup(r.text, "html.parser").get_text().split())[:200]
return ok, f"HTTP {r.status_code} | {snippet}"
# ── Telegram dotaz ano/ne ────────────────────────────────────────────────────
def je_souhlas(odpoved: str | None) -> bool:
if not odpoved:
return False
return odpoved.strip().lower() in ("ano", "a", "yes", "y", "jo", "ok")
# ── hlavní logika ────────────────────────────────────────────────────────────
def main():
args = sys.argv[1:]
test_mode = "--test" in args
if "--reset" in args:
if STATE_PATH.exists():
STATE_PATH.unlink()
log.info("state.json smazán.")
return
cfg = load_json(CONFIG_PATH)
if not cfg:
log.error("Chybí config.json"); sys.exit(1)
dry = DRY_RUN or test_mode # --test vždy jen nasucho
globals()["DRY_RUN"] = dry
state = load_json(STATE_PATH, default={"last_id": None})
session = requests.Session()
session.get(cfg["watch_url"], headers=HEADERS, timeout=TIMEOUT) # init PHPSESSID
found = find_upcoming_webinar(session, cfg["watch_url"])
if not found:
log.info("Žádný nadcházející webinář na stránce nenalezen.")
if POSILATINFOPOKAZDEKONTROLE:
posli_telegram("🔎 Webináře: zkontrolováno, žádný nadcházející webinář na stránce.")
return
wid, banner, reg_url = found
banner_clean = banner.replace("\n", " ")
log.info("Nadcházející webinář: id=%s | %s | %s", wid, banner_clean, reg_url)
je_novy = test_mode or state.get("last_id") != wid
if not je_novy:
log.info("Beze změny (id=%s už zpracováno).", wid)
if POSILATINFOPOKAZDEKONTROLE:
posli_telegram(
f"✅ Webináře: zkontrolováno v 8:00, nic nového.\n"
f"Aktuální (už řešený): {banner_clean}"
)
return
# ── NOVÝ webinář ─────────────────────────────────────────────────────────
log.info("NOVÝ webinář! id=%s", wid)
try:
if not projdi_branu(session, cfg["base_url"], reg_url):
log.warning("Bránu se nepodařilo projít zkouším formulář i tak.")
action_url, hidden = parse_registration_form(session, reg_url)
except Exception as e:
log.exception("Chyba při čtení formuláře.")
posli_telegram(f"⚠️ Webináře: nový webinář {banner_clean}, ale NEPODAŘILO se přečíst formulář:\n{e}")
return
log.info("Formulář action=%s, skrytá pole=%s", action_url, hidden)
jmena = ", ".join(f"{p['jmeno']} {p['prijmeni']}" for p in cfg["registrants"])
# ── Telegram: zeptej se na souhlas s přihlášením ─────────────────────────
otazka = (
f"🆕 NOVÝ webinář na praktickylekar.online!\n\n"
f"{banner_clean}\n{reg_url}\n"
f"(webid={hidden.get('webid','?')}, cislo={hidden.get('cislo','?')})\n\n"
f"Mám přihlásit: {jmena}?\n"
f"{'[TEST nic se reálně neodešle] ' if dry else ''}"
f"Odpověz ANO / NE."
)
odpoved = zeptej_se_telegram(otazka, timeout=ASK_TIMEOUT)
if odpoved is None:
log.info("Bez odpovědi (timeout) state NEukládám, zeptám se zítra znovu.")
return
if not je_souhlas(odpoved):
log.info("Odpověď '%s' → NEpřihlašuji.", odpoved)
state["last_id"] = wid # rozhodnuto (ne) → příště se neptat znovu
save_json(STATE_PATH, state)
posli_telegram(f"👌 OK, webinář {banner_clean} nechávám bez přihlášení.")
return
# ── přihlášení ───────────────────────────────────────────────────────────
vysledky = []
for p in cfg["registrants"]:
cele = f"{p['jmeno']} {p['prijmeni']}"
try:
ok, info = register_person(session, action_url, reg_url, p, hidden)
vysledky.append(f"{'' if ok else ''} {cele}: {'OK' if ok else 'NEJISTÉ zkontroluj'}")
log.info("Registrace %s: %s | %s", cele, ok, info)
except Exception as e:
vysledky.append(f"{cele}: CHYBA {e}")
log.exception("Chyba při registraci %s", p["email"])
# state ukládáme až po pokusu o registraci
state["last_id"] = wid
save_json(STATE_PATH, state)
shrnuti = (
f"{'🧪 TEST (nic neodesláno) ' if dry else '📨 '}Přihlášení na webinář:\n"
f"{banner_clean}\n\n" + "\n".join(vysledky) +
("\n\n(Po reálném přihlášení dorazí potvrzovací e-mail z webu.)" if not dry else "")
)
posli_telegram(shrnuti)
log.info("Hotovo (last_id=%s).", wid)
if __name__ == "__main__":
main()