From 8b7f99ca825076930c5b1453689f379a9c629ad1 Mon Sep 17 00:00:00 2001
From: vlado
Date: Wed, 10 Dec 2025 14:05:31 +0100
Subject: [PATCH] tw22

---
 22 WalkandSave.py             |   3 +-
 23 WalkadnSave.py             | 485 ++++++++++++++++++++++++++++++++++
 52 MD5CalculateMultiThread.py | 140 ++++++----
 3 files changed, 580 insertions(+), 48 deletions(-)
 create mode 100644 23 WalkadnSave.py

diff --git a/22 WalkandSave.py b/22 WalkandSave.py
index 7ab97c5..783a3b4 100644
--- a/22 WalkandSave.py
+++ b/22 WalkandSave.py
@@ -258,7 +258,8 @@ def walk_and_store_bulk():
     BATCH_SIZE = 10000
     # target_dir = r"\\tower1\#colddata"
     # target_dir = r"z:"
-    target_dir = r"\\tower\ebooks"
+    # target_dir = r"\\tower\ebooks"
+    target_dir = r"\\tower\dedup"
     # device_name = "TW22"
     device_name = "TOWER"
 
diff --git a/23 WalkadnSave.py b/23 WalkadnSave.py
new file mode 100644
index 0000000..08c17e1
--- /dev/null
+++ b/23 WalkadnSave.py
@@ -0,0 +1,485 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import os
+import hashlib
+from datetime import datetime
+import mysql.connector
+from mysql.connector import Error
+from dotenv import load_dotenv
+from pathlib import Path
+import unicodedata
+
+# ======================================================
+# Load .env from the script directory
+# ======================================================
+
+env_path = Path(__file__).resolve().parent / ".env"
+load_dotenv(env_path)
+
+
+# ======================================================
+# Helper: MD5 of full file path string
+# ======================================================
+
+def md5_path(path: str) -> str:
+    return hashlib.md5(path.encode("utf8")).hexdigest()
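+
+
+# ------------------------------------------------------
+# Illustration only (helper name is hypothetical, never called
+# by the scanner): why paths get NFC-normalized before hashing.
+# The two spellings of "Černý" below render identically but use
+# different code points, so they would produce different
+# path_md5 keys unless normalized to one form first.
+# ------------------------------------------------------
+
+def _md5_path_normalization_demo():
+    nfc = unicodedata.normalize("NFC", "Černý")   # composed form
+    nfd = unicodedata.normalize("NFD", "Černý")   # decomposed form
+    assert nfc != nfd                             # different code point sequences
+    assert md5_path(nfc) != md5_path(nfd)         # therefore different 32-char keys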
+
+
+# ======================================================
+# MySQL CONNECTIONS
+# ======================================================
+
+def get_server_connection():
+    return mysql.connector.connect(
+        host=os.getenv("DB_MYSQL_HOST"),
+        user=os.getenv("DB_MYSQL_ROOT"),
+        password=os.getenv("DB_MYSQL_ROOT_PASS"),
+        port=int(os.getenv("DB_MYSQL_PORT")),
+        auth_plugin="mysql_native_password",
+    )
+
+
+def get_db_connection():
+    conn = mysql.connector.connect(
+        host=os.getenv("DB_MYSQL_HOST"),
+        user=os.getenv("DB_MYSQL_ROOT"),
+        password=os.getenv("DB_MYSQL_ROOT_PASS"),
+        port=int(os.getenv("DB_MYSQL_PORT")),
+        database="walkfiles",
+        auth_plugin="mysql_native_password",
+    )
+
+    c = conn.cursor()
+    c.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci")
+    c.close()
+    return conn
+
+
+# ======================================================
+# DATABASE INITIALIZATION
+# ======================================================
+
+def init_db():
+    # Ensure the database exists
+    server = get_server_connection()
+    cur = server.cursor()
+    cur.execute("""
+        CREATE DATABASE IF NOT EXISTS walkfiles
+        DEFAULT CHARACTER SET utf8mb4
+        COLLATE utf8mb4_general_ci
+    """)
+    server.commit()
+    cur.close()
+    server.close()
+
+    # Connect
+    conn = get_db_connection()
+    cursor = conn.cursor()
+
+    # DEVICES
+    cursor.execute("""
+        CREATE TABLE IF NOT EXISTS devices (
+            id INT AUTO_INCREMENT PRIMARY KEY,
+            name VARCHAR(255) UNIQUE,
+            scanned_at DATETIME NULL
+        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+    """)
+
+    # FOLDERS
+    cursor.execute("""
+        CREATE TABLE IF NOT EXISTS folders (
+            id INT AUTO_INCREMENT PRIMARY KEY,
+            path VARCHAR(2048) NOT NULL,
+            parent_id INT NULL,
+            device_id INT NOT NULL,
+            first_seen DATETIME NOT NULL,
+            last_seen DATETIME NOT NULL,
+            deleted TINYINT(1) NOT NULL DEFAULT 0,
+
+            CONSTRAINT fk_folder_device
+                FOREIGN KEY (device_id) REFERENCES devices(id)
+                ON DELETE CASCADE,
+
+            UNIQUE KEY uniq_folder_path (device_id, path(255)),
+            INDEX idx_folder_dev (device_id)
+        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+    """)
+
+    # FILES
+    cursor.execute("""
+        CREATE TABLE IF NOT EXISTS files (
+            id INT AUTO_INCREMENT PRIMARY KEY,
+
+            name VARCHAR(255) NOT NULL,
+            path VARCHAR(2048) NOT NULL,
+            path_md5 CHAR(32) NOT NULL,
+
+            size BIGINT NULL,
+            modified DATETIME NULL,
+            type VARCHAR(255) NULL,
+
+            folder_id INT NULL,
+            device_id INT NOT NULL,
+            deleted TINYINT(1) NOT NULL DEFAULT 0,
+
+            first_seen DATETIME NOT NULL,
+            last_seen DATETIME NOT NULL,
+
+            CONSTRAINT fk_file_folder
+                FOREIGN KEY (folder_id) REFERENCES folders(id)
+                ON DELETE SET NULL,
+
+            CONSTRAINT fk_file_device
+                FOREIGN KEY (device_id) REFERENCES devices(id)
+                ON DELETE CASCADE,
+
+            UNIQUE KEY uniq_file_path_md5 (device_id, path_md5),
+            INDEX idx_file_folder (folder_id),
+            INDEX idx_file_deleted (device_id, deleted)
+        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+    """)
+
+    conn.commit()
+    return conn, cursor
+
+
+# ======================================================
+# HELPERS — DEVICES & FOLDERS
+# ======================================================
+
+def get_or_create_device(cursor, conn, name: str) -> int:
+    now = datetime.now()
+    cursor.execute("INSERT IGNORE INTO devices (name, scanned_at) VALUES (%s,%s)", (name, now))
+    conn.commit()
+
+    cursor.execute("SELECT id FROM devices WHERE name=%s", (name,))
+    return cursor.fetchone()[0]
+
+
+def load_folder_state(cursor, device_id: int):
+    """
+    Loads all folders for the device and stores them as:
+    folder_state[normalized_path] = {"id": id, "deleted": 0/1}
+    """
+    cursor.execute("""
+        SELECT id, path, deleted
+        FROM folders
+        WHERE device_id=%s
+    """, (device_id,))
+
+    out = {}
+    for folder_id, path, deleted in cursor.fetchall():
+        norm_path = os.path.normpath(path)
+        out[norm_path] = {"id": folder_id, "deleted": int(deleted)}
+    return out
+
+
+def get_or_create_folder(cursor, conn, folder_state, device_id, folder_path, parent_id):
+    """
+    Creates or finds a folder. Handles:
+    - Unicode normalization (NFC vs NFD spellings, e.g. "Černý")
+    - the in-memory cache (folder_state)
+    - an idempotent INSERT (ON DUPLICATE KEY UPDATE)
+    """
+    # Normalize Unicode + path form
+    folder_path = unicodedata.normalize("NFC", folder_path)
+    folder_path = os.path.normpath(folder_path)
+
+    key = folder_path
+
+    # 1) Cache hit
+    if key in folder_state:
+        return folder_state[key]["id"]
+
+    now = datetime.now()
+
+    # 2) Try a SELECT
+    cursor.execute("""
+        SELECT id
+        FROM folders
+        WHERE device_id = %s AND path = %s
+        LIMIT 1
+    """, (device_id, folder_path))
+    row = cursor.fetchone()
+
+    if row:
+        folder_id = row[0]
+        folder_state[key] = {"id": folder_id, "deleted": 0}
+        return folder_id
+
+    # 3) INSERT (idempotent)
+    cursor.execute("""
+        INSERT INTO folders (path, parent_id, device_id, first_seen, last_seen)
+        VALUES (%s, %s, %s, %s, %s)
+        ON DUPLICATE KEY UPDATE
+            id = LAST_INSERT_ID(id),
+            last_seen = VALUES(last_seen)
+    """, (folder_path, parent_id, device_id, now, now))
+
+    conn.commit()
+
+    folder_id = cursor.lastrowid
+    folder_state[key] = {"id": folder_id, "deleted": 0}
+    return folder_id
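+
+
+# ------------------------------------------------------
+# Minimal sketch of the idempotent-insert pattern used above,
+# against a hypothetical table `demo` with an AUTO_INCREMENT id
+# and a UNIQUE key on `k`: "ON DUPLICATE KEY UPDATE
+# id = LAST_INSERT_ID(id)" makes cursor.lastrowid return the
+# EXISTING row's id on a duplicate, so one code path covers both
+# the first-seen and the already-known case.
+# ------------------------------------------------------
+
+def _upsert_id_sketch(cursor, conn, key: str) -> int:
+    cursor.execute("""
+        INSERT INTO demo (k) VALUES (%s)
+        ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id)
+    """, (key,))
+    conn.commit()
+    return cursor.lastrowid  # new id, or the existing id on duplicate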
+
+
+# ======================================================
+# LOAD LAST FILE STATE
+# ======================================================
+
+def load_last_file_state(cursor, device_id: int):
+    """
+    Loads the last known file state for the device, indexed by path_md5.
+    (For historical reasons this goes through MAX(id), even though there
+    is a UNIQUE key on (device_id, path_md5).)
+    """
+    cursor.execute("""
+        SELECT f.id, f.path_md5, f.deleted, f.size, f.modified
+        FROM files f
+        JOIN (
+            SELECT MAX(id) AS mx
+            FROM files
+            WHERE device_id=%s
+            GROUP BY path_md5
+        ) t ON f.id = t.mx
+    """, (device_id,))
+
+    out = {}
+    for fid, md5, deleted, size, modified in cursor.fetchall():
+        out[md5] = {
+            "id": fid,
+            "deleted": int(deleted),
+            "size": size,
+            "modified": modified,
+        }
+    return out
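+
+
+# ------------------------------------------------------
+# Equivalent sketch (illustration only, not called): with the
+# UNIQUE key on (device_id, path_md5) there is at most one row
+# per hash, so the MAX(id) grouping above can be dropped without
+# changing the result.
+# ------------------------------------------------------
+
+def _load_last_file_state_simple(cursor, device_id: int):
+    cursor.execute("""
+        SELECT id, path_md5, deleted, size, modified
+        FROM files
+        WHERE device_id = %s
+    """, (device_id,))
+    return {
+        md5: {"id": fid, "deleted": int(deleted), "size": size, "modified": modified}
+        for fid, md5, deleted, size, modified in cursor.fetchall()
+    }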
+
+
+# ======================================================
+# MAIN SCANNER WITH BATCHING
+# ======================================================
+
+def walk_and_store_bulk():
+
+    BATCH_SIZE = 10000
+    # target_dir = r"\\tower1\#colddata"
+    # target_dir = r"z:"
+    target_dir = r"\\tower\ebooks"
+    # target_dir = r"\\tower\dedup"
+    device_name = "TOWER"
+
+    # Normalized root for comparisons and for the LIKE prefix
+    target_dir_norm = os.path.normpath(target_dir)
+
+    if not os.path.isdir(target_dir):
+        print("Invalid directory:", target_dir)
+        return
+
+    conn, cursor = init_db()
+    now = datetime.now()
+
+    device_id = get_or_create_device(cursor, conn, device_name)
+    folder_state = load_folder_state(cursor, device_id)
+    file_state = load_last_file_state(cursor, device_id)
+
+    seen_folders = set()
+    seen_files = set()  # MD5 hashes of paths
+
+    files_to_insert = []
+    files_to_update = []
+
+    total_files = 0
+    files_inserted_count = 0
+    files_updated_count = 0
+
+    print(f"🔍 Scanning: {target_dir} (device {device_id})")
+
+    # -------------------------------------------------
+    # WALK FILESYSTEM
+    # -------------------------------------------------
+    for root, dirs, files in os.walk(target_dir):
+        folder_path = os.path.normpath(root)
+
+        # 1️⃣ determine parent_id correctly (os.walk is top-down, so the
+        # parent was already created in an earlier iteration and this is
+        # normally a cache hit; the None grand-parent is then never used)
+        if folder_path == target_dir_norm:
+            parent_id = None
+        else:
+            parent_folder_path = os.path.normpath(os.path.dirname(folder_path))
+            parent_id = get_or_create_folder(cursor, conn, folder_state,
+                                             device_id, parent_folder_path,
+                                             None)
+
+        # 2️⃣ now insert the current folder with the correct parent_id
+        seen_folders.add(folder_path)
+        folder_id = get_or_create_folder(cursor, conn, folder_state,
+                                         device_id, folder_path,
+                                         parent_id)
+
+        # -------------------------------------------------
+        # FILE LOOP
+        # -------------------------------------------------
+        for name in files:
+            total_files += 1
+
+            filepath = os.path.normpath(os.path.join(folder_path, name))
+            md5 = md5_path(filepath)
+            seen_files.add(md5)
+
+            try:
+                st = os.stat(filepath)
+            except OSError:  # covers FileNotFoundError, PermissionError, ...
+                continue
+
+            modified = datetime.fromtimestamp(st.st_mtime).replace(microsecond=0)
+            size = st.st_size
+            ext = os.path.splitext(name)[1][:250]
+
+            prev = file_state.get(md5)
+
+            if prev is None:
+                # new file
+                files_to_insert.append(
+                    (name, filepath, md5, size, modified, ext,
+                     folder_id, device_id, 0, now, now)
+                )
+            else:
+                if prev["deleted"] == 1:
+                    # "resurrected" file: the row already exists (UNIQUE on
+                    # device_id + path_md5), so un-delete it via UPDATE;
+                    # a fresh INSERT would violate the unique key
+                    files_to_update.append(
+                        (size, modified, now, prev["id"])
+                    )
+                else:
+                    # exists and is not deleted → check for a size/mtime change
+                    if prev["size"] != size or prev["modified"] != modified:
+                        files_to_update.append(
+                            (size, modified, now, prev["id"])
+                        )
+
+            # -------------------------------------------------
+            # BATCH FLUSHING
+            # -------------------------------------------------
+            if len(files_to_insert) >= BATCH_SIZE:
+                print(f"💾 Flushing {len(files_to_insert)} inserts...")
+                cursor.executemany("""
+                    INSERT INTO files (
+                        name, path, path_md5, size, modified, type,
+                        folder_id, device_id, deleted,
+                        first_seen, last_seen
+                    )
+                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+                """, files_to_insert)
+                conn.commit()
+                files_inserted_count += len(files_to_insert)
+                files_to_insert.clear()
+
+            if len(files_to_update) >= BATCH_SIZE:
+                print(f"💾 Flushing {len(files_to_update)} updates...")
+                cursor.executemany("""
+                    UPDATE files
+                    SET size=%s, modified=%s, last_seen=%s, deleted=0
+                    WHERE id=%s
+                """, files_to_update)
+                conn.commit()
+                files_updated_count += len(files_to_update)
+                files_to_update.clear()
+
+            # PROGRESS
+            if total_files % 1000 == 0:
+                print(f"  ... processed {total_files} files")
+
+    # -------------------------------------------------
+    # FINAL FLUSH (REMAINING INSERTS/UPDATES)
+    # -------------------------------------------------
+
+    if files_to_insert:
+        print(f"💾 Final flush: {len(files_to_insert)} inserts")
+        cursor.executemany("""
+            INSERT INTO files (
+                name, path, path_md5, size, modified, type,
+                folder_id, device_id, deleted,
+                first_seen, last_seen
+            )
+            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+        """, files_to_insert)
+        conn.commit()
+        files_inserted_count += len(files_to_insert)
+
+    if files_to_update:
+        print(f"💾 Final flush: {len(files_to_update)} updates")
+        cursor.executemany("""
+            UPDATE files
+            SET size=%s, modified=%s, last_seen=%s, deleted=0
+            WHERE id=%s
+        """, files_to_update)
+        conn.commit()
+        files_updated_count += len(files_to_update)
+
+    # -------------------------------------------------
+    # MARK DELETED FILES — ONLY IN THIS SUBTREE
+    # -------------------------------------------------
+    files_deleted_count = 0
+
+    # Escape LIKE wildcards and backslashes so a UNC root such as
+    # \\tower\ebooks matches literally (\ is the default LIKE escape char)
+    like_prefix = (target_dir_norm.rstrip("\\/")
+                   .replace("\\", "\\\\")
+                   .replace("%", "\\%")
+                   .replace("_", "\\_")) + "%"
+
+    cursor.execute("""
+        SELECT id, path_md5
+        FROM files
+        WHERE device_id = %s
+          AND deleted = 0
+          AND path LIKE %s
+    """, (device_id, like_prefix))
+
+    candidates = cursor.fetchall()
+    ids_to_delete = [fid for (fid, md5) in candidates if md5 not in seen_files]
+
+    if ids_to_delete:
+        print(f"💾 Marking {len(ids_to_delete)} files as deleted in subtree")
+        cursor.executemany("""
+            UPDATE files
+            SET deleted=1, last_seen=%s
+            WHERE id=%s
+        """, [(now, fid) for fid in ids_to_delete])
+        conn.commit()
+        files_deleted_count = len(ids_to_delete)
+
+    # -------------------------------------------------
+    # MARK DELETED FOLDERS — ONLY IN THIS SUBTREE
+    # -------------------------------------------------
+    folders_to_mark_deleted = []
+    for path, info in folder_state.items():
+        # limit to the subtree (including the root folder)
+        norm_path = os.path.normpath(path)
+        if not norm_path.startswith(target_dir_norm):
+            continue
+        if info["deleted"] == 0 and norm_path not in seen_folders:
+            folders_to_mark_deleted.append((now, info["id"]))
+
+    folders_deleted_count = 0
+    if folders_to_mark_deleted:
+        cursor.executemany("""
+            UPDATE folders
+            SET deleted=1, last_seen=%s
+            WHERE id=%s
+        """, folders_to_mark_deleted)
+        conn.commit()
+        folders_deleted_count = len(folders_to_mark_deleted)
+
+    # -------------------------------------------------
+    # Update the device timestamp
+    # -------------------------------------------------
+    cursor.execute("UPDATE devices SET scanned_at=%s WHERE id=%s", (now, device_id))
+    conn.commit()
+
+    cursor.close()
+    conn.close()
+
+    print("")
+    print("✅ Scan completed.")
+    print("   Total files scanned:", total_files)
+    print("   Files inserted:", files_inserted_count)
+    print("   Files updated:", files_updated_count)
+    print("   Files deleted in subtree:", files_deleted_count)
+    print("   Folders deleted in subtree:", folders_deleted_count)
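+
+
+# ------------------------------------------------------
+# Read-only sanity check after a scan (illustration only;
+# helper name is hypothetical, but the tables and columns
+# are the ones created in init_db above).
+# ------------------------------------------------------
+
+def _print_scan_summary(device_name: str):
+    conn = get_db_connection()
+    cur = conn.cursor()
+    cur.execute("""
+        SELECT COUNT(*), COALESCE(SUM(f.deleted), 0)
+        FROM files f
+        JOIN devices d ON d.id = f.device_id
+        WHERE d.name = %s
+    """, (device_name,))
+    total, deleted = cur.fetchone()
+    cur.close()
+    conn.close()
+    print(f"{device_name}: {total} rows, {deleted} marked deleted")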
+
+
+# ======================================================
+# MAIN ENTRY
+# ======================================================
+
+if __name__ == '__main__':
+    walk_and_store_bulk()
diff --git a/52 MD5CalculateMultiThread.py b/52 MD5CalculateMultiThread.py
index 2a683a2..92efd82 100644
--- a/52 MD5CalculateMultiThread.py
+++ b/52 MD5CalculateMultiThread.py
@@ -8,7 +8,7 @@ import mysql.connector
 from dotenv import load_dotenv
 from pathlib import Path
 from concurrent.futures import ThreadPoolExecutor, as_completed
-
+import threading
 
 # ======================================================
 # Load environment
@@ -16,6 +16,47 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 env_path = Path(__file__).resolve().parent / ".env"
 load_dotenv(env_path)
 
+# ======================================================
+# LOGGING TOGGLE (ON/OFF)
+# ======================================================
+LOGGING_ENABLED = False  # ← NEW: set to True to re-enable the per-thread debug logs
+
+# ======================================================
+# Colors & logging helpers
+# ======================================================
+RESET = "\033[0m"
+COLORS = [
+    "\033[92m",  # green
+    "\033[94m",  # blue
+    "\033[93m",  # yellow
+    "\033[91m",  # red
+    "\033[95m",  # magenta
+    "\033[96m",  # cyan
+    "\033[90m",  # gray
+]
+
+print_lock = threading.Lock()
+
+
+def thread_color():
+    name = threading.current_thread().name
+    idx = 0
+    if "_" in name:
+        suffix = name.split("_")[-1]
+        if suffix.isdigit():
+            idx = int(suffix)
+    return COLORS[idx % len(COLORS)]
+
+
+def log_thread(msg: str):
+    """Thread-safe, colored log with thread name prefix."""
+    if not LOGGING_ENABLED:  # ← NEW
+        return
+    with print_lock:
+        name = threading.current_thread().name
+        color = thread_color()
+        print(f"{color}[{name}] {msg}{RESET}")
+
 
 # ======================================================
 # MySQL connection (each thread gets its own)
@@ -41,7 +82,10 @@ def get_db_connection():
 def file_md5(path, chunk_size=1024 * 1024):
     md5 = hashlib.md5()
     with open(path, "rb") as f:
-        while chunk := f.read(chunk_size):
+        while True:
+            chunk = f.read(chunk_size)
+            if not chunk:
+                break
             md5.update(chunk)
     return md5.hexdigest()
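+
+# Note (illustration only): the explicit loop in file_md5 replaces the
+# Python 3.8+ walrus form, presumably for pre-3.8 compatibility; on
+# 3.8+ the equivalent one-liner would be:
+#
+#     while chunk := f.read(chunk_size):
+#         md5.update(chunk)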
@@ -64,24 +108,28 @@ def process_one_file(row):
     file_id = row["id"]
     path = row["path"]
     modified = row["modified"]
-    prev_md5 = row["content_md5"]
-    prev_calc = row["md5_calculated"]
+    prev_md5 = row.get("content_md5")
+    prev_calc = row.get("md5_calculated")
+
+    log_thread(f"START ID={file_id} → {path}")
 
     # --- Skip if file does not exist ---
     if not os.path.isfile(path):
-        return (file_id, "missing", None)
+        log_thread(f"MISS ID={file_id} (file not found)")
+        return file_id, "missing", None
 
-    # --- Decide if MD5 needed ---
-    need_md5 = (
-        prev_md5 is None or
-        prev_calc is None or
-        prev_calc < modified
-    )
-    if not need_md5:
-        return (file_id, "skip", None)
+    # --- Decide if MD5 calculation is needed ---
+    if prev_md5 and prev_calc and prev_calc >= modified:
+        log_thread(f"SKIP ID={file_id} (md5 up-to-date)")
+        return file_id, "skip", None
 
     # --- Calculate MD5 ---
-    new_md5 = file_md5(path)
+    try:
+        new_md5 = file_md5(path)
+    except Exception as e:
+        log_thread(f"ERROR ID={file_id} while reading file: {e}")
+        return file_id, "error", str(e)
+
     now = datetime.now().replace(microsecond=0)
 
     # --- Update DB inside thread ---
@@ -97,14 +145,17 @@
         conn.commit()
         c.close()
         conn.close()
-        return (file_id, "updated", new_md5)
+
+        log_thread(f"UPDATE ID={file_id} (MD5={new_md5})")
+        return file_id, "updated", new_md5
 
     except Exception as e:
-        return (file_id, "error", str(e))
+        log_thread(f"ERROR ID={file_id} DB update failed: {e}")
+        return file_id, "error", str(e)
 
 
 # ======================================================
-# MAIN LOGIC (single-threaded DB query + multi-threaded MD5)
+# MAIN LOGIC
 # ======================================================
 
 def run_md5_calculator(device_name=None,
                        device_id=None,
@@ -113,9 +164,7 @@
                        path_prefix=None,
                        threads=8):
 
-    # ----------------------------
-    # DEVICE filter resolution
-    # ----------------------------
+    # DEVICE resolution
     filter_by_device = True
     if device_name == "ANY" or device_id == "ANY":
         filter_by_device = False
@@ -128,21 +177,17 @@
         cur = conn.cursor(dictionary=True)
         cur.execute("SELECT id FROM devices WHERE name=%s", (device_name,))
         row = cur.fetchone()
-        cur.close(); conn.close()
+        cur.close()
+        conn.close()
 
         if not row:
            raise RuntimeError(f"Device '{device_name}' not found")
         device_id = row["id"]
 
-    # EXTENSION filter
     filter_by_extension = (extension != "ANY")
-
-    # SIZE filter
     filter_by_size = (max_size != "ANY")
     max_bytes = parse_size(max_size) if filter_by_size else None
-
-    # PATH filter
     filter_by_path = (path_prefix not in [None, "", "ANY"])
     cleaned_prefix = path_prefix.rstrip("\\/") if filter_by_path else None
 
@@ -152,9 +197,7 @@
           f" max_size={max_size},"
           f" prefix={path_prefix}\n")
 
-    # ---------------------------------------
-    # Fetch all rows in a single DB query
-    # ---------------------------------------
+    # Fetch rows
     conn = get_db_connection()
     cursor = conn.cursor(dictionary=True)
 
@@ -181,34 +224,38 @@
         SELECT id, path, size, modified, content_md5, md5_calculated
         FROM files
         WHERE {" AND ".join(where)}
+          AND NOT (
+              content_md5 IS NOT NULL
+              AND md5_calculated IS NOT NULL
+              AND md5_calculated >= modified
+          )
     """
 
     cursor.execute(sql, params)
     rows = cursor.fetchall()
-    cursor.close(); conn.close()
+    cursor.close()
+    conn.close()
 
     total = len(rows)
     print(f"📁 Files matching criteria: {total}\n")
 
-    # ======================================================
-    # === MULTITHREADED MD5 CALCULATION BELOW ============
-    # ======================================================
-    updated = 0
-    skipped = 0
-    missing = 0
-    errors = 0
+    if total == 0:
+        print("Nothing to do, exiting.")
+        return
 
-    with ThreadPoolExecutor(max_workers=threads) as exe:
+    # MULTITHREADED MD5
+    updated = skipped = missing = errors = 0
+
+    with ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Worker") as exe:
         futures = {exe.submit(process_one_file, r): r["id"] for r in rows}
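+        # Note (illustration only): thread_name_prefix="Worker" yields
+        # thread names "Worker_0", "Worker_1", ..., which thread_color()
+        # parses to give each worker a stable color. Quick standalone check:
+        #
+        #     with ThreadPoolExecutor(max_workers=2, thread_name_prefix="Worker") as ex:
+        #         ex.submit(lambda: print(threading.current_thread().name))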
 
-        for future in as_completed(futures):
+        for i, future in enumerate(as_completed(futures), start=1):
             file_id = futures[future]
-            status, result = None, None
             try:
-                file_id, status, result = future.result()
+                _id, status, result = future.result()
             except Exception as e:
-                print(f"❌ Thread error for ID {file_id}: {e}")
+                log_thread(f"FUTURE ERROR for ID={file_id}: {e}")
                 errors += 1
                 continue
 
@@ -220,11 +267,11 @@
                 missing += 1
             elif status == "error":
                 errors += 1
-                print(f"⚠️ DB update error: {result}")
 
-    # ======================================================
+            if i % 100 == 0:
+                print(f"… processed {i}/{total} files")
+
     # SUMMARY
-    # ======================================================
     print("\n============================")
     print("✅ Multithreaded MD5 finished")
     print("============================")
@@ -235,7 +282,6 @@
     print(f"Threads: {threads}\n")
 
 
-# ======================================================
 # RUN EXAMPLE
 # ======================================================
@@ -245,5 +291,5 @@ if __name__ == "__main__":
         extension="ANY",
         max_size="ANY",
         path_prefix="ANY",
-        threads=12  # ← ADJUST THREAD COUNT HERE
+        threads=6
     )
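+
+# Example (illustration only): restrict a pass to one subtree and use
+# fewer threads, mirroring the call above; the path is an assumed value:
+#
+#     run_md5_calculator(device_name="TOWER", extension="ANY",
+#                        max_size="ANY",
+#                        path_prefix=r"\\tower\ebooks",
+#                        threads=4)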