Files
ordinaceprojekt/SběrDatRůzné/SudokuKiller/stahni_killer_structured.py
T
Vladimir Buzalka c4c0d1d435 notebookvb
2026-05-08 22:06:57 +02:00

255 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Download structured data (cage definitions + solutions) from dailykillersudoku.com
and store it in the shared `puzzles` table.
Works without Playwright: the data is inline in the HTML as JSON, and the base64 payloads are decoded in Python.
"""
import base64
import json
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import requests
from tqdm import tqdm
# Force UTF-8 on stdout/stderr so the non-ASCII messages printed by this
# script are encoded the same way regardless of the console default.
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
# Put the "Knihovny" directory, located two levels above this script's
# directory, at the front of the import path.
# NOTE(review): presumably this is where mysql_db lives — verify.
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "Knihovny"))
from mysql_db import connect_mysql
# Maps the numeric "puzzle_type" field of the downloaded puzzle JSON to the
# game_type string stored with each record (see process_puzzle).
PUZZLE_TYPE_MAP = {1: "killer_sudoku", 2: "killer_sudoku_gt"}
# URL template for a single puzzle page; "{}" is filled with the puzzle number.
BASE_URL = "https://www.dailykillersudoku.com/puzzle/{}"
def fetch_puzzle_json(puzzle_number: int) -> dict | None:
    """Fetch one puzzle page and return the JSON object embedded in it.

    The page is searched for a ``new DKS.Puzzle({...})`` call and the object
    literal passed to it is parsed as JSON.

    Args:
        puzzle_number: Number substituted into ``BASE_URL``.

    Returns:
        The parsed JSON value, or ``None`` when the request fails, the
        response status is not 200, the marker is not found, or the captured
        text is not valid JSON.
    """
    url = BASE_URL.format(puzzle_number)
    try:
        response = requests.get(url, timeout=15)
    except requests.RequestException:
        # Network-level failure (timeout, DNS, connection reset, ...).
        return None
    if response.status_code != 200:
        return None
    # NOTE: the capture is non-greedy and "." does not match newlines, so it
    # ends at the first "})" on the same line as the opening brace.
    match = re.search(r'new DKS\.Puzzle\((\{.*?\})\)', response.text)
    if not match:
        return None
    try:
        return json.loads(match.group(1))
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        return None
def decode_board(board_b64: str) -> tuple[list[list[int]], list[int]]:
    """Decode ``board_base64`` into ``(cage_map, cage_sums)``.

    Layout as read here: 2 leading bytes that are skipped, then 81 two-byte
    big-endian cage IDs in row-major order, then one byte per cage sum.

    Returns:
        A 9x9 list of cage IDs and the list of the remaining bytes as ints.
    """
    payload = base64.b64decode(board_b64)
    cells_end = 2 + 81 * 2
    cell_bytes = payload[2:cells_end]

    row_stride = 9 * 2  # nine cells per row, two bytes per cell
    cage_map = []
    for row_start in range(0, 9 * row_stride, row_stride):
        cage_map.append([
            (cell_bytes[pos] << 8) | cell_bytes[pos + 1]
            for pos in range(row_start, row_start + row_stride, 2)
        ])

    return cage_map, list(payload[cells_end:])
def decode_solution(solution_b64: str) -> list[list[int]]:
    """Decode ``solution_base64`` into nine rows of up to nine byte values.

    The first two decoded bytes are skipped; the rest is cut into
    consecutive 9-byte rows.
    """
    payload = base64.b64decode(solution_b64)[2:]
    grid = []
    for row_start in range(0, 81, 9):
        grid.append(list(payload[row_start:row_start + 9]))
    return grid
def build_cages_string(cage_map: list[list[int]], cage_sums: list[int]) -> str:
    """Serialize cages as ``sum,r0c0r0c1|sum,r1c2r1c3|...``.

    Cells are grouped by cage ID in row-major order and cages are emitted in
    ascending cage-ID order. A cage whose ID is not a valid index into
    ``cage_sums`` is written with sum 0.
    """
    cells_by_cage: dict[int, list[str]] = {}
    for row_idx in range(9):
        for col_idx in range(9):
            cage_id = cage_map[row_idx][col_idx]
            cells_by_cage.setdefault(cage_id, []).append(f"r{row_idx}c{col_idx}")

    known_sums = len(cage_sums)
    segments = []
    for cage_id, cell_names in sorted(cells_by_cage.items()):
        total = cage_sums[cage_id] if cage_id < known_sums else 0
        segments.append(f"{total}," + "".join(cell_names))
    return "|".join(segments)
def build_solution_string(solution: list[list[int]]) -> str:
    """Concatenate every value of the grid, row by row, into one string."""
    return "".join("".join(map(str, row)) for row in solution)
def process_puzzle(puzzle_number: int) -> dict | None:
    """Download one puzzle and convert it into a record for storage.

    Args:
        puzzle_number: Number of the puzzle page to fetch.

    Returns:
        A dict with the keys ``puzzle_number``, ``game_type``, ``difficulty``,
        ``puzzle_date``, ``puzzle``, ``solution``, ``extra`` (a JSON string)
        and ``source``; or ``None`` when the page could not be fetched or its
        payload is missing fields / cannot be decoded.
    """
    pj = fetch_puzzle_json(puzzle_number)
    if not pj:
        return None
    try:
        cage_map, cage_sums = decode_board(pj["board_base64"])
        solution = decode_solution(pj["solution_base64"])
        cage_str = build_cages_string(cage_map, cage_sums)
        sol_str = build_solution_string(solution)
        # Unknown or missing puzzle types fall back to plain killer sudoku.
        game_type = PUZZLE_TYPE_MAP.get(pj.get("puzzle_type", 1), "killer_sudoku")
        return {
            "puzzle_number": pj["id"],
            "game_type": game_type,
            "difficulty": str(pj.get("difficulty", 0)),
            "puzzle_date": pj.get("date"),
            "puzzle": cage_str,
            "solution": sol_str,
            "extra": json.dumps({
                "grid_size": 9,
                "puzzle_number": pj["id"],
                "original_difficulty": pj.get("difficulty"),
            }),
            "source": "dailykillersudoku.com",
        }
    except (LookupError, TypeError, ValueError):
        # Malformed payload: missing key, truncated board data, wrong field
        # type, or invalid base64 (binascii.Error is a ValueError subclass).
        return None
def save_batch(results: list[dict]) -> int:
    """Upsert a batch of puzzle records into the ``puzzles`` table.

    Each record is written with ``INSERT ... ON DUPLICATE KEY UPDATE``; on a
    duplicate key, ``puzzle``, ``solution`` and ``extra`` are overwritten.

    Args:
        results: Records shaped like the dicts returned by ``process_puzzle``.

    Returns:
        The number of statements whose ``rowcount`` was positive.
        NOTE(review): MySQL documents the affected-rows value as 0 for an
        unchanged duplicate, so this is presumably "rows inserted or
        changed" — confirm against the driver in use.
    """
    if not results:
        # Nothing to write; avoid opening a connection for an empty batch.
        return 0
    conn = connect_mysql(database="puzzle")
    try:
        cur = conn.cursor()
        try:
            inserted = 0
            for r in results:
                cur.execute(
                    "INSERT INTO puzzles "
                    "(game_type, difficulty, puzzle_date, puzzle, solution, extra, source) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s) "
                    "ON DUPLICATE KEY UPDATE puzzle=VALUES(puzzle), solution=VALUES(solution), "
                    "extra=VALUES(extra)",
                    (r["game_type"], r["difficulty"], r["puzzle_date"],
                     r["puzzle"], r["solution"], r["extra"], r["source"]),
                )
                if cur.rowcount > 0:
                    inserted += 1
        finally:
            # Release the cursor even when a statement fails.
            cur.close()
    finally:
        # NOTE(review): no explicit commit is issued here; this relies on how
        # connect_mysql configures the connection (autocommit?) — confirm.
        conn.close()
    return inserted
def get_puzzle_numbers() -> list[int]:
    """Return all ``puzzle_number`` values from ``sudoku_killer``, ascending.

    The connection and cursor are closed even if the query raises.
    """
    conn = connect_mysql(database="puzzle")
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT puzzle_number FROM sudoku_killer ORDER BY puzzle_number")
            return [row[0] for row in cur.fetchall()]
        finally:
            cur.close()
    finally:
        conn.close()
# Local JSON file stored next to this script; download_all reads and rewrites
# it while downloading, and import_from_json reads it for the MySQL import.
JSON_FILE = Path(__file__).parent / "killer_structured_data.json"
def download_all(puzzle_numbers: list[int]) -> list[dict]:
    """Download all puzzles, checkpointing progress to ``JSON_FILE``.

    Records already present in ``JSON_FILE`` are loaded first and their
    ``puzzle_number`` values are skipped. The rest is downloaded in batches
    of 100 on a pool of 6 threads, and the full result list is saved after
    every batch.

    Args:
        puzzle_numbers: Puzzle numbers that should be present in the result.

    Returns:
        Previously saved records plus the newly downloaded ones. Puzzles
        for which ``process_puzzle`` returned a falsy value are only counted
        as errors and are not included.
    """
    all_results = []
    if JSON_FILE.exists():
        all_results = json.loads(JSON_FILE.read_text(encoding="utf-8"))
        print(f"Načteno {len(all_results)} existujících záznamů z JSON")
    done_numbers = {r["puzzle_number"] for r in all_results}
    remaining = [n for n in puzzle_numbers if n not in done_numbers]
    print(f"Zbývá stáhnout: {len(remaining)} z {len(puzzle_numbers)}")
    if not remaining:
        return all_results

    batch_size = 100
    errors = 0
    # Checkpoints go to a sibling temp file and are then swapped in, so an
    # interruption mid-write cannot leave a truncated JSON_FILE behind.
    tmp_file = JSON_FILE.with_name(JSON_FILE.name + ".tmp")
    with ThreadPoolExecutor(max_workers=6) as executor:
        for start in tqdm(range(0, len(remaining), batch_size),
                          desc="Stahování", unit="batch"):
            batch_nums = remaining[start:start + batch_size]
            futures = [executor.submit(process_puzzle, n) for n in batch_nums]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    all_results.append(result)
                else:
                    errors += 1
            tmp_file.write_text(
                json.dumps(all_results, ensure_ascii=False), encoding="utf-8"
            )
            tmp_file.replace(JSON_FILE)
    print(f"Staženo celkem: {len(all_results)}, chyb: {errors}")
    return all_results
def import_from_json():
    """Load the records stored in ``JSON_FILE`` and write them to MySQL.

    Prints a hint and returns when the file does not exist. Otherwise the
    records are passed to ``save_batch`` in chunks of 500 and the summed
    return values are printed at the end.
    """
    if not JSON_FILE.exists():
        print("JSON soubor neexistuje, nejdřív spusť stahování.")
        return

    records = json.loads(JSON_FILE.read_text(encoding="utf-8"))
    print(f"Importuji {len(records)} záznamů z JSON do MySQL...")

    chunk_size = 500
    offsets = range(0, len(records), chunk_size)
    total_inserted = 0
    for offset in tqdm(offsets, desc="Import", unit="batch"):
        total_inserted += save_batch(records[offset:offset + chunk_size])
    print(f"Import hotov: aktualizováno {total_inserted} záznamů")
def main():
    """Command-line entry point.

    Always starts with a smoke test that processes puzzle 376 and prints the
    result; the function stops there if that fails. Afterwards ``--import``
    imports the existing JSON file into MySQL, ``--run`` downloads every
    puzzle number from the database and then imports the results, and with
    neither flag only a usage hint is printed.
    """
    # Smoke test on a single puzzle
    print("=== Test: puzzle 376 ===")
    result = process_puzzle(376)
    if not result:
        print(" Selhalo!")
        return
    print(f" game_type: {result['game_type']}")
    print(f" difficulty: {result['difficulty']}")
    print(f" date: {result['puzzle_date']}")
    print(f" cages ({len(result['puzzle'].split('|'))} klecí): {result['puzzle'][:100]}...")
    print(f" solution: {result['solution']}")

    cli_args = sys.argv
    if "--import" in cli_args:
        import_from_json()
        return
    if "--run" not in cli_args:
        print("\nPro stažení spusť s --run, pro import z JSON s --import")
        return

    puzzle_numbers = get_puzzle_numbers()
    print(f"\nCelkem puzzle k zpracování: {len(puzzle_numbers)}")
    all_results = download_all(puzzle_numbers)

    print("\nImportuji do MySQL...")
    chunk_size = 500
    total_inserted = 0
    offsets = range(0, len(all_results), chunk_size)
    for offset in tqdm(offsets, desc="Import", unit="batch"):
        total_inserted += save_batch(all_results[offset:offset + chunk_size])
    print(f"\nHotovo: aktualizováno {total_inserted} záznamů")
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()