""" Stáhne všechna Killer Sudoku puzzle + solutions z dailykillersudoku.com jako PDF. Název souboru: YYYY-MM-DD Puzzle SudokuKiller {n} [difficulty {d} of 10] [average solving time {t}].pdf Spuštění: python stahni_killer_sudoku.py # stáhne vše nové od posledního stažení python stahni_killer_sudoku.py --all # projde všechna čísla znovu (přeskočí existující) """ import re import sys import time import threading import argparse from datetime import datetime from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed import requests from bs4 import BeautifulSoup sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") BASE_URL = "https://www.dailykillersudoku.com" SAVE_DIR = Path(__file__).parent / "DownloadedPuzzles" SAVE_DIR.mkdir(exist_ok=True) DELAY = 0.1 # sekundy mezi requesty v rámci jednoho vlákna NUM_THREADS = 6 # počet souběžných vláken # Kolik puzzle stáhnout (od nejmenšího chybějícího). # 0 = stáhni všechna chybějící až do aktuálního. 
AMOUNT_TO_DOWNLOAD = 0

# One shared HTTP session: connections are reused and every request carries
# the same User-Agent header.
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "Mozilla/5.0 (compatible; puzzle-downloader/1.0)"})

# Serialises console output coming from the worker threads.
_print_lock = threading.Lock()

# Characters that are replaced by "-" in file-name fields (the characters
# Windows does not allow in file names).
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|]')

# Marker placed before ".pdf" in the file name of a solution PDF.
_SOLUTION_SUFFIX = " [solution]"


def tname() -> str:
    """Return a short console tag identifying the current thread.

    The main thread is tagged ``[Hlavní]``. Any other thread name is split on
    ``_`` and its last part is read as a zero-based index, shown one-based as
    ``[T<index + 1>]``. Names that do not end in a number fall back to their
    first eight characters.
    """
    name = threading.current_thread().name
    if name == "MainThread":
        return "[Hlavní]"
    try:
        return f"[T{int(name.split('_')[-1]) + 1}]"
    except (ValueError, IndexError):
        return f"[{name[:8]}]"


def tprint(*args, **kwargs):
    """``print`` prefixed with the thread tag, serialised across threads."""
    with _print_lock:
        print(tname(), *args, **kwargs)


def puzzle_exists(n: int) -> bool:
    """Return True when the search page for puzzle ``n`` contains a puzzle section.

    A network error is reported as ``False``, i.e. it is indistinguishable
    from "this puzzle does not exist".
    """
    try:
        resp = SESSION.get(f"{BASE_URL}/search?n={n}", timeout=15)
    except requests.RequestException:
        return False
    return 'section class="puzzle' in resp.text


def get_max_puzzle_number() -> int:
    """Find the number of the newest puzzle by binary search.

    Probes puzzle numbers in the range 1..99999 with ``puzzle_exists`` and
    sleeps 0.5 s after every probe. If no probe succeeds, the lower bound 1
    is returned.
    """
    lo, hi = 1, 99999
    while lo < hi:
        # Upper middle, so that ``lo = mid`` always makes progress.
        mid = (lo + hi + 1) // 2
        if puzzle_exists(mid):
            lo = mid
        else:
            hi = mid - 1
        time.sleep(0.5)
    return lo


def get_puzzle_info(n: int) -> dict | None:
    """Scrape the metadata of puzzle ``n`` from its search page.

    Returns:
        A dict with the keys ``date`` (ISO ``YYYY-MM-DD`` string), ``number``
        (``n``), ``difficulty`` and ``avg_time`` (page text, or ``"?"`` when
        the element is missing), or ``None`` when the page cannot be fetched
        or the puzzle section / date cannot be found or parsed. Every failure
        is reported through ``tprint``.
    """
    url = f"{BASE_URL}/search?n={n}"
    try:
        resp = SESSION.get(url, timeout=15)
    except requests.RequestException as e:
        tprint(f" Chyba při načítání info puzzle {n}: {e}")
        return None

    if resp.status_code != 200:
        tprint(f" Puzzle {n}: info stránka nedostupná (HTTP {resp.status_code})")
        return None

    soup = BeautifulSoup(resp.text, "html.parser")
    section = soup.select_one("section.puzzle")
    if not section:
        tprint(f" Puzzle {n}: nenalezena sekce section.puzzle")
        return None

    short_month = section.select_one("span.short-month")
    day = section.select_one("span.day")
    year = section.select_one("span.year")
    if not (short_month and day and year):
        tprint(f" Puzzle {n}: datum nenalezeno (chybí span.short-month / .day / .year)")
        return None

    try:
        # NOTE: "%b" matches the abbreviated month name of the current locale.
        date_iso = datetime.strptime(
            f"{short_month.text.strip()} {day.text.strip()} {year.text.strip()}",
            "%b %d %Y",
        ).strftime("%Y-%m-%d")
    except ValueError as e:
        tprint(f" Puzzle {n}: chyba parsování data ({e})")
        return None

    diff_el = section.select_one("span.puzzle-difficulty-value")
    difficulty = diff_el.text.strip() if diff_el else "?"

    time_el = section.select_one("span.puzzle-timing-value")
    avg_time = time_el.text.strip() if time_el else "?"

    return {"date": date_iso, "number": n, "difficulty": difficulty, "avg_time": avg_time}


def make_filename(info: dict, solution: bool = False) -> str:
    """Build the PDF file name for a puzzle or for its solution.

    Args:
        info: Dict with the keys ``date``, ``number``, ``difficulty`` and
            ``avg_time`` (as returned by ``get_puzzle_info``).
        solution: When true, `` [solution]`` is inserted before ``.pdf``.

    Both scraped fields (``difficulty`` and ``avg_time``) are sanitised: the
    ``"?"`` placeholder used for a missing value is itself not allowed in a
    Windows file name.
    """
    suffix = _SOLUTION_SUFFIX if solution else ""
    difficulty = _UNSAFE_FILENAME_CHARS.sub("-", str(info["difficulty"]))
    avg_time = _UNSAFE_FILENAME_CHARS.sub("-", str(info["avg_time"]))
    return (
        f"{info['date']} Puzzle SudokuKiller {info['number']} "
        f"[difficulty {difficulty} of 10] "
        f"[average solving time {avg_time}]{suffix}.pdf"
    )


def download_pdf(n: int, info: dict, solution: bool = False) -> bool:
    """Download the puzzle (or solution) PDF of puzzle ``n`` into ``SAVE_DIR``.

    Args:
        n: Puzzle number used in the PDF URL.
        info: Puzzle metadata used to build the target file name.
        solution: Download ``{n}.solution.pdf`` instead of ``{n}.pdf``.

    Returns:
        True when the file already exists or was saved; False on a network
        error, a non-200 response, an HTML response, or a failure to write
        the file. Every outcome is reported through ``tprint``.
    """
    filename = make_filename(info, solution)
    filepath = SAVE_DIR / filename

    if filepath.exists():
        tprint(f" Přeskočeno (existuje): {filename}")
        return True

    suffix = ".solution" if solution else ""
    pdf_url = f"{BASE_URL}/pdfs/{n}{suffix}.pdf"

    try:
        resp = SESSION.get(pdf_url, timeout=30)
    except requests.RequestException as e:
        tprint(f" Chyba stahování {pdf_url}: {e}")
        return False

    if resp.status_code != 200:
        tprint(f" PDF nedostupné (HTTP {resp.status_code}): {pdf_url}")
        return False

    if resp.headers.get("content-type", "").startswith("text/html"):
        tprint(f" PDF vrátilo HTML místo binárního obsahu: {pdf_url}")
        return False

    # Write to a temporary ".part" file and rename it afterwards, so that an
    # interrupted run never leaves a truncated PDF under the final name
    # (which later runs would skip as "already exists").
    tmp_path = filepath.with_name(filepath.name + ".part")
    try:
        tmp_path.write_bytes(resp.content)
        tmp_path.replace(filepath)
    except OSError as e:
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is reported below
        tprint(f" Chyba při ukládání {filename}: {e}")
        return False

    tprint(f" Uloženo: {filename}")
    return True


def process_puzzle(n: int, idx: int, total: int) -> bool:
    """Fetch the metadata of puzzle ``n`` and download its puzzle and solution PDFs.

    ``idx`` and ``total`` are only used for the progress message. Sleeps
    ``DELAY`` seconds between the individual requests. Returns True only when
    both PDFs are available locally afterwards.
    """
    tprint(f"[{idx}/{total}] Puzzle #{n}...")

    info = get_puzzle_info(n)
    time.sleep(DELAY)

    if not info:
        return False

    puzzle_ok = download_pdf(n, info, solution=False)
    time.sleep(DELAY)
    solution_ok = download_pdf(n, info, solution=True)

    return puzzle_ok and solution_ok


def find_already_downloaded() -> set[int]:
    """Return the numbers of puzzles that are completely present in ``SAVE_DIR``.

    A puzzle counts as downloaded only when both its puzzle PDF and its
    solution PDF exist; otherwise a puzzle whose solution download failed
    would never be retried.
    """
    puzzles: set[int] = set()
    solutions: set[int] = set()
    for f in SAVE_DIR.glob("*Puzzle SudokuKiller*.pdf"):
        m = re.search(r'SudokuKiller (\d+)', f.name)
        if not m:
            continue
        target = solutions if f.stem.endswith(_SOLUTION_SUFFIX) else puzzles
        target.add(int(m.group(1)))
    return puzzles & solutions


def main():
    """Parse the command line and download the selected puzzles in parallel.

    Without ``--all`` only puzzles not yet completely downloaded are
    processed; with ``--all`` every number in the range is visited (existing
    files are skipped by ``download_pdf``). ``AMOUNT_TO_DOWNLOAD`` > 0 limits
    the run to that many puzzles.
    """
    parser = argparse.ArgumentParser(description="Stáhne Killer Sudoku PDF")
    parser.add_argument("--all", action="store_true",
                        help="Projde všechna čísla od 1 (přeskočí existující)")
    parser.add_argument("--start", type=int, default=1,
                        help="Začáteční číslo puzzle (výchozí: 1)")
    parser.add_argument("--end", type=int, default=None,
                        help="Koncové číslo puzzle (výchozí: aktuální)")
    args = parser.parse_args()

    if args.end is not None:
        # An explicit end makes the (slow) probe for the newest puzzle unnecessary.
        end_n = args.end
    else:
        tprint("Zjišťuji aktuální číslo puzzle...")
        end_n = get_max_puzzle_number()
        tprint(f"Aktuální nejvyšší puzzle: #{end_n}")

    start_n = args.start

    downloaded = find_already_downloaded()

    if args.all:
        to_download = list(range(start_n, end_n + 1))
        tprint(f"Projdu všechna puzzle #{start_n}–#{end_n} (přeskočím existující soubory)")
    else:
        to_download = [n for n in range(start_n, end_n + 1) if n not in downloaded]
        tprint(f"Již staženo: {len(downloaded)} puzzle, zbývá stáhnout: {len(to_download)}")

    if AMOUNT_TO_DOWNLOAD > 0:
        to_download = to_download[:AMOUNT_TO_DOWNLOAD]
        tprint(f"AMOUNT_TO_DOWNLOAD={AMOUNT_TO_DOWNLOAD} → stáhnu prvních {len(to_download)} chybějících")

    if not to_download:
        tprint("Vše je již staženo.")
        return

    total = len(to_download)
    ok_count = 0
    err_count = 0

    tprint(f"Spouštím {NUM_THREADS} vláken...")

    # The thread name prefix yields names of the form "<prefix>_<index>",
    # which is what tname() parses.
    with ThreadPoolExecutor(max_workers=NUM_THREADS, thread_name_prefix="ThreadPoolExecutor-0") as executor:
        futures = {
            executor.submit(process_puzzle, n, idx, total): n
            for idx, n in enumerate(to_download, 1)
        }
        for future in as_completed(futures):
            n = futures[future]
            try:
                succeeded = future.result()
            except Exception as e:
                # Top-level boundary: one failing worker must not abort the
                # remaining downloads or the final summary.
                tprint(f" Puzzle #{n}: neočekávaná chyba: {e}")
                succeeded = False
            if succeeded:
                ok_count += 1
            else:
                err_count += 1

    tprint(f"\nHotovo. Úspěšně: {ok_count}, chyby: {err_count}")


if __name__ == "__main__":
    main()