#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import hashlib
from datetime import datetime

import mysql.connector
from dotenv import load_dotenv
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# ======================================================
# Load environment
# ======================================================
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(env_path)

# ======================================================
# LOGGING TOGGLE (ON/OFF)
# ======================================================
LOGGING_ENABLED = False  # Set to False to silence all thread debug logs

# ======================================================
# Colors & logging helpers
# ======================================================
RESET = "\033[0m"
COLORS = [
    "\033[92m",  # green
    "\033[94m",  # blue
    "\033[93m",  # yellow
    "\033[91m",  # red
    "\033[95m",  # magenta
    "\033[96m",  # cyan
    "\033[90m",  # gray
]

print_lock = threading.Lock()


def thread_color():
    name = threading.current_thread().name
    idx = 0
    if "_" in name:
        suffix = name.split("_")[-1]
        if suffix.isdigit():
            idx = int(suffix)
    return COLORS[idx % len(COLORS)]


def log_thread(msg: str):
    """Thread-safe, colored log with thread name prefix."""
    if not LOGGING_ENABLED:
        return
    with print_lock:
        name = threading.current_thread().name
        color = thread_color()
        print(f"{color}[{name}] {msg}{RESET}")


# ======================================================
# MySQL connection (each thread gets its own)
# ======================================================
def get_db_connection():
    conn = mysql.connector.connect(
        host=os.getenv("DB_MYSQL_HOST"),
        user=os.getenv("DB_MYSQL_ROOT"),
        password=os.getenv("DB_MYSQL_ROOT_PASS"),
        port=int(os.getenv("DB_MYSQL_PORT")),
        database="walkfiles",
        auth_plugin="mysql_native_password"
    )
    c = conn.cursor()
    c.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci")
    c.close()
    return conn


# ======================================================
# Helpers
# ======================================================
def file_md5(path, chunk_size=1024 * 1024):
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            md5.update(chunk)
    return md5.hexdigest()


def parse_size(size_str: str) -> int:
    s = size_str.strip().upper()
    if s.endswith("KB"):
        return int(float(s[:-2]) * 1024)
    if s.endswith("MB"):
        return int(float(s[:-2]) * 1024 * 1024)
    if s.endswith("GB"):
        return int(float(s[:-2]) * 1024 * 1024 * 1024)
    return int(s)


# ======================================================
# WORKER: Runs in thread
# ======================================================
def process_one_file(row):
    file_id = row["id"]
    path = row["path"]
    modified = row["modified"]
    prev_md5 = row.get("content_md5")
    prev_calc = row.get("md5_calculated")

    log_thread(f"START ID={file_id} → {path}")

    # --- Skip if file does not exist ---
    if not os.path.isfile(path):
        log_thread(f"MISS ID={file_id} (file not found)")
        return file_id, "missing", None

    # --- Decide if MD5 calculation is needed ---
    if prev_md5 and prev_calc and prev_calc >= modified:
        log_thread(f"SKIP ID={file_id} (md5 up-to-date)")
        return file_id, "skip", None

    # --- Calculate MD5 ---
    try:
        new_md5 = file_md5(path)
    except Exception as e:
        log_thread(f"ERROR ID={file_id} while reading file: {e}")
        return file_id, "error", str(e)

    now = datetime.now().replace(microsecond=0)

    # --- Update DB inside thread ---
    try:
        conn = get_db_connection()
        c = conn.cursor()
        c.execute("""
            UPDATE files
            SET content_md5=%s, md5_calculated=%s
            WHERE id=%s
        """, (new_md5, now, file_id))
        conn.commit()
        c.close()
        conn.close()
        log_thread(f"UPDATE ID={file_id} (MD5={new_md5})")
        return file_id, "updated", new_md5
    except Exception as e:
        log_thread(f"ERROR ID={file_id} DB update failed: {e}")
        return file_id, "error", str(e)


# ======================================================
# MAIN LOGIC
# ======================================================
def run_md5_calculator(device_name=None, device_id=None, extension=".pdf",
                       max_size="50MB", path_prefix=None, threads=8):

    # DEVICE resolution
    filter_by_device = True
    if device_name == "ANY" or device_id == "ANY":
        filter_by_device = False
    elif device_id is None:
        if device_name is None:
            raise RuntimeError("You must provide device_name or device_id")
        conn = get_db_connection()
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT id FROM devices WHERE name=%s", (device_name,))
        row = cur.fetchone()
        cur.close()
        conn.close()
        if not row:
            raise RuntimeError(f"Device '{device_name}' not found")
        device_id = row["id"]

    filter_by_extension = (extension != "ANY")
    filter_by_size = (max_size != "ANY")
    max_bytes = parse_size(max_size) if filter_by_size else None
    filter_by_path = (path_prefix not in [None, "", "ANY"])
    cleaned_prefix = path_prefix.rstrip("\\/") if filter_by_path else None

    print(f"\nšŸ” Filtering:"
          f" device={'ANY' if not filter_by_device else device_id},"
          f" ext={extension},"
          f" max_size={max_size},"
          f" prefix={path_prefix}\n")

    # Fetch rows
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)

    where = ["deleted = 0"]
    params = []

    if filter_by_device:
        where.append("device_id=%s")
        params.append(device_id)

    if filter_by_extension:
        where.append("path LIKE %s")
        params.append("%" + extension)

    if filter_by_size:
        where.append("size <= %s")
        params.append(max_bytes)

    if filter_by_path:
        where.append("path LIKE %s")
        params.append(cleaned_prefix + "%")

    sql = f"""
        SELECT id, path, size, modified, content_md5, md5_calculated
        FROM files
        WHERE {" AND ".join(where)}
          AND NOT (
                content_md5 IS NOT NULL
                AND md5_calculated IS NOT NULL
                AND md5_calculated >= modified
          )
    """

    cursor.execute(sql, params)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()

    total = len(rows)
    print(f"šŸ“ Files matching criteria: {total}\n")

    if total == 0:
        print("Nothing to do, exiting.")
        return

    # MULTITHREADED MD5
    updated = skipped = missing = errors = 0

    with ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Worker") as exe:
        futures = {exe.submit(process_one_file, r): r["id"] for r in rows}

        for i, future in enumerate(as_completed(futures), start=1):
            file_id = futures[future]
            try:
                _id, status, result = future.result()
            except Exception as e:
                log_thread(f"FUTURE ERROR for ID={file_id}: {e}")
                errors += 1
                continue

            if status == "updated":
                updated += 1
            elif status == "skip":
                skipped += 1
            elif status == "missing":
                missing += 1
            elif status == "error":
                errors += 1

            if i % 100 == 0:
                print(f"… processed {i}/{total} files")

    # SUMMARY
    print("\n============================")
    print("āœ… Multithreaded MD5 finished")
    print("============================")
    print(f"Updated: {updated}")
    print(f"Skipped: {skipped}")
    print(f"Missing: {missing}")
    print(f"Errors: {errors}")
    print(f"Threads: {threads}\n")


# ======================================================
# RUN EXAMPLE
# ======================================================
if __name__ == "__main__":
    run_md5_calculator(
        device_name="TOWER",
        extension="ANY",
        max_size="ANY",
        path_prefix="ANY",
        threads=6
    )