231 lines
7.4 KiB
Python
231 lines
7.4 KiB
Python
"""
Stáhne všechna Killer Sudoku puzzle + solutions z dailykillersudoku.com jako PDF.
Název souboru: YYYY-MM-DD Puzzle SudokuKiller {n} [difficulty {d} of 10] [average solving time {t}].pdf

Spuštění:
    python stahni_killer_sudoku.py          # stáhne vše nové od posledního stažení
    python stahni_killer_sudoku.py --all    # projde všechna čísla znovu (přeskočí existující)
"""
|
||
|
||
import re
|
||
import sys
|
||
import time
|
||
import threading
|
||
import argparse
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
|
||
def _force_utf8(stream) -> None:
    """Switch *stream* to UTF-8 output when it offers ``reconfigure``.

    The standard streams can be ``None`` or a replacement object without a
    ``reconfigure`` method; in that case the stream is left untouched rather
    than failing while the module is being loaded.
    """
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is not None:
        reconfigure(encoding="utf-8")


# The script prints Czech (non-ASCII) messages, so make both streams UTF-8.
_force_utf8(sys.stdout)
_force_utf8(sys.stderr)

BASE_URL = "https://www.dailykillersudoku.com"
# Downloads are stored next to this script; the folder is created on load.
SAVE_DIR = Path(__file__).parent / "DownloadedPuzzles"
SAVE_DIR.mkdir(exist_ok=True)
DELAY = 0.1  # seconds between requests within a single thread
NUM_THREADS = 6  # number of concurrent worker threads

# How many puzzles to download (starting from the lowest missing one).
# 0 = download every missing puzzle up to the current one.
AMOUNT_TO_DOWNLOAD = 0

# One HTTP session, used for every request the script makes.
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "Mozilla/5.0 (compatible; puzzle-downloader/1.0)"})

# Serialises console output coming from the worker threads.
_print_lock = threading.Lock()
|
||
|
||
|
||
def tname() -> str:
    """Return a short bracketed label for the calling thread.

    The main thread is labelled ``[Hlavní]``. For any other thread the text
    after the last underscore of its name is read as a zero-based index and
    shown one-based (a name ending in ``_0`` gives ``[T1]``). Names without
    such a numeric suffix fall back to their first eight characters.
    """
    thread_name = threading.current_thread().name
    if thread_name == "MainThread":
        return "[Hlavní]"

    suffix = thread_name.rpartition("_")[2]
    try:
        worker_index = int(suffix)
    except ValueError:
        return f"[{thread_name[:8]}]"
    return f"[T{worker_index + 1}]"
|
||
|
||
|
||
def tprint(*args, **kwargs):
    """Print like ``print``, prefixed with the calling thread's label.

    The label comes from ``tname()``. The actual write happens while holding
    the module-wide print lock, so only one thread prints at a time.
    All positional and keyword arguments are forwarded to ``print``.
    """
    label = tname()
    with _print_lock:
        print(label, *args, **kwargs)
|
||
|
||
|
||
def puzzle_exists(n: int) -> bool:
    """Report whether the site's search page for puzzle *n* shows a puzzle.

    The page counts as a hit when its HTML contains the opening of a
    ``<section class="puzzle...">`` element. Any ``requests`` failure is
    reported as ``False`` rather than raised.
    """
    search_url = f"{BASE_URL}/search?n={n}"
    try:
        page_html = SESSION.get(search_url, timeout=15).text
    except requests.RequestException:
        return False
    return 'section class="puzzle' in page_html
|
||
|
||
|
||
def get_max_puzzle_number() -> int:
    """Find the number of the newest puzzle by binary search over 1..99999.

    Each step probes one number with ``puzzle_exists`` and then pauses for
    half a second. The search presumes that every number up to the newest
    puzzle exists and none above it does (not verified here).

    Returns:
        The highest number for which the probe succeeded, or 1 if no probe
        succeeded at all.
    """
    lowest, highest = 1, 99999
    while highest > lowest:
        # Round the midpoint up so that "lowest = probe" always makes progress.
        probe = (lowest + highest + 1) // 2
        if not puzzle_exists(probe):
            highest = probe - 1
        else:
            lowest = probe
        time.sleep(0.5)
    return lowest
|
||
|
||
|
||
def get_puzzle_info(n: int) -> dict | None:
    """Fetch the search page of puzzle *n* and extract its metadata.

    Returns:
        A dict with the keys ``date`` (ISO ``YYYY-MM-DD`` string), ``number``
        (*n* itself), ``difficulty`` and ``avg_time`` (page text, or ``"?"``
        when the element is absent). ``None`` is returned, after printing a
        message, when the request fails, the response is not HTTP 200, the
        ``section.puzzle`` element or any date part is missing, or the date
        cannot be parsed.
    """
    try:
        resp = SESSION.get(f"{BASE_URL}/search?n={n}", timeout=15)
    except requests.RequestException as e:
        tprint(f" Chyba při načítání info puzzle {n}: {e}")
        return None

    if resp.status_code != 200:
        tprint(f" Puzzle {n}: info stránka nedostupná (HTTP {resp.status_code})")
        return None

    section = BeautifulSoup(resp.text, "html.parser").select_one("section.puzzle")
    if not section:
        tprint(f" Puzzle {n}: nenalezena sekce section.puzzle")
        return None

    # The date is spread over three spans: abbreviated month, day and year.
    date_parts = [
        section.select_one(f"span.{css_class}")
        for css_class in ("short-month", "day", "year")
    ]
    if not all(date_parts):
        tprint(f" Puzzle {n}: datum nenalezeno (chybí span.short-month / .day / .year)")
        return None

    raw_date = " ".join(part.text.strip() for part in date_parts)
    try:
        date_iso = datetime.strptime(raw_date, "%b %d %Y").strftime("%Y-%m-%d")
    except ValueError as e:
        tprint(f" Puzzle {n}: chyba parsování data ({e})")
        return None

    def text_or_unknown(selector: str) -> str:
        # Optional fields degrade to "?" instead of failing the whole puzzle.
        element = section.select_one(selector)
        if element:
            return element.text.strip()
        return "?"

    return {
        "date": date_iso,
        "number": n,
        "difficulty": text_or_unknown("span.puzzle-difficulty-value"),
        "avg_time": text_or_unknown("span.puzzle-timing-value"),
    }
|
||
|
||
|
||
# Characters that are not allowed in Windows file names.
_FORBIDDEN_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|]')


def make_filename(info: dict, solution: bool = False) -> str:
    """Build the PDF file name for a puzzle or for its solution.

    Args:
        info: Puzzle metadata with the keys ``date``, ``number``,
            ``difficulty`` and ``avg_time``.
        solution: When true, `` [solution]`` is appended before ``.pdf``.

    Returns:
        ``{date} Puzzle SudokuKiller {number} [difficulty {d} of 10]
        [average solving time {t}]{suffix}.pdf`` on a single line.

    Both ``difficulty`` and ``avg_time`` come from scraped page text (and may
    be the ``"?"`` placeholder), so every forbidden file-name character in
    either of them is replaced with ``-``.
    """
    suffix = " [solution]" if solution else ""
    difficulty = _FORBIDDEN_FILENAME_CHARS.sub("-", str(info["difficulty"]))
    avg_time = _FORBIDDEN_FILENAME_CHARS.sub("-", str(info["avg_time"]))
    return (
        f"{info['date']} Puzzle SudokuKiller {info['number']} "
        f"[difficulty {difficulty} of 10] "
        f"[average solving time {avg_time}]{suffix}.pdf"
    )
|
||
|
||
|
||
def download_pdf(n: int, info: dict, solution: bool = False) -> bool:
    """Download the puzzle (or solution) PDF of puzzle *n* into ``SAVE_DIR``.

    Args:
        n: Puzzle number used in the PDF URL.
        info: Puzzle metadata passed to ``make_filename`` for the file name.
        solution: Download ``{n}.solution.pdf`` instead of ``{n}.pdf``.

    Returns:
        ``True`` when the file already exists or was saved; ``False`` when
        the request fails, the response is not HTTP 200, the server answers
        with HTML, or the file cannot be written.
    """
    filename = make_filename(info, solution)
    filepath = SAVE_DIR / filename

    if filepath.exists():
        tprint(f" Přeskočeno (existuje): {filename}")
        return True

    suffix = ".solution" if solution else ""
    pdf_url = f"{BASE_URL}/pdfs/{n}{suffix}.pdf"

    try:
        resp = SESSION.get(pdf_url, timeout=30)
    except requests.RequestException as e:
        tprint(f" Chyba stahování {pdf_url}: {e}")
        return False

    if resp.status_code != 200:
        tprint(f" PDF nedostupné (HTTP {resp.status_code}): {pdf_url}")
        return False

    if resp.headers.get("content-type", "").startswith("text/html"):
        tprint(f" PDF vrátilo HTML místo binárního obsahu: {pdf_url}")
        return False

    # Write to a temporary ".part" file and rename it afterwards, so an
    # interrupted write never leaves a truncated PDF under the final name
    # (which later runs would skip as "already downloaded").
    partial_path = filepath.with_name(filename + ".part")
    try:
        partial_path.write_bytes(resp.content)
        partial_path.replace(filepath)
    except OSError as e:
        try:
            partial_path.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is reported below
        tprint(f" Chyba při ukládání {filename}: {e}")
        return False

    tprint(f" Uloženo: {filename}")
    return True
|
||
|
||
|
||
def process_puzzle(n: int, idx: int, total: int) -> bool:
    """Download the puzzle PDF and the solution PDF of puzzle *n*.

    Args:
        n: Puzzle number.
        idx: Position of this puzzle in the current batch (progress output).
        total: Size of the current batch (progress output).

    Returns:
        ``True`` only when the metadata was obtained and both PDFs are in
        place. The solution download is attempted even if the puzzle
        download failed.
    """
    tprint(f"[{idx}/{total}] Puzzle #{n}...")

    info = get_puzzle_info(n)
    time.sleep(DELAY)  # pause after the metadata request
    if not info:
        return False

    puzzle_saved = download_pdf(n, info, solution=False)
    time.sleep(DELAY)  # pause between the two PDF requests
    if not download_pdf(n, info, solution=True):
        return False
    return puzzle_saved
|
||
|
||
|
||
def find_already_downloaded() -> set[int]:
|
||
downloaded = set()
|
||
for f in SAVE_DIR.glob("*Puzzle SudokuKiller*.pdf"):
|
||
m = re.search(r'SudokuKiller (\d+)', f.name)
|
||
if m:
|
||
downloaded.add(int(m.group(1)))
|
||
return downloaded
|
||
|
||
|
||
def main():
    """Command-line entry point: work out what is missing and download it.

    Options:
        --all    process every number in the range (existing files are
                 skipped by the download step itself)
        --start  first puzzle number (default 1)
        --end    last puzzle number (default: the newest puzzle found)

    Without ``--all`` only numbers not reported by
    ``find_already_downloaded`` are processed. ``AMOUNT_TO_DOWNLOAD`` > 0
    caps the batch to that many puzzles. The work is spread over
    ``NUM_THREADS`` worker threads and a success/error summary is printed
    at the end.
    """
    parser = argparse.ArgumentParser(description="Stáhne Killer Sudoku PDF")
    parser.add_argument("--all", action="store_true", help="Projde všechna čísla od 1 (přeskočí existující)")
    parser.add_argument("--start", type=int, default=1, help="Začáteční číslo puzzle (výchozí: 1)")
    parser.add_argument("--end", type=int, default=None, help="Koncové číslo puzzle (výchozí: aktuální)")
    args = parser.parse_args()

    tprint("Zjišťuji aktuální číslo puzzle...")
    max_n = get_max_puzzle_number()
    tprint(f"Aktuální nejvyšší puzzle: #{max_n}")

    # Compare against None so an explicit "--end 0" is not mistaken for
    # "no end given".
    end_n = args.end if args.end is not None else max_n
    start_n = args.start

    downloaded = find_already_downloaded()
    if args.all:
        to_download = list(range(start_n, end_n + 1))
        tprint(f"Projdu všechna puzzle #{start_n}–#{end_n} (přeskočím existující soubory)")
    else:
        to_download = [n for n in range(start_n, end_n + 1) if n not in downloaded]
        tprint(f"Již staženo: {len(downloaded)} puzzle, zbývá stáhnout: {len(to_download)}")

    if AMOUNT_TO_DOWNLOAD > 0:
        to_download = to_download[:AMOUNT_TO_DOWNLOAD]
        tprint(f"AMOUNT_TO_DOWNLOAD={AMOUNT_TO_DOWNLOAD} → stáhnu prvních {len(to_download)} chybějících")

    if not to_download:
        tprint("Vše je již staženo.")
        return

    total = len(to_download)
    ok_count = 0
    err_count = 0

    tprint(f"Spouštím {NUM_THREADS} vláken...")
    # The name prefix matters: tname() derives the "[T<k>]" label from the
    # "_<index>" suffix the executor appends to it.
    with ThreadPoolExecutor(max_workers=NUM_THREADS, thread_name_prefix="ThreadPoolExecutor-0") as executor:
        futures = {
            executor.submit(process_puzzle, n, idx, total): n
            for idx, n in enumerate(to_download, 1)
        }
        for future in as_completed(futures):
            n = futures[future]
            try:
                succeeded = future.result()
            except Exception as e:
                # One failing worker must not abort the whole batch or lose
                # the summary; count it as an error and keep going.
                tprint(f" Puzzle {n}: neočekávaná chyba: {e}")
                succeeded = False
            if succeeded:
                ok_count += 1
            else:
                err_count += 1

    tprint(f"\nHotovo. Úspěšně: {ok_count}, chyby: {err_count}")


if __name__ == "__main__":
    main()
|