notebookvb

This commit is contained in:
Vladimir Buzalka
2026-05-08 22:06:57 +02:00
parent c9903646f1
commit c4c0d1d435
14 changed files with 1666 additions and 0 deletions
@@ -0,0 +1,254 @@
"""
Stáhne strukturovaná data (cage definice + řešení) z dailykillersudoku.com
a uloží do sdílené tabulky puzzles.
Funguje bez Playwright — data jsou inline v HTML jako JSON, dekóduje se base64 v Pythonu.
"""
import base64
import json
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import requests
from tqdm import tqdm
# Switch both standard streams to UTF-8; the script prints non-ASCII text.
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
# Put the "Knihovny" directory (found via three .parent steps from this file)
# at the front of the import path. Presumably that is where mysql_db lives.
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "Knihovny"))
from mysql_db import connect_mysql
# Numeric "puzzle_type" from the page JSON -> game_type label stored per record.
PUZZLE_TYPE_MAP = {1: "killer_sudoku", 2: "killer_sudoku_gt"}
# URL template of a puzzle page; "{}" is filled with the puzzle number.
BASE_URL = "https://www.dailykillersudoku.com/puzzle/{}"
def fetch_puzzle_json(puzzle_number: int) -> dict | None:
    """Fetch one puzzle page and return the JSON object embedded in it.

    The page is requested from ``BASE_URL`` with a 15-second timeout and
    searched for the argument of the inline ``new DKS.Puzzle({...})`` call.

    Returns:
        The parsed JSON value, or ``None`` when the HTTP status is not 200,
        the call is not found in the page, or any exception is raised while
        fetching or parsing.
    """
    page_url = BASE_URL.format(puzzle_number)
    try:
        response = requests.get(page_url, timeout=15)
        if response.status_code == 200:
            found = re.search(r'new DKS\.Puzzle\((\{.*?\})\)', response.text)
            if found:
                return json.loads(found.group(1))
    except Exception:
        # Best effort: every network or parsing failure means "no data".
        return None
    return None
def decode_board(board_b64: str) -> tuple[list[list[int]], list[int]]:
    """Decode ``board_base64`` into ``(cage_map, cage_sums)``.

    After base64 decoding the first 2 bytes are skipped, the next 81 * 2
    bytes are read as big-endian two-byte cage IDs in row-major order, and
    every remaining byte is taken as one cage sum.

    Returns:
        A 9x9 list of cage IDs and the list of sum bytes.

    Raises:
        IndexError: if the decoded data holds fewer than 81 cage IDs.
    """
    payload = base64.b64decode(board_b64)
    ids_end = 2 + 81 * 2
    id_bytes = payload[2:ids_end]

    def cage_id_at(cell: int) -> int:
        # High byte first, then low byte (big-endian).
        high = id_bytes[2 * cell]
        low = id_bytes[2 * cell + 1]
        return high * 256 + low

    cage_map = [
        [cage_id_at(9 * row + col) for col in range(9)]
        for row in range(9)
    ]
    return cage_map, list(payload[ids_end:])
def decode_solution(solution_b64: str) -> list[list[int]]:
    """Decode ``solution_base64`` into a grid of nine rows.

    The first 2 decoded bytes are skipped; the following byte values are
    split into nine consecutive slices of up to nine values each.
    """
    digits = list(base64.b64decode(solution_b64))[2:]
    grid = []
    for start in range(0, 81, 9):
        grid.append(digits[start:start + 9])
    return grid
def build_cages_string(cage_map: list[list[int]], cage_sums: list[int]) -> str:
    """Serialize the cages as ``sum,r0c0r0c1|sum,r1c2r1c3|...``.

    Cells of the 9x9 ``cage_map`` are grouped by cage ID in row-major order
    and the cages are emitted in ascending ID order. The cage ID is used as
    an index into ``cage_sums``; an ID at or beyond its length gets sum 0.
    """
    cells_by_cage: dict[int, list[str]] = {}
    for row in range(9):
        for col in range(9):
            cells_by_cage.setdefault(cage_map[row][col], []).append(f"r{row}c{col}")

    known = len(cage_sums)
    chunks = []
    for cage_id in sorted(cells_by_cage):
        total = cage_sums[cage_id] if cage_id < known else 0
        chunks.append(f"{total}," + "".join(cells_by_cage[cage_id]))
    return "|".join(chunks)
def build_solution_string(solution: list[list[int]]) -> str:
    """Concatenate every value of the grid, row by row, into one string."""
    return "".join("".join(map(str, row)) for row in solution)
def process_puzzle(puzzle_number: int) -> dict | None:
    """Download one puzzle and convert it into a record dictionary.

    The page JSON is fetched with ``fetch_puzzle_json``; its
    ``board_base64`` and ``solution_base64`` fields are decoded and
    serialized into the ``puzzle`` and ``solution`` strings.

    Returns:
        The record dictionary, or ``None`` when nothing could be fetched or
        any exception is raised while decoding or building the record.
    """
    data = fetch_puzzle_json(puzzle_number)
    if not data:
        return None
    try:
        puzzle_id = data["id"]
        extra = json.dumps({
            "grid_size": 9,
            "puzzle_number": puzzle_id,
            "original_difficulty": data.get("difficulty"),
        })
        record = {
            "puzzle_number": puzzle_id,
            # Unknown or missing puzzle types fall back to plain killer sudoku.
            "game_type": PUZZLE_TYPE_MAP.get(data.get("puzzle_type", 1), "killer_sudoku"),
            "difficulty": str(data.get("difficulty", 0)),
            "puzzle_date": data.get("date"),
            "puzzle": build_cages_string(*decode_board(data["board_base64"])),
            "solution": build_solution_string(decode_solution(data["solution_base64"])),
            "extra": extra,
            "source": "dailykillersudoku.com",
        }
    except Exception:
        # Best effort: a malformed puzzle is reported as a failure, not raised.
        return None
    return record
def save_batch(results: list[dict]) -> int:
    """Upsert a batch of puzzle records into the ``puzzles`` table.

    Each record is inserted; on a duplicate key the ``puzzle``, ``solution``
    and ``extra`` columns are overwritten instead.

    Args:
        results: Record dictionaries with the keys ``game_type``,
            ``difficulty``, ``puzzle_date``, ``puzzle``, ``solution``,
            ``extra`` and ``source``.

    Returns:
        The number of statements that reported a positive row count.
    """
    if not results:
        # Nothing to write; do not open a connection for an empty batch.
        return 0
    conn = connect_mysql(database="puzzle")
    try:
        cur = conn.cursor()
        try:
            inserted = 0
            for rec in results:
                cur.execute(
                    "INSERT INTO puzzles "
                    "(game_type, difficulty, puzzle_date, puzzle, solution, extra, source) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s) "
                    "ON DUPLICATE KEY UPDATE puzzle=VALUES(puzzle), solution=VALUES(solution), "
                    "extra=VALUES(extra)",
                    (rec["game_type"], rec["difficulty"], rec["puzzle_date"],
                     rec["puzzle"], rec["solution"], rec["extra"], rec["source"]),
                )
                if cur.rowcount > 0:
                    inserted += 1
            # DB-API connections are not required to autocommit, so persist
            # the batch explicitly.
            # NOTE(review): redundant but harmless if connect_mysql already
            # enables autocommit - confirm.
            conn.commit()
        finally:
            cur.close()
    finally:
        # Always release the connection, even when a statement fails.
        conn.close()
    return inserted
def get_puzzle_numbers() -> list[int]:
    """Return every ``puzzle_number`` from the ``sudoku_killer`` table.

    The numbers are read from the ``puzzle`` database in ascending order.
    The cursor and connection are closed even if the query fails.
    """
    conn = connect_mysql(database="puzzle")
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT puzzle_number FROM sudoku_killer ORDER BY puzzle_number")
            return [row[0] for row in cur.fetchall()]
        finally:
            cur.close()
    finally:
        conn.close()
# JSON file next to this script: download_all() stores the downloaded records
# here and import_from_json() reads them back.
JSON_FILE = Path(__file__).parent / "killer_structured_data.json"
def download_all(puzzle_numbers: list[int]) -> list[dict]:
    """Download all puzzles, checkpointing progress to ``JSON_FILE``.

    Records already present in ``JSON_FILE`` are loaded first and their
    puzzle numbers are skipped, so an interrupted run can be resumed. The
    remaining numbers are processed in batches of 100 by six worker
    threads, and the full result list is saved after every batch.

    Args:
        puzzle_numbers: Puzzle numbers to download.

    Returns:
        Previously stored records plus the newly downloaded ones.
    """
    all_results = []
    if JSON_FILE.exists():
        all_results = json.loads(JSON_FILE.read_text(encoding="utf-8"))
        print(f"Načteno {len(all_results)} existujících záznamů z JSON")
    done_numbers = {r["puzzle_number"] for r in all_results}
    remaining = [n for n in puzzle_numbers if n not in done_numbers]
    print(f"Zbývá stáhnout: {len(remaining)} z {len(puzzle_numbers)}")
    if not remaining:
        return all_results
    batch_size = 100
    errors = 0
    # The checkpoint goes to a sibling temp file and is then renamed over the
    # real one, so an interrupt mid-write cannot leave a truncated JSON file.
    tmp_file = JSON_FILE.with_name(JSON_FILE.name + ".tmp")
    with ThreadPoolExecutor(max_workers=6) as executor:
        for start in tqdm(range(0, len(remaining), batch_size),
                          desc="Stahování", unit="batch"):
            batch_nums = remaining[start:start + batch_size]
            futures = [executor.submit(process_puzzle, n) for n in batch_nums]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    all_results.append(result)
                else:
                    errors += 1
            tmp_file.write_text(
                json.dumps(all_results, ensure_ascii=False), encoding="utf-8"
            )
            tmp_file.replace(JSON_FILE)
    print(f"Staženo celkem: {len(all_results)}, chyb: {errors}")
    return all_results
def import_from_json():
    """Load the records stored in ``JSON_FILE`` and write them to MySQL.

    Prints a hint and returns when the file does not exist. Otherwise the
    records are passed to ``save_batch`` in slices of 500 and the summed
    counts are printed at the end.
    """
    if not JSON_FILE.exists():
        print("JSON soubor neexistuje, nejdřív spusť stahování.")
        return
    all_results = json.loads(JSON_FILE.read_text(encoding="utf-8"))
    print(f"Importuji {len(all_results)} záznamů z JSON do MySQL...")
    chunk = 500
    offsets = range(0, len(all_results), chunk)
    total_inserted = sum(
        save_batch(all_results[offset:offset + chunk])
        for offset in tqdm(offsets, desc="Import", unit="batch")
    )
    print(f"Import hotov: aktualizováno {total_inserted} záznamů")
def main():
    """Command-line entry point.

    Always starts with a smoke test on puzzle 376 and stops if it fails.
    Then ``--import`` loads the existing JSON file into MySQL, ``--run``
    downloads every puzzle and imports the result, and with neither flag
    only a usage hint is printed.
    """
    # Smoke test on a single puzzle before doing any bulk work.
    print("=== Test: puzzle 376 ===")
    result = process_puzzle(376)
    if not result:
        print(" Selhalo!")
        return
    print(f" game_type: {result['game_type']}")
    print(f" difficulty: {result['difficulty']}")
    print(f" date: {result['puzzle_date']}")
    print(f" cages ({len(result['puzzle'].split('|'))} klecí): {result['puzzle'][:100]}...")
    print(f" solution: {result['solution']}")

    cli_args = sys.argv
    if "--import" in cli_args:
        import_from_json()
        return
    if "--run" not in cli_args:
        print("\nPro stažení spusť s --run, pro import z JSON s --import")
        return

    puzzle_numbers = get_puzzle_numbers()
    print(f"\nCelkem puzzle k zpracování: {len(puzzle_numbers)}")
    all_results = download_all(puzzle_numbers)

    print("\nImportuji do MySQL...")
    chunk = 500
    offsets = range(0, len(all_results), chunk)
    total_inserted = sum(
        save_batch(all_results[offset:offset + chunk])
        for offset in tqdm(offsets, desc="Import", unit="batch")
    )
    print(f"\nHotovo: aktualizováno {total_inserted} záznamů")
# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()