notebookvb

This commit is contained in:
Vladimir Buzalka
2026-05-06 07:19:20 +02:00
parent 7a4847e1cc
commit 0fe37c2434
2 changed files with 430 additions and 0 deletions
@@ -0,0 +1,151 @@
"""
Naimportuje stažené PDF puzzle z DownloadedPuzzles/ do MySQL tabulky sudoku_killer.
Spuštění:
python import_do_mysql.py # přeskočí již existující (podle puzzle_number)
python import_do_mysql.py --all # reimportuje vše (přepíše existující)
"""
import re
import sys
import argparse
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "Knihovny"))
from mysql_db import connect_mysql
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
SAVE_DIR = Path(__file__).parent / "DownloadedPuzzles"
# 2009-01-01 Puzzle SudokuKiller 1 [difficulty 5 of 10] [average solving time 47 min].pdf
FILENAME_RE = re.compile(
r"^(?P<date>\d{4}-\d{2}-\d{2}) Puzzle (?P<type>SudokuKillerGreaterThan|SudokuKiller) (?P<num>\d+) "
r"\[difficulty (?P<diff>\d+) of (?P<maxdiff>\d+)\] "
r"\[average solving time (?P<time>[^\]]+)\]"
r"(?P<solution> \[solution\])?\.pdf$"
)
def parse_time_to_minutes(time_str):
"""Převede '47 min', '1h 7m', '17h 44m' na celkový počet minut."""
time_str = time_str.strip()
m = re.match(r"^(\d+)h\s+(\d+)m$", time_str)
if m:
return int(m.group(1)) * 60 + int(m.group(2))
m = re.match(r"^(\d+)\s+min$", time_str)
if m:
return int(m.group(1))
return None
def load_puzzle_types(cursor):
cursor.execute("SELECT id, name FROM puzzle_type")
return {row["name"]: row["id"] for row in cursor.fetchall()}
def load_existing_numbers(cursor):
cursor.execute("SELECT puzzle_number FROM sudoku_killer")
return {row["puzzle_number"] for row in cursor.fetchall()}
def parse_files():
"""Vrátí dict: puzzle_number -> {"puzzle": Path, "solution": Path|None, metadata...}"""
puzzles = {}
for f in SAVE_DIR.iterdir():
m = FILENAME_RE.match(f.name)
if not m:
print(f"[SKIP] Nerozpoznaný název: {f.name}", file=sys.stderr)
continue
num = int(m.group("num"))
if num not in puzzles:
puzzles[num] = {
"puzzle_number": num,
"puzzle_date": m.group("date"),
"puzzle_type": m.group("type"),
"difficulty": int(m.group("diff")),
"max_difficulty": int(m.group("maxdiff")),
"avg_minutes": parse_time_to_minutes(m.group("time")),
"file_puzzle": None,
"file_solution": None,
}
if m.group("solution"):
puzzles[num]["file_solution"] = f
else:
puzzles[num]["file_puzzle"] = f
return puzzles
def import_puzzle(cursor, puzzle, type_ids):
if puzzle["file_puzzle"] is None:
print(f"[SKIP] puzzle_number={puzzle['puzzle_number']}: chybí PDF puzzlu")
return False
type_id = type_ids.get(puzzle["puzzle_type"])
if type_id is None:
print(f"[SKIP] Neznámý typ: {puzzle['puzzle_type']}")
return False
pdf_puzzle = puzzle["file_puzzle"].read_bytes()
pdf_solution = puzzle["file_solution"].read_bytes() if puzzle["file_solution"] else None
cursor.execute("""
INSERT INTO sudoku_killer
(puzzle_number, puzzle_type_id, puzzle_date, difficulty, max_difficulty,
avg_solving_time_minutes, file_puzzle, file_solution)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
puzzle_type_id = VALUES(puzzle_type_id),
puzzle_date = VALUES(puzzle_date),
difficulty = VALUES(difficulty),
max_difficulty = VALUES(max_difficulty),
avg_solving_time_minutes = VALUES(avg_solving_time_minutes),
file_puzzle = VALUES(file_puzzle),
file_solution = VALUES(file_solution)
""", (
puzzle["puzzle_number"],
type_id,
puzzle["puzzle_date"],
puzzle["difficulty"],
puzzle["max_difficulty"],
puzzle["avg_minutes"],
pdf_puzzle,
pdf_solution,
))
return True
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--all", action="store_true", help="Reimportuje i existující záznamy")
args = parser.parse_args()
import pymysql.cursors
conn = connect_mysql(database="puzzle", cursorclass=pymysql.cursors.DictCursor)
cursor = conn.cursor()
type_ids = load_puzzle_types(cursor)
existing = load_existing_numbers(cursor) if not args.all else set()
puzzles = parse_files()
total = len(puzzles)
print(f"Nalezeno {total} puzzle v adresáři.")
imported = skipped = errors = 0
for i, (num, puzzle) in enumerate(sorted(puzzles.items()), 1):
if num in existing:
skipped += 1
continue
try:
if import_puzzle(cursor, puzzle, type_ids):
imported += 1
else:
errors += 1
except Exception as e:
print(f"[CHYBA] puzzle_number={num}: {e}", file=sys.stderr)
errors += 1
if i % 500 == 0:
print(f" {i}/{total} zpracováno ({imported} importováno, {skipped} přeskočeno, {errors} chyb)")
cursor.close()
conn.close()
print(f"\nHotovo: {imported} importováno, {skipped} přeskočeno, {errors} chyb.")
if __name__ == "__main__":
main()
@@ -0,0 +1,279 @@
"""
Stáhne / přejmenuje Greater-Than Killer Sudoku puzzle + solutions z dailykillersudoku.com.
Název souboru: YYYY-MM-DD Puzzle SudokuKillerGreaterThan {n} [difficulty {d} of 10] [average solving time {t}].pdf
Logika:
1. Načte všechna čísla GT puzzlů ze search (t=4, d=2..10, všechny stránky)
2. Pro každé číslo:
- existuje SudokuKillerGreaterThan {n} → přeskočit
- existuje SudokuKiller {n} → přejmenovat na SudokuKillerGreaterThan
- jinak → stáhnout z /pdfs/{n}.pdf
Spuštění:
python stahni_greater_than.py
"""
import re
import sys
import time
import threading
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from bs4 import BeautifulSoup
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
BASE_URL = "https://www.dailykillersudoku.com"
SAVE_DIR = Path(__file__).parent / "DownloadedPuzzles"
SAVE_DIR.mkdir(exist_ok=True)
DELAY = 0.1 # sekundy mezi requesty v rámci jednoho vlákna
NUM_THREADS = 6
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "Mozilla/5.0 (compatible; puzzle-downloader/1.0)"})
_print_lock = threading.Lock()
def tname() -> str:
name = threading.current_thread().name
if name == "MainThread":
return "[Hlavní]"
try:
return f"[T{int(name.split('_')[-1]) + 1}]"
except (ValueError, IndexError):
return f"[{name[:8]}]"
def tprint(*args, **kwargs):
with _print_lock:
print(tname(), *args, **kwargs)
# ---------------------------------------------------------------------------
# Získání čísel GT puzzlů ze search
# ---------------------------------------------------------------------------
def get_page_puzzle_ids(d: int, page: int) -> list[int]:
url = f"{BASE_URL}/search?d={d}&t=4&p={page}"
try:
resp = SESSION.get(url, timeout=15)
except requests.RequestException as e:
tprint(f" Chyba načítání search d={d} p={page}: {e}")
return []
ids = re.findall(r'id="board(\d+)"', resp.text)
return [int(i) for i in ids]
def get_max_page(d: int) -> int:
url = f"{BASE_URL}/search?d={d}&t=4&s=0"
try:
resp = SESSION.get(url, timeout=15)
except requests.RequestException:
return 0
pages = re.findall(r'href="/search\?[^"]*p=(\d+)"', resp.text)
return max([int(p) for p in pages], default=1) if pages else 1
def collect_all_gt_numbers() -> list[int]:
"""Projde search (d=2..10, t=4) a vrátí seřazený seznam všech GT čísel."""
all_ids = set()
for d in range(2, 11):
max_p = get_max_page(d)
if max_p == 0:
continue
tprint(f" Difficulty {d}: {max_p} stránek")
for page in range(1, max_p + 1):
ids = get_page_puzzle_ids(d, page)
all_ids.update(ids)
time.sleep(DELAY)
return sorted(all_ids)
# ---------------------------------------------------------------------------
# Čtení existujících souborů
# ---------------------------------------------------------------------------
def find_downloaded_killer() -> dict[int, Path]:
"""Vrátí {číslo: cesta} pro SudokuKiller (ne GreaterThan) soubory (puzzle, ne solution)."""
result = {}
for f in SAVE_DIR.glob("*Puzzle SudokuKiller *.pdf"):
if "[solution]" in f.name or "GreaterThan" in f.name:
continue
m = re.search(r"SudokuKiller (\d+)", f.name)
if m:
result[int(m.group(1))] = f
return result
def find_downloaded_gt() -> set[int]:
"""Vrátí čísla již stažených/přejmenovaných SudokuKillerGreaterThan souborů."""
result = set()
for f in SAVE_DIR.glob("*Puzzle SudokuKillerGreaterThan *.pdf"):
if "[solution]" in f.name:
continue
m = re.search(r"SudokuKillerGreaterThan (\d+)", f.name)
if m:
result.add(int(m.group(1)))
return result
# ---------------------------------------------------------------------------
# Přejmenování / stažení
# ---------------------------------------------------------------------------
def killer_to_gt_filename(path: Path) -> str:
return path.name.replace("SudokuKiller ", "SudokuKillerGreaterThan ")
def rename_pair(n: int, killer_path: Path) -> bool:
"""Přejmenuje puzzle + solution soubory SudokuKiller → SudokuKillerGreaterThan."""
ok = True
for f in [killer_path,
killer_path.with_name(killer_path.stem + " [solution].pdf")]:
if not f.exists():
if "[solution]" in f.name:
continue # solution soubor nemusí existovat
tprint(f" Soubor nenalezen pro přejmenování: {f.name}")
ok = False
continue
new_name = killer_to_gt_filename(f)
new_path = SAVE_DIR / new_name
f.rename(new_path)
tprint(f" Přejmenováno: {f.name}{new_name}")
return ok
def get_puzzle_info(n: int) -> dict | None:
url = f"{BASE_URL}/search?n={n}"
try:
resp = SESSION.get(url, timeout=15)
except requests.RequestException as e:
tprint(f" Chyba info puzzle {n}: {e}")
return None
soup = BeautifulSoup(resp.text, "html.parser")
section = soup.select_one("section.puzzle")
if not section:
return None
short_month = section.select_one("span.short-month")
day = section.select_one("span.day")
year = section.select_one("span.year")
if not (short_month and day and year):
return None
try:
date_iso = datetime.strptime(
f"{short_month.text.strip()} {day.text.strip()} {year.text.strip()}",
"%b %d %Y",
).strftime("%Y-%m-%d")
except ValueError:
return None
diff_el = section.select_one("span.puzzle-difficulty-value")
time_el = section.select_one("span.puzzle-timing-value")
return {
"date": date_iso,
"number": n,
"difficulty": diff_el.text.strip() if diff_el else "?",
"avg_time": time_el.text.strip() if time_el else "?",
}
def make_filename(info: dict, solution: bool = False) -> str:
suffix = " [solution]" if solution else ""
avg_time = re.sub(r'[\\/:*?"<>|]', "-", info["avg_time"])
return (
f"{info['date']} Puzzle SudokuKillerGreaterThan {info['number']} "
f"[difficulty {info['difficulty']} of 10] "
f"[average solving time {avg_time}]{suffix}.pdf"
)
def download_pdf(n: int, info: dict, solution: bool = False) -> bool:
filename = make_filename(info, solution)
filepath = SAVE_DIR / filename
if filepath.exists():
return True
suffix = ".solution" if solution else ""
pdf_url = f"{BASE_URL}/pdfs/{n}{suffix}.pdf"
try:
resp = SESSION.get(pdf_url, timeout=30)
except requests.RequestException as e:
tprint(f" Chyba stahování {pdf_url}: {e}")
return False
if resp.status_code != 200:
tprint(f" PDF nedostupné (HTTP {resp.status_code}): {pdf_url}")
return False
if resp.headers.get("content-type", "").startswith("text/html"):
tprint(f" PDF vrátilo HTML: {pdf_url}")
return False
filepath.write_bytes(resp.content)
tprint(f" Staženo: {filename}")
return True
def process_puzzle(n: int, idx: int, total: int,
killer_map: dict[int, Path]) -> bool:
tprint(f"[{idx}/{total}] Puzzle #{n}")
if n in killer_map:
return rename_pair(n, killer_map[n])
# není jako SudokuKiller → stáhnout
info = get_puzzle_info(n)
time.sleep(DELAY)
if not info:
tprint(f" Puzzle {n}: info stránka nenalezena")
return False
ok1 = download_pdf(n, info, solution=False)
time.sleep(DELAY)
ok2 = download_pdf(n, info, solution=True)
return ok1 and ok2
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
tprint("Sbírám čísla GT puzzlů ze search (d=2..10, t=4)...")
gt_numbers = collect_all_gt_numbers()
tprint(f"Celkem GT puzzlů nalezeno: {len(gt_numbers)}")
already_gt = find_downloaded_gt()
killer_map = find_downloaded_killer()
to_process = [n for n in gt_numbers if n not in already_gt]
tprint(f"Již hotovo (GreaterThan): {len(already_gt)}")
tprint(f"Ke zpracování: {len(to_process)}")
if not to_process:
tprint("Vše již zpracováno.")
return
rename_count = sum(1 for n in to_process if n in killer_map)
download_count = len(to_process) - rename_count
tprint(f" → přejmenovat: {rename_count}, stáhnout: {download_count}")
ok_count = 0
err_count = 0
total = len(to_process)
tprint(f"Spouštím {NUM_THREADS} vláken...")
with ThreadPoolExecutor(max_workers=NUM_THREADS,
thread_name_prefix="ThreadPoolExecutor-0") as executor:
futures = {
executor.submit(process_puzzle, n, idx, total, killer_map): n
for idx, n in enumerate(to_process, 1)
}
for future in as_completed(futures):
if future.result():
ok_count += 1
else:
err_count += 1
tprint(f"\nHotovo. Úspěšně: {ok_count}, chyby: {err_count}")
if __name__ == "__main__":
main()