From 9838164b8898b8eb7e0aafd0d3ff19aac3fe5bcf Mon Sep 17 00:00:00 2001 From: "vladimir.buzalka" Date: Mon, 9 Feb 2026 20:16:37 +0100 Subject: [PATCH] z230 --- .claude/settings.local.json | 14 +++ indexer/backup.py | 42 +++++++ indexer/config.py | 3 +- indexer/db.py | 184 +++++++++++++++++------------- indexer/events.py | 38 ++++--- indexer/scanner.py | 25 ++-- main.py | 221 ++++++++++++++++++++++++++++-------- recovery.py | 64 +++++++++++ requirements.txt | 3 + 9 files changed, 444 insertions(+), 150 deletions(-) create mode 100644 .claude/settings.local.json create mode 100644 indexer/backup.py create mode 100644 recovery.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..3a20db1 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,14 @@ +{ + "permissions": { + "allow": [ + "Bash(dir /s \"U:\\\\drobboxordinacebackup\")", + "Bash(where:*)", + "Bash(dir:*)", + "Bash(python:*)", + "Bash(pip install:*)", + "Bash(tasklist:*)", + "Bash(wmic process:*)", + "Bash(taskkill:*)" + ] + } +} diff --git a/indexer/backup.py b/indexer/backup.py new file mode 100644 index 0000000..579555b --- /dev/null +++ b/indexer/backup.py @@ -0,0 +1,42 @@ +import os +import shutil +import tempfile + + +def blob_path(backup_root: str, content_hash: bytes) -> str: + """Vrátí cestu k blob souboru: BACKUP/ab/cd/abcdef...blob""" + hex_hash = content_hash.hex() + return os.path.join(backup_root, hex_hash[:2], hex_hash[2:4], hex_hash + ".blob") + + +def ensure_backed_up(files_with_hash: list, backup_root: str) -> int: + """ + Zkopíruje soubory do content-addressable storage. + files_with_hash: [(full_path, content_hash_bytes), ...] + Přeskočí soubory, jejichž blob už existuje (deduplikace). + Returns: počet nově zálohovaných souborů. 
+ """ + backed_up = 0 + for full_path, content_hash in files_with_hash: + target = blob_path(backup_root, content_hash) + if os.path.exists(target): + continue + + target_dir = os.path.dirname(target) + os.makedirs(target_dir, exist_ok=True) + + try: + # Atomický zápis: temp soubor + přejmenování + fd, tmp_path = tempfile.mkstemp(dir=target_dir, suffix=".tmp") + os.close(fd) + shutil.copy2(full_path, tmp_path) + os.replace(tmp_path, target) + backed_up += 1 + except (FileNotFoundError, PermissionError, OSError) as e: + print(f" WARN: backup failed for {full_path}: {e}") + # Uklidíme temp soubor pokud existuje + if os.path.exists(tmp_path): + os.remove(tmp_path) + continue + + return backed_up diff --git a/indexer/config.py b/indexer/config.py index 002a19e..a8bf2b0 100644 --- a/indexer/config.py +++ b/indexer/config.py @@ -1,7 +1,6 @@ import os from dotenv import load_dotenv -# načti .env z rootu projektu load_dotenv() # ========================= @@ -24,9 +23,11 @@ DB_CONFIG = { ROOT_PATH = os.getenv("ROOT_PATH") ROOT_NAME = os.getenv("ROOT_NAME", "ORDINACE") +BACKUP_PATH = os.getenv("BACKUP_PATH") # ========================= # Behaviour # ========================= DRY_RUN = os.getenv("DRY_RUN", "1") == "1" +BATCH_SIZE = int(os.getenv("BATCH_SIZE", 1000)) diff --git a/indexer/db.py b/indexer/db.py index 9449c2a..ac4cae1 100644 --- a/indexer/db.py +++ b/indexer/db.py @@ -1,91 +1,123 @@ import pymysql -import hashlib -from indexer.config import DB_CONFIG, ROOT_NAME +from datetime import datetime +from indexer.config import DB_CONFIG, BATCH_SIZE def get_connection(): return pymysql.connect(**DB_CONFIG) -def preload_mark_all_missing(): - """ - Na začátku běhu: - označí všechny soubory jako neexistující. - Ty, které skener znovu najde, se přepnou zpět na exists_now = 1. 
- """ - conn = get_connection() - try: - with conn.cursor() as cur: - cur.execute("UPDATE files SET exists_now = 0") - conn.commit() - finally: - conn.close() +# ── Run management ────────────────────────────────────────── - -def path_hash(path: str) -> bytes: - """ - MD5 hash cesty – pouze identifikátor, ne bezpečnostní hash - """ - return hashlib.md5(path.encode("utf-8")).digest() - - -def find_file_by_path(cur, path_hash_bytes): +def create_run(cur) -> int: cur.execute( - """ - SELECT id, file_size, mtime, content_hash - FROM files - WHERE path_hash = %s - """, - (path_hash_bytes,) - ) - return cur.fetchone() - - -def insert_file(cur, file): - cur.execute( - """ - INSERT INTO files ( - root_name, full_path, path_hash, - file_name, directory, - file_size, mtime, content_hash, - first_seen, last_seen, exists_now - ) - VALUES ( - %s, %s, %s, - %s, %s, - %s, %s, %s, - NOW(), NOW(), 1 - ) - """, - ( - ROOT_NAME, - file["full_path"], - path_hash(file["full_path"]), - file["file_name"], - file["directory"], - file["size"], - file["mtime"], - file["content_hash"], - ) + "INSERT INTO runs (started_at, status) VALUES (%s, 'RUNNING')", + (datetime.now(),) ) return cur.lastrowid -def update_file(cur, file_id, file): +def finalize_run(cur, run_id: int, stats: dict): cur.execute( - """ - UPDATE files - SET file_size = %s, - mtime = %s, - content_hash = %s, - last_seen = NOW(), - exists_now = 1 - WHERE id = %s - """, - ( - file["size"], - file["mtime"], - file["content_hash"], - file_id, - ) + """UPDATE runs + SET finished_at = %s, status = 'COMPLETED', + files_total = %s, files_new = %s, files_modified = %s, + files_deleted = %s, files_unchanged = %s + WHERE id = %s""", + (datetime.now(), stats["total"], stats["new"], stats["modified"], + stats["deleted"], stats["unchanged"], run_id) ) + + +def fail_run(cur, run_id: int): + cur.execute( + "UPDATE runs SET finished_at = %s, status = 'FAILED' WHERE id = %s", + (datetime.now(), run_id) + ) + + +# ── Load DB state 
────────────────────────────────────────── + +def load_all_files(cur) -> dict: + """ + Načte všechny existující soubory z DB do RAM. + Returns: {relative_path: {id, size, mtime, content_hash}} + """ + cur.execute( + """SELECT id, relative_path, file_size, mtime, content_hash + FROM files WHERE exists_now = 1""" + ) + result = {} + for row in cur.fetchall(): + file_id, rel_path, size, mtime, content_hash = row + result[rel_path] = { + "id": file_id, + "size": size, + "mtime": mtime, + "content_hash": content_hash, + } + return result + + +# ── Batch operations ──────────────────────────────────────── + +def batch_insert_files(cur, files_list: list, run_id: int) -> dict: + """ + Batch INSERT nových souborů. + files_list: [{relative_path, file_name, directory, size, mtime, content_hash}] + Returns: {relative_path: file_id} + """ + path_to_id = {} + for i in range(0, len(files_list), BATCH_SIZE): + chunk = files_list[i:i + BATCH_SIZE] + cur.executemany( + """INSERT INTO files + (relative_path, file_name, directory, file_size, mtime, + content_hash, first_seen_run, last_seen_run, exists_now) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 1)""", + [(f["relative_path"], f["file_name"], f["directory"], + f["size"], f["mtime"], f["content_hash"], run_id, run_id) + for f in chunk] + ) + # pymysql executemany: lastrowid = first id in batch + first_id = cur.lastrowid + for j, f in enumerate(chunk): + path_to_id[f["relative_path"]] = first_id + j + return path_to_id + + +def batch_update_modified(cur, files_list: list, run_id: int): + """ + Batch UPDATE změněných souborů. 
+ files_list: [{id, size, mtime, content_hash}] + """ + for i in range(0, len(files_list), BATCH_SIZE): + chunk = files_list[i:i + BATCH_SIZE] + cur.executemany( + """UPDATE files + SET file_size = %s, mtime = %s, content_hash = %s, + last_seen_run = %s, exists_now = 1 + WHERE id = %s""", + [(f["size"], f["mtime"], f["content_hash"], run_id, f["id"]) + for f in chunk] + ) + + +def batch_mark_deleted(cur, file_ids: list, run_id: int): + """Batch UPDATE smazaných souborů — exists_now = 0.""" + for i in range(0, len(file_ids), BATCH_SIZE): + chunk = file_ids[i:i + BATCH_SIZE] + cur.executemany( + "UPDATE files SET exists_now = 0, last_seen_run = %s WHERE id = %s", + [(run_id, fid) for fid in chunk] + ) + + +def batch_update_unchanged(cur, file_ids: list, run_id: int): + """Batch UPDATE nezměněných souborů — jen last_seen_run.""" + for i in range(0, len(file_ids), BATCH_SIZE): + chunk = file_ids[i:i + BATCH_SIZE] + cur.executemany( + "UPDATE files SET last_seen_run = %s, exists_now = 1 WHERE id = %s", + [(run_id, fid) for fid in chunk] + ) diff --git a/indexer/events.py b/indexer/events.py index a024d01..5065d01 100644 --- a/indexer/events.py +++ b/indexer/events.py @@ -1,19 +1,21 @@ -def log_event(cur, file_id, event_type, old=None, new=None): - cur.execute( - """ - INSERT INTO file_events ( - file_id, event_type, event_time, - old_size, new_size, - old_hash, new_hash +from indexer.config import BATCH_SIZE + + +def batch_log_events(cur, events: list): + """ + Batch INSERT eventů do file_events. 
+ events: [{run_id, file_id, event_type, old_size, new_size, old_hash, new_hash}] + """ + if not events: + return + for i in range(0, len(events), BATCH_SIZE): + chunk = events[i:i + BATCH_SIZE] + cur.executemany( + """INSERT INTO file_events + (run_id, file_id, event_type, old_size, new_size, old_hash, new_hash) + VALUES (%s, %s, %s, %s, %s, %s, %s)""", + [(e["run_id"], e["file_id"], e["event_type"], + e.get("old_size"), e.get("new_size"), + e.get("old_hash"), e.get("new_hash")) + for e in chunk] ) - VALUES (%s, %s, NOW(), %s, %s, %s, %s) - """, - ( - file_id, - event_type, - old["size"] if old else None, - new["size"] if new else None, - old["content_hash"] if old else None, - new["content_hash"] if new else None, - ) - ) diff --git a/indexer/scanner.py b/indexer/scanner.py index b90066f..a4a50fe 100644 --- a/indexer/scanner.py +++ b/indexer/scanner.py @@ -1,21 +1,30 @@ import os from datetime import datetime -from indexer.hasher import blake3_file -def scan_files(root_path): + +def scan_files(root_path: str) -> dict: + """ + Projde celý adresářový strom a vrátí dict všech souborů. + Nehasuje obsah — to se dělá až pro změněné soubory. 
+ + Returns: + {relative_path: {full_path, file_name, directory, size, mtime}} + """ + result = {} for root, _, files in os.walk(root_path): for name in files: full_path = os.path.join(root, name) try: stat = os.stat(full_path) - except FileNotFoundError: + except (FileNotFoundError, PermissionError): continue - - yield { - "full_path": full_path.replace("\\", "/"), + rel_path = os.path.relpath(full_path, root_path).replace("\\", "/") + rel_dir = os.path.relpath(root, root_path).replace("\\", "/") + result[rel_path] = { + "full_path": full_path, "file_name": name, - "directory": root.replace("\\", "/"), + "directory": rel_dir, "size": stat.st_size, "mtime": datetime.fromtimestamp(stat.st_mtime), - "content_hash": blake3_file(full_path), } + return result diff --git a/main.py b/main.py index c4b285c..5fb17d7 100644 --- a/main.py +++ b/main.py @@ -1,73 +1,200 @@ -from indexer.config import ROOT_PATH, ROOT_NAME, DRY_RUN +from indexer.config import ROOT_PATH, ROOT_NAME, DRY_RUN, BACKUP_PATH from indexer.scanner import scan_files +from indexer.hasher import blake3_file from indexer.db import ( - get_connection, - preload_mark_all_missing, - find_file_by_path, - insert_file, - update_file, - path_hash, + get_connection, create_run, finalize_run, fail_run, + load_all_files, batch_insert_files, batch_update_modified, + batch_mark_deleted, batch_update_unchanged, ) -from indexer.events import log_event +from indexer.events import batch_log_events +from indexer.backup import ensure_backed_up + def main(): print("=" * 60) print("ORDINACE DROPBOX BACKUP – INDEXER") print(f"Root : {ROOT_PATH}") - print(f"Name : {ROOT_NAME}") + print(f"Backup : {BACKUP_PATH}") print(f"DRY RUN : {DRY_RUN}") print("=" * 60) + # ── 1. 
Scan filesystem (fast, no hashing) ── + print("\n[1/7] Scanning filesystem...") + fs = scan_files(ROOT_PATH) + print(f" Found {len(fs)} files on disk.") + + if DRY_RUN: + # V DRY_RUN režimu jen ukážeme co by se stalo + print("\n[DRY RUN] No DB connection, showing scan results only.") + print(f" Files on disk: {len(fs)}") + return + + # ── 2. Connect & create run ── conn = get_connection() cur = conn.cursor() + run_id = create_run(cur) + print(f"\n[2/7] Run #{run_id} created.") - if not DRY_RUN: - preload_mark_all_missing() + try: + # ── 3. Load DB state ── + print("[3/7] Loading DB state...") + db = load_all_files(cur) + print(f" {len(db)} files in DB (exists_now=1).") - created = modified = seen = 0 + # ── 4. Diff ── + print("[4/7] Diffing...") + fs_paths = set(fs.keys()) + db_paths = set(db.keys()) - for file in scan_files(ROOT_PATH): - seen += 1 - ph = path_hash(file["full_path"]) - row = find_file_by_path(cur, ph) + new_paths = fs_paths - db_paths + deleted_paths = db_paths - fs_paths + existing_paths = fs_paths & db_paths - if row is None: - created += 1 - if not DRY_RUN: - file_id = insert_file(cur, file) - log_event(cur, file_id, "CREATED", new=file) - else: - file_id, old_size, old_mtime, old_hash = row - if old_size != file["size"] or old_hash != file["content_hash"]: - modified += 1 - if not DRY_RUN: - update_file(cur, file_id, file) - log_event( - cur, - file_id, - "MODIFIED", - old={"size": old_size, "content_hash": old_hash}, - new=file, - ) + modified_paths = set() + unchanged_paths = set() + for p in existing_paths: + fs_file = fs[p] + db_file = db[p] + if fs_file["size"] != db_file["size"] or fs_file["mtime"] != db_file["mtime"]: + modified_paths.add(p) else: - if not DRY_RUN: - cur.execute( - "UPDATE files SET last_seen = NOW(), exists_now = 1 WHERE id = %s", - (file_id,) - ) + unchanged_paths.add(p) - if seen % 500 == 0: - print(f"{seen} files scanned...") + print(f" NEW: {len(new_paths)} MOD: {len(modified_paths)} " + f"DEL: {len(deleted_paths)} 
SAME: {len(unchanged_paths)}") - if not DRY_RUN: + # ── 5. Process changes ── + print("[5/7] Processing changes...") + events = [] + files_to_backup = [] + + # 5a) NEW files — compute BLAKE3, batch INSERT + if new_paths: + print(f" Hashing {len(new_paths)} new files...") + new_files = [] + for p in new_paths: + f = fs[p] + try: + content_hash = blake3_file(f["full_path"]) + except (FileNotFoundError, PermissionError, OSError) as e: + print(f" WARN: skip {p}: {e}") + continue + new_files.append({ + "relative_path": p, + "file_name": f["file_name"], + "directory": f["directory"], + "size": f["size"], + "mtime": f["mtime"], + "content_hash": content_hash, + }) + files_to_backup.append((f["full_path"], content_hash)) + + if new_files: + path_to_id = batch_insert_files(cur, new_files, run_id) + for nf in new_files: + events.append({ + "run_id": run_id, + "file_id": path_to_id[nf["relative_path"]], + "event_type": "CREATED", + "new_size": nf["size"], + "new_hash": nf["content_hash"], + }) + + # 5b) MODIFIED files — compute BLAKE3, batch UPDATE + if modified_paths: + print(f" Hashing {len(modified_paths)} modified files...") + mod_files = [] + for p in modified_paths: + f = fs[p] + db_file = db[p] + try: + content_hash = blake3_file(f["full_path"]) + except (FileNotFoundError, PermissionError, OSError) as e: + print(f" WARN: skip {p}: {e}") + continue + mod_files.append({ + "id": db_file["id"], + "size": f["size"], + "mtime": f["mtime"], + "content_hash": content_hash, + }) + events.append({ + "run_id": run_id, + "file_id": db_file["id"], + "event_type": "MODIFIED", + "old_size": db_file["size"], + "new_size": f["size"], + "old_hash": db_file["content_hash"], + "new_hash": content_hash, + }) + files_to_backup.append((f["full_path"], content_hash)) + + if mod_files: + batch_update_modified(cur, mod_files, run_id) + + # 5c) DELETED files — batch UPDATE exists_now=0 + if deleted_paths: + del_ids = [db[p]["id"] for p in deleted_paths] + batch_mark_deleted(cur, del_ids, 
run_id) + for p in deleted_paths: + events.append({ + "run_id": run_id, + "file_id": db[p]["id"], + "event_type": "DELETED", + "old_size": db[p]["size"], + "old_hash": db[p]["content_hash"], + }) + + # 5d) UNCHANGED files — batch UPDATE last_seen_run + if unchanged_paths: + unch_ids = [db[p]["id"] for p in unchanged_paths] + batch_update_unchanged(cur, unch_ids, run_id) + + # 5e) Log all events + if events: + batch_log_events(cur, events) + + # ── 6. Backup ── + if files_to_backup and BACKUP_PATH: + print(f"[6/7] Backing up {len(files_to_backup)} files...") + backed = ensure_backed_up(files_to_backup, BACKUP_PATH) + print(f" {backed} new blobs written.") + else: + print("[6/7] Nothing to backup.") + + # ── 7. Finalize ── + stats = { + "total": len(fs), + "new": len(new_paths), + "modified": len(modified_paths), + "deleted": len(deleted_paths), + "unchanged": len(unchanged_paths), + } + finalize_run(cur, run_id, stats) conn.commit() + print(f"[7/7] Run #{run_id} COMPLETED.") - print("================================") - print(f"Scanned : {seen}") - print(f"Created : {created}") - print(f"Modified : {modified}") + except Exception as e: + print(f"\nERROR: {e}") + try: + fail_run(cur, run_id) + conn.commit() + except Exception: + pass + conn.rollback() + raise + finally: + conn.close() + + # ── Summary ── + print("\n" + "=" * 60) + print(f"Total : {stats['total']}") + print(f"New : {stats['new']}") + print(f"Modified : {stats['modified']}") + print(f"Deleted : {stats['deleted']}") + print(f"Unchanged: {stats['unchanged']}") + print("=" * 60) - conn.close() if __name__ == "__main__": main() diff --git a/recovery.py b/recovery.py new file mode 100644 index 0000000..3cdd449 --- /dev/null +++ b/recovery.py @@ -0,0 +1,64 @@ +""" +Recovery script: reconstruct directory tree from a specific run. 
+
+Usage: python recovery.py <run_id> <output_dir>
+
+For a given run_id, finds all files that existed at that point
+(first_seen_run <= run_id AND last_seen_run >= run_id)
+and copies them from backup storage to output_dir preserving
+the original directory structure.
+"""
+
+import os
+import sys
+import shutil
+from indexer.config import DB_CONFIG, BACKUP_PATH
+from indexer.db import get_connection
+from indexer.backup import blob_path
+
+
+def recover(run_id: int, output_dir: str):
+    conn = get_connection()
+    cur = conn.cursor()
+
+    cur.execute(
+        """SELECT relative_path, content_hash
+           FROM files
+           WHERE first_seen_run <= %s AND last_seen_run >= %s""",
+        (run_id, run_id)
+    )
+    rows = cur.fetchall()
+    conn.close()
+
+    if not rows:
+        print(f"No files found for run #{run_id}.")
+        return
+
+    print(f"Recovering {len(rows)} files from run #{run_id} to {output_dir}")
+    recovered = 0
+    missing = 0
+
+    for relative_path, content_hash in rows:
+        source = blob_path(BACKUP_PATH, content_hash)
+        target = os.path.join(output_dir, relative_path.replace("/", os.sep))
+
+        if not os.path.exists(source):
+            print(f"  MISSING blob: {content_hash.hex()} for {relative_path}")
+            missing += 1
+            continue
+
+        os.makedirs(os.path.dirname(target), exist_ok=True)
+        shutil.copy2(source, target)
+        recovered += 1
+
+    print(f"\nRecovered: {recovered}  Missing blobs: {missing}")
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("Usage: python recovery.py <run_id> <output_dir>")
+        sys.exit(1)
+
+    run_id = int(sys.argv[1])
+    output_dir = sys.argv[2]
+    recover(run_id, output_dir)
diff --git a/requirements.txt b/requirements.txt
index e69de29..62c1860 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+pymysql
+blake3
+python-dotenv