#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Compute and store content MD5 hashes for files indexed in the ``walkfiles`` DB.

Rows are selected from the ``files`` table using optional device, extension,
size and path-prefix filters. For every selected file that still exists on
disk and has no hash yet (or whose stored hash is older than the recorded
modification time), the MD5 of the file content is computed and written back
to ``content_md5`` / ``md5_calculated``.
"""

import hashlib
import os
from contextlib import closing
from datetime import datetime
from pathlib import Path

import mysql.connector
from dotenv import load_dotenv
from mysql.connector import Error

# ======================================================
# Load environment
# ======================================================
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(env_path)

# Port used when DB_MYSQL_PORT is missing or empty (MySQL's standard port).
_DEFAULT_MYSQL_PORT = 3306

# Multipliers for the size suffixes understood by parse_size (binary units).
_SIZE_UNITS = {
    "B": 1,
    "KB": 1024,
    "MB": 1024 ** 2,
    "GB": 1024 ** 3,
    "TB": 1024 ** 4,
}

# Suffixes in matching order: two-letter units first so that "KB" is never
# mistaken for a bare "B".
_SIZE_SUFFIXES = ("KB", "MB", "GB", "TB", "B")

# Argument values that mean "do not filter on this criterion".
_NO_FILTER = (None, "", "ANY")

# Escape character used in LIKE patterns. An explicit, non-backslash escape
# keeps Windows paths (full of backslashes) from being read as escapes.
_LIKE_ESCAPE = "|"

# Commit after this many updates so an interrupted run keeps most of its work.
_COMMIT_EVERY = 100


# ======================================================
# MySQL connection
# ======================================================
def get_db_connection():
    """Open a connection to the ``walkfiles`` database.

    Connection settings are read from the environment variables
    ``DB_MYSQL_HOST``, ``DB_MYSQL_ROOT``, ``DB_MYSQL_ROOT_PASS`` and
    ``DB_MYSQL_PORT``. A missing or empty port falls back to 3306 instead
    of failing with ``TypeError``.

    Returns:
        An open connection with the session character set switched to
        utf8mb4 / utf8mb4_general_ci.

    Raises:
        mysql.connector.Error: if connecting or configuring the session fails.
    """
    conn = mysql.connector.connect(
        host=os.getenv("DB_MYSQL_HOST"),
        user=os.getenv("DB_MYSQL_ROOT"),
        password=os.getenv("DB_MYSQL_ROOT_PASS"),
        port=int(os.getenv("DB_MYSQL_PORT") or _DEFAULT_MYSQL_PORT),
        database="walkfiles",
        auth_plugin="mysql_native_password",
    )
    try:
        with closing(conn.cursor()) as cur:
            cur.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci")
    except Error:
        # Do not leak the half-configured connection.
        conn.close()
        raise
    return conn


# ======================================================
# Helpers
# ======================================================
def file_md5(path, chunk_size=1024 * 1024):
    """Return the hex MD5 digest of the file at *path*, read in chunks.

    Args:
        path: Path of the file to hash.
        chunk_size: Number of bytes read per iteration (default 1 MiB).

    Raises:
        OSError: if the file cannot be opened or read.
    """
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            md5.update(chunk)
    return md5.hexdigest()


def parse_size(size_str) -> int:
    """Convert a human-readable size into a number of bytes.

    Accepts values such as ``"10MB"``, ``"500kB"``, ``"2GB"``, ``"1.5 TB"``
    or ``"123B"`` (case-insensitive, 1 KB = 1024 bytes). A string without a
    suffix is treated as a raw byte count, and an ``int`` is returned as-is.

    Raises:
        ValueError: if the numeric part cannot be parsed.
    """
    if isinstance(size_str, int):
        return size_str

    s = str(size_str).strip().upper()
    for suffix in _SIZE_SUFFIXES:
        if s.endswith(suffix):
            number = s[: -len(suffix)]
            return int(float(number) * _SIZE_UNITS[suffix])
    return int(s)  # assume raw bytes


def _escape_like(text):
    """Escape *text* so it matches literally inside a LIKE pattern."""
    escaped = text.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
    escaped = escaped.replace("%", _LIKE_ESCAPE + "%")
    return escaped.replace("_", _LIKE_ESCAPE + "_")


def _resolve_device_id(device_name):
    """Look up the id of the device called *device_name*.

    Raises:
        RuntimeError: if no device with that name exists.
    """
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT id FROM devices WHERE name=%s", (device_name,))
            row = cursor.fetchone()
    if not row:
        raise RuntimeError(f"Device '{device_name}' not found")
    return row["id"]


def _build_file_query(device_id=None, extension=None, max_bytes=None, prefix=None):
    """Build the SELECT statement and parameters for the requested filters.

    Every argument left as ``None`` disables the corresponding filter.
    Returns a ``(sql, params)`` tuple ready for ``cursor.execute``.
    """
    like_clause = f"path LIKE %s ESCAPE '{_LIKE_ESCAPE}'"
    where_clauses = ["deleted = 0"]
    params = []

    if device_id is not None:
        where_clauses.append("device_id=%s")
        params.append(device_id)
    if extension is not None:
        where_clauses.append(like_clause)
        params.append("%" + _escape_like(extension))
    if max_bytes is not None:
        where_clauses.append("size <= %s")
        params.append(max_bytes)
    if prefix is not None:
        where_clauses.append(like_clause)
        params.append(_escape_like(prefix) + "%")

    sql = (
        "SELECT id, path, size, modified, content_md5, md5_calculated "
        "FROM files "
        "WHERE " + " AND ".join(where_clauses)
    )
    return sql, params


def _needs_md5(prev_md5, prev_calc, modified):
    """Return True when a file has no hash yet or its hash predates *modified*."""
    if prev_md5 is None or prev_calc is None:
        return True
    # A NULL modification time gives nothing to compare against; keep the
    # existing hash rather than failing on a None comparison.
    return modified is not None and prev_calc < modified


# ======================================================
# MAIN LOGIC
# ======================================================
def run_md5_calculator(device_name=None,
                       device_id=None,
                       extension=".pdf",
                       max_size="50MB",
                       path_prefix=None):
    """Compute missing or outdated content MD5s for the selected files.

    Accepts:
        device_name="ANY"   -> no device filter
        device_id="ANY"     -> no device filter
        extension="ANY"     -> no extension filter
        max_size="ANY"      -> no size filter
        path_prefix="ANY"   -> no path filter

    ``None`` or an empty string also disables the extension, size and path
    filters. When the device filter is active and ``device_id`` is not
    given, ``device_name`` is resolved through the ``devices`` table.

    Files missing on disk or unreadable are reported and skipped. Database
    errors during the run are printed rather than raised; updates are
    committed in batches so earlier work survives a later failure.

    Raises:
        RuntimeError: if neither device_name nor device_id is provided, or
            the named device does not exist.
        ValueError: if max_size cannot be parsed.
    """
    # ---------------------------------------------------------
    # Interpret ANY values into filter settings
    # ---------------------------------------------------------

    # DEVICE filter
    filter_by_device = not (device_name == "ANY" or device_id == "ANY")
    if filter_by_device and device_id is None:
        if device_name is None:
            raise RuntimeError("You must provide device_name or device_id")
        device_id = _resolve_device_id(device_name)

    # EXTENSION filter
    ext_filter = None if extension in _NO_FILTER else extension

    # SIZE filter
    max_bytes = None if max_size in _NO_FILTER else parse_size(max_size)

    # PATH filter (trailing separators are dropped before matching)
    if path_prefix in _NO_FILTER:
        cleaned_prefix = None
    else:
        cleaned_prefix = path_prefix.rstrip("\\/")

    print(
        f"\n🔍 Filtering:"
        f" device={'ANY' if not filter_by_device else device_id},"
        f" ext={extension},"
        f" max_size={max_size},"
        f" prefix={path_prefix}\n"
    )

    conn, cursor = None, None

    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)

        sql, params = _build_file_query(
            device_id=device_id if filter_by_device else None,
            extension=ext_filter,
            max_bytes=max_bytes,
            prefix=cleaned_prefix,
        )
        cursor.execute(sql, params)
        rows = cursor.fetchall()

        total = len(rows)
        print(f"📁 Files matching criteria: {total}")

        updates = 0

        # ---------------------------------------------------------
        # PROCESS FILES
        # ---------------------------------------------------------
        for row in rows:
            path = row["path"]

            # Skip missing files
            if not os.path.isfile(path):
                print(f"⚠️ Missing on disk, skipping: {path}")
                continue

            if not _needs_md5(row["content_md5"], row["md5_calculated"], row["modified"]):
                continue

            print(f"🔄 Calculating MD5: {path}")
            try:
                new_md5 = file_md5(path)
            except OSError as exc:
                # One locked or unreadable file must not abort the whole run.
                print(f"⚠️ Cannot read, skipping: {path} ({exc})")
                continue

            now = datetime.now().replace(microsecond=0)
            cursor.execute(
                "UPDATE files SET content_md5=%s, md5_calculated=%s WHERE id=%s",
                (new_md5, now, row["id"]),
            )
            updates += 1

            if updates % _COMMIT_EVERY == 0:
                conn.commit()

        conn.commit()

        print("\n✅ MD5 calculation finished.")
        print(f"   Updated files: {updates}")
        print(f"   Skipped files: {total - updates}\n")

    except Exception as e:  # top-level boundary: report, then clean up
        print("Error:", e)

    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


# ======================================================
# RUN EXAMPLE
# ======================================================
if __name__ == "__main__":
    # Example usage:
    run_md5_calculator(
        device_name="TWW11",
        extension="ANY",
        max_size="ANY",
        path_prefix=r"ANY"
    )