From d1b89d8e345a5e15fa9223133c4ed798c5e6dabb Mon Sep 17 00:00:00 2001 From: "vladimir.buzalka" Date: Tue, 25 Nov 2025 11:26:03 +0100 Subject: [PATCH] z230 --- 50 MD5calculate.py | 192 -------------------------- 52 MD5CalculateMultiThread.py | 249 ++++++++++++++++++++++++++++++++++ 2 files changed, 249 insertions(+), 192 deletions(-) delete mode 100644 50 MD5calculate.py create mode 100644 52 MD5CalculateMultiThread.py diff --git a/50 MD5calculate.py b/50 MD5calculate.py deleted file mode 100644 index a940d31..0000000 --- a/50 MD5calculate.py +++ /dev/null @@ -1,192 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import os -import hashlib -from datetime import datetime -import mysql.connector -from dotenv import load_dotenv -from pathlib import Path - - -# ====================================================== -# Load environment -# ====================================================== -env_path = Path(__file__).resolve().parent / ".env" -load_dotenv(env_path) - - -# ====================================================== -# MySQL connection -# ====================================================== -def get_db_connection(): - conn = mysql.connector.connect( - host=os.getenv("DB_MYSQL_HOST"), - user=os.getenv("DB_MYSQL_ROOT"), - password=os.getenv("DB_MYSQL_ROOT_PASS"), - port=int(os.getenv("DB_MYSQL_PORT")), - database="walkfiles", - auth_plugin="mysql_native_password" - ) - c = conn.cursor() - c.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci") - c.close() - return conn - - -# ====================================================== -# Helpers -# ====================================================== -def file_md5(path, chunk_size=1024 * 1024): - """Compute content MD5 of a file in chunks.""" - md5 = hashlib.md5() - with open(path, "rb") as f: - while chunk := f.read(chunk_size): - md5.update(chunk) - return md5.hexdigest() - - -def parse_size(size_str: str) -> int: - """ - Convert human input like: - 10MB, 500kB, 2GB - into bytes. If already numeric, return as-is. - """ - s = size_str.strip().upper() - if s.endswith("KB"): - return int(float(s[:-2]) * 1024) - if s.endswith("MB"): - return int(float(s[:-2]) * 1024 * 1024) - if s.endswith("GB"): - return int(float(s[:-2]) * 1024 * 1024 * 1024) - return int(s) # assume raw bytes - - -# ====================================================== -# MAIN LOGIC -# ====================================================== -def run_md5_calculator(device_name=None, - device_id=None, - extension=".pdf", - max_size="50MB"): - """ - device_name OR device_id must be provided. - extension: ".pdf", ".jpg", etc. - max_size: "10MB", "500KB", "1GB" or number of bytes - """ - - max_bytes = parse_size(max_size) - - conn, cursor = None, None - - try: - conn = get_db_connection() - cursor = conn.cursor(dictionary=True) - - # ------------------------------------------ - # Resolve device_id if only device_name given - # ------------------------------------------ - if device_id is None: - if device_name is None: - raise RuntimeError("You must provide device_name or device_id") - cursor.execute("SELECT id FROM devices WHERE name=%s", (device_name,)) - row = cursor.fetchone() - if not row: - raise RuntimeError(f"Device '{device_name}' not found") - device_id = row["id"] - - print(f"\nšŸ” Filtering: device={device_id}, ext={extension}, max_size={max_bytes} bytes\n") - - # ------------------------------------------ - # SELECT only files that need MD5 calculation - # ------------------------------------------ - cursor.execute(""" - SELECT id, path, size, modified, content_md5, md5_calculated - FROM files - WHERE device_id=%s - AND deleted = 0 - AND path LIKE %s - AND size <= %s - """, (device_id, "%" + extension, max_bytes)) - - rows = cursor.fetchall() - total = len(rows) - print(f"šŸ“ Files matching criteria: {total}") - - updates = 0 - - for row in rows: - - file_id = row["id"] - path = row["path"] - size = row["size"] - modified = row["modified"] - prev_md5 = row["content_md5"] - prev_calc = row["md5_calculated"] - - # ------------------------------- - # Skip missing files on disk - # ------------------------------- - if not os.path.isfile(path): - print(f"āš ļø Missing on disk, skipping: {path}") - continue - - # ------------------------------- - # Check conditions for recalculation - # ------------------------------- - need_md5 = False - - if prev_md5 is None: - need_md5 = True - else: - if prev_calc is None or prev_calc < modified: - need_md5 = True - - if not need_md5: - continue - - # ------------------------------- - # Compute MD5 - # ------------------------------- - print(f"šŸ”„ Calculating MD5: {path}") - new_md5 = file_md5(path) - now = datetime.now().replace(microsecond=0) - - cursor.execute(""" - UPDATE files - SET content_md5=%s, - md5_calculated=%s - WHERE id=%s - """, (new_md5, now, file_id)) - - updates += 1 - - # optional commit per-file: - # conn.commit() - - conn.commit() - - print("\nāœ… MD5 calculation finished.") - print(f" Updated files: {updates}") - print(f" Skipped files: {total - updates}\n") - - except Error as e: - print("MySQL Error:", e) - - finally: - if cursor: - cursor.close() - if conn: - conn.close() - - -# ====================================================== -# RUN EXAMPLE -# ====================================================== -if __name__ == "__main__": - # Example usage: - run_md5_calculator( - device_name="Z230", - extension=".pdf", - max_size="100MB" - ) diff --git a/52 MD5CalculateMultiThread.py b/52 MD5CalculateMultiThread.py new file mode 100644 index 0000000..2a683a2 --- /dev/null +++ b/52 MD5CalculateMultiThread.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import hashlib +from datetime import datetime +import mysql.connector +from dotenv import load_dotenv +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed + + +# ====================================================== +# Load environment +# ====================================================== +env_path = Path(__file__).resolve().parent / ".env" +load_dotenv(env_path) + + +# ====================================================== +# MySQL connection (each thread gets its own) +# ====================================================== +def get_db_connection(): + conn = mysql.connector.connect( + host=os.getenv("DB_MYSQL_HOST"), + user=os.getenv("DB_MYSQL_ROOT"), + password=os.getenv("DB_MYSQL_ROOT_PASS"), + port=int(os.getenv("DB_MYSQL_PORT")), + database="walkfiles", + auth_plugin="mysql_native_password" + ) + c = conn.cursor() + c.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci") + c.close() + return conn + + +# ====================================================== +# Helpers +# ====================================================== +def file_md5(path, chunk_size=1024 * 1024): + md5 = hashlib.md5() + with open(path, "rb") as f: + while chunk := f.read(chunk_size): + md5.update(chunk) + return md5.hexdigest() + + +def parse_size(size_str: str) -> int: + s = size_str.strip().upper() + if s.endswith("KB"): + return int(float(s[:-2]) * 1024) + if s.endswith("MB"): + return int(float(s[:-2]) * 1024 * 1024) + if s.endswith("GB"): + return int(float(s[:-2]) * 1024 * 1024 * 1024) + return int(s) + + +# ====================================================== +# WORKER: Runs in thread +# ====================================================== +def process_one_file(row): + file_id = row["id"] + path = row["path"] + modified = row["modified"] + prev_md5 = row["content_md5"] + prev_calc = row["md5_calculated"] + + # --- Skip if file does not exist --- + if not os.path.isfile(path): + return (file_id, "missing", None) + + # --- Decide if MD5 needed --- + need_md5 = ( + prev_md5 is None or + prev_calc is None or + prev_calc < modified + ) + if not need_md5: + return (file_id, "skip", None) + + # --- Calculate MD5 --- + new_md5 = file_md5(path) + now = datetime.now().replace(microsecond=0) + + # --- Update DB inside thread --- + try: + conn = get_db_connection() + c = conn.cursor() + c.execute(""" + UPDATE files + SET content_md5=%s, + md5_calculated=%s + WHERE id=%s + """, (new_md5, now, file_id)) + conn.commit() + c.close() + conn.close() + return (file_id, "updated", new_md5) + + except Exception as e: + return (file_id, "error", str(e)) + + +# ====================================================== +# MAIN LOGIC (single-threaded DB query + multi-threaded MD5) +# ====================================================== +def run_md5_calculator(device_name=None, + device_id=None, + extension=".pdf", + max_size="50MB", + path_prefix=None, + threads=8): + + # ---------------------------- + # DEVICE filter resolution + # ---------------------------- + filter_by_device = True + if device_name == "ANY" or device_id == "ANY": + filter_by_device = False + + elif device_id is None: + if device_name is None: + raise RuntimeError("You must provide device_name or device_id") + + conn = get_db_connection() + cur = conn.cursor(dictionary=True) + cur.execute("SELECT id FROM devices WHERE name=%s", (device_name,)) + row = cur.fetchone() + cur.close(); conn.close() + + if not row: + raise RuntimeError(f"Device '{device_name}' not found") + + device_id = row["id"] + + # EXTENSION filter + filter_by_extension = (extension != "ANY") + + # SIZE filter + filter_by_size = (max_size != "ANY") + max_bytes = parse_size(max_size) if filter_by_size else None + + # PATH filter + filter_by_path = (path_prefix not in [None, "", "ANY"]) + cleaned_prefix = path_prefix.rstrip("\\/") if filter_by_path else None + + print(f"\nšŸ” Filtering:" + f" device={'ANY' if not filter_by_device else device_id}," + f" ext={extension}," + f" max_size={max_size}," + f" prefix={path_prefix}\n") + + # --------------------------------------- + # Fetch all rows in a single DB query + # --------------------------------------- + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + + where = ["deleted = 0"] + params = [] + + if filter_by_device: + where.append("device_id=%s") + params.append(device_id) + + if filter_by_extension: + where.append("path LIKE %s") + params.append("%" + extension) + + if filter_by_size: + where.append("size <= %s") + params.append(max_bytes) + + if filter_by_path: + where.append("path LIKE %s") + params.append(cleaned_prefix + "%") + + sql = f""" + SELECT id, path, size, modified, content_md5, md5_calculated + FROM files + WHERE {" AND ".join(where)} + """ + + cursor.execute(sql, params) + rows = cursor.fetchall() + cursor.close(); conn.close() + + total = len(rows) + print(f"šŸ“ Files matching criteria: {total}\n") + + # ====================================================== + # === MULTITHREADED MD5 CALCULATION BELOW ============ + # ====================================================== + updated = 0 + skipped = 0 + missing = 0 + errors = 0 + + with ThreadPoolExecutor(max_workers=threads) as exe: + futures = {exe.submit(process_one_file, r): r["id"] for r in rows} + + for future in as_completed(futures): + file_id = futures[future] + status, result = None, None + + try: + file_id, status, result = future.result() + except Exception as e: + print(f"āŒ Thread error for ID {file_id}: {e}") + errors += 1 + continue + + if status == "updated": + updated += 1 + elif status == "skip": + skipped += 1 + elif status == "missing": + missing += 1 + elif status == "error": + errors += 1 + print(f"āš ļø DB update error: {result}") + + # ====================================================== + # SUMMARY + # ====================================================== + print("\n============================") + print("āœ… Multithreaded MD5 finished") + print("============================") + print(f"Updated: {updated}") + print(f"Skipped: {skipped}") + print(f"Missing: {missing}") + print(f"Errors: {errors}") + print(f"Threads: {threads}\n") + + + +# ====================================================== +# RUN EXAMPLE +# ====================================================== +if __name__ == "__main__": + run_md5_calculator( + device_name="TOWER", + extension="ANY", + max_size="ANY", + path_prefix="ANY", + threads=12 # ← ADJUST THREAD COUNT HERE + )