#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
from datetime import datetime
import mysql.connector
from dotenv import load_dotenv
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

# ======================================================
# Load environment
# ======================================================
# Expected keys in .env: DB_MYSQL_HOST, DB_MYSQL_ROOT,
# DB_MYSQL_ROOT_PASS, DB_MYSQL_PORT
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(env_path)


# ======================================================
# MySQL connection (each thread gets its own)
# ======================================================
def get_db_connection():
    """Open a new MySQL connection to the walkfiles DB, set to utf8mb4."""
    conn = mysql.connector.connect(
        host=os.getenv("DB_MYSQL_HOST"),
        user=os.getenv("DB_MYSQL_ROOT"),
        password=os.getenv("DB_MYSQL_ROOT_PASS"),
        port=int(os.getenv("DB_MYSQL_PORT")),
        database="walkfiles",
        auth_plugin="mysql_native_password"
    )
    c = conn.cursor()
    c.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci")
    c.close()
    return conn


# ======================================================
# Helpers
# ======================================================
def file_md5(path, chunk_size=1024 * 1024):
    """Return the hex MD5 digest of a file, read in 1 MiB chunks."""
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            md5.update(chunk)
    return md5.hexdigest()


def parse_size(size_str: str) -> int:
    """Convert a human-readable size ('50MB', '2GB', '1024') to bytes."""
    s = size_str.strip().upper()
    if s.endswith("KB"):
        return int(float(s[:-2]) * 1024)
    if s.endswith("MB"):
        return int(float(s[:-2]) * 1024 * 1024)
    if s.endswith("GB"):
        return int(float(s[:-2]) * 1024 * 1024 * 1024)
    return int(s)


# ======================================================
# WORKER: runs in a thread
# ======================================================
def process_one_file(row):
    """Hash one file and update its DB row; returns (id, status, detail)."""
    file_id = row["id"]
    path = row["path"]
    modified = row["modified"]
    prev_md5 = row["content_md5"]
    prev_calc = row["md5_calculated"]

    # --- Skip if file does not exist ---
    if not os.path.isfile(path):
        return (file_id, "missing", None)

    # --- Decide if MD5 is needed ---
    need_md5 = (
        prev_md5 is None
        or prev_calc is None
        or prev_calc < modified
    )
    if not need_md5:
        return (file_id, "skip", None)

    # --- Calculate MD5 ---
    new_md5 = file_md5(path)
    now = datetime.now().replace(microsecond=0)

    # --- Update DB inside the thread (own connection) ---
    try:
        conn = get_db_connection()
        c = conn.cursor()
        c.execute(
            """
            UPDATE files
            SET content_md5=%s, md5_calculated=%s
            WHERE id=%s
            """,
            (new_md5, now, file_id)
        )
        conn.commit()
        c.close()
        conn.close()
        return (file_id, "updated", new_md5)
    except Exception as e:
        return (file_id, "error", str(e))


# ======================================================
# MAIN LOGIC (single-threaded DB query + multi-threaded MD5)
# ======================================================
def run_md5_calculator(device_name=None, device_id=None, extension=".pdf",
                       max_size="50MB", path_prefix=None, threads=8):
    # ----------------------------
    # DEVICE filter resolution
    # ----------------------------
    filter_by_device = True
    if device_name == "ANY" or device_id == "ANY":
        filter_by_device = False
    elif device_id is None:
        if device_name is None:
            raise RuntimeError("You must provide device_name or device_id")
        conn = get_db_connection()
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT id FROM devices WHERE name=%s", (device_name,))
        row = cur.fetchone()
        cur.close()
        conn.close()
        if not row:
            raise RuntimeError(f"Device '{device_name}' not found")
        device_id = row["id"]

    # EXTENSION filter
    filter_by_extension = (extension != "ANY")

    # SIZE filter
    filter_by_size = (max_size != "ANY")
    max_bytes = parse_size(max_size) if filter_by_size else None

    # PATH filter
    filter_by_path = (path_prefix not in [None, "", "ANY"])
    cleaned_prefix = path_prefix.rstrip("\\/") if filter_by_path else None

    print(f"\nšŸ” Filtering:"
          f" device={'ANY' if not filter_by_device else device_id},"
          f" ext={extension},"
          f" max_size={max_size},"
          f" prefix={path_prefix}\n")

    # ---------------------------------------
    # Fetch all rows in a single DB query
    # ---------------------------------------
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)

    where = ["deleted = 0"]
    params = []

    if filter_by_device:
        where.append("device_id=%s")
        params.append(device_id)

    if filter_by_extension:
        where.append("path LIKE %s")
        params.append("%" + extension)

    if filter_by_size:
        where.append("size <= %s")
        params.append(max_bytes)

    if filter_by_path:
        where.append("path LIKE %s")
        params.append(cleaned_prefix + "%")

    sql = f"""
        SELECT id, path, size, modified, content_md5, md5_calculated
        FROM files
        WHERE {" AND ".join(where)}
    """
    cursor.execute(sql, params)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()

    total = len(rows)
    print(f"šŸ“ Files matching criteria: {total}\n")

    # ======================================================
    # === MULTITHREADED MD5 CALCULATION BELOW =============
    # ======================================================
    updated = 0
    skipped = 0
    missing = 0
    errors = 0

    with ThreadPoolExecutor(max_workers=threads) as exe:
        futures = {exe.submit(process_one_file, r): r["id"] for r in rows}

        for future in as_completed(futures):
            file_id = futures[future]
            status, result = None, None
            try:
                file_id, status, result = future.result()
            except Exception as e:
                print(f"āŒ Thread error for ID {file_id}: {e}")
                errors += 1
                continue

            if status == "updated":
                updated += 1
            elif status == "skip":
                skipped += 1
            elif status == "missing":
                missing += 1
            elif status == "error":
                errors += 1
                print(f"āš ļø DB update error: {result}")

    # ======================================================
    # SUMMARY
    # ======================================================
    print("\n============================")
    print("āœ… Multithreaded MD5 finished")
    print("============================")
    print(f"Updated: {updated}")
    print(f"Skipped: {skipped}")
    print(f"Missing: {missing}")
    print(f"Errors: {errors}")
    print(f"Threads: {threads}\n")


# ======================================================
# RUN EXAMPLE
# ======================================================
if __name__ == "__main__":
    run_md5_calculator(
        device_name="TOWER",
        extension="ANY",
        max_size="ANY",
        path_prefix="ANY",
        threads=12  # ← ADJUST THREAD COUNT HERE
    )
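
# ======================================================
# ALTERNATIVE RUN EXAMPLE (illustrative only)
# ======================================================
# A minimal sketch of a more selective run, using only the parameters
# defined above; the path prefix and thread count here are placeholder
# values, not part of the original configuration. Uncomment and adapt
# in place of the call above if you want to hash only a subset.
#
# run_md5_calculator(
#     device_name="TOWER",        # resolved to device_id via the devices table
#     extension=".pdf",           # only rows whose path ends in .pdf
#     max_size="50MB",            # skip files larger than 50 MB
#     path_prefix=r"D:\Archive",  # hypothetical prefix; trailing slashes are stripped
#     threads=4                   # fewer threads for slower disks
# )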