tw22
This commit is contained in:
@@ -8,7 +8,7 @@ import mysql.connector
|
||||
from dotenv import load_dotenv
|
||||
from pathlib import Path
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
import threading
|
||||
|
||||
# ======================================================
|
||||
# Load environment
|
||||
@@ -16,6 +16,47 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
env_path = Path(__file__).resolve().parent / ".env"
|
||||
load_dotenv(env_path)
|
||||
|
||||
# ======================================================
|
||||
# LOGGING TOGGLE (ON/OFF)
|
||||
# ======================================================
|
||||
LOGGING_ENABLED = False # ← NEW: Set to False to silence all thread debug logs
|
||||
|
||||
# ======================================================
|
||||
# Colors & logging helpers
|
||||
# ======================================================
|
||||
RESET = "\033[0m"
|
||||
COLORS = [
|
||||
"\033[92m", # green
|
||||
"\033[94m", # blue
|
||||
"\033[93m", # yellow
|
||||
"\033[91m", # red
|
||||
"\033[95m", # magenta
|
||||
"\033[96m", # cyan
|
||||
"\033[90m", # gray
|
||||
]
|
||||
|
||||
print_lock = threading.Lock()
|
||||
|
||||
|
||||
def thread_color():
|
||||
name = threading.current_thread().name
|
||||
idx = 0
|
||||
if "_" in name:
|
||||
suffix = name.split("_")[-1]
|
||||
if suffix.isdigit():
|
||||
idx = int(suffix)
|
||||
return COLORS[idx % len(COLORS)]
|
||||
|
||||
|
||||
def log_thread(msg: str):
|
||||
"""Thread-safe, colored log with thread name prefix."""
|
||||
if not LOGGING_ENABLED: # ← NEW
|
||||
return
|
||||
with print_lock:
|
||||
name = threading.current_thread().name
|
||||
color = thread_color()
|
||||
print(f"{color}[{name}] {msg}{RESET}")
|
||||
|
||||
|
||||
# ======================================================
|
||||
# MySQL connection (each thread gets its own)
|
||||
@@ -41,7 +82,10 @@ def get_db_connection():
|
||||
def file_md5(path, chunk_size=1024 * 1024):
|
||||
md5 = hashlib.md5()
|
||||
with open(path, "rb") as f:
|
||||
while chunk := f.read(chunk_size):
|
||||
while True:
|
||||
chunk = f.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
md5.update(chunk)
|
||||
return md5.hexdigest()
|
||||
|
||||
@@ -64,24 +108,28 @@ def process_one_file(row):
|
||||
file_id = row["id"]
|
||||
path = row["path"]
|
||||
modified = row["modified"]
|
||||
prev_md5 = row["content_md5"]
|
||||
prev_calc = row["md5_calculated"]
|
||||
prev_md5 = row.get("content_md5")
|
||||
prev_calc = row.get("md5_calculated")
|
||||
|
||||
log_thread(f"START ID={file_id} → {path}")
|
||||
|
||||
# --- Skip if file does not exist ---
|
||||
if not os.path.isfile(path):
|
||||
return (file_id, "missing", None)
|
||||
log_thread(f"MISS ID={file_id} (file not found)")
|
||||
return file_id, "missing", None
|
||||
|
||||
# --- Decide if MD5 needed ---
|
||||
need_md5 = (
|
||||
prev_md5 is None or
|
||||
prev_calc is None or
|
||||
prev_calc < modified
|
||||
)
|
||||
if not need_md5:
|
||||
return (file_id, "skip", None)
|
||||
# --- Decide if MD5 calculation is needed ---
|
||||
if prev_md5 and prev_calc and prev_calc >= modified:
|
||||
log_thread(f"SKIP ID={file_id} (md5 up-to-date)")
|
||||
return file_id, "skip", None
|
||||
|
||||
# --- Calculate MD5 ---
|
||||
new_md5 = file_md5(path)
|
||||
try:
|
||||
new_md5 = file_md5(path)
|
||||
except Exception as e:
|
||||
log_thread(f"ERROR ID={file_id} while reading file: {e}")
|
||||
return file_id, "error", str(e)
|
||||
|
||||
now = datetime.now().replace(microsecond=0)
|
||||
|
||||
# --- Update DB inside thread ---
|
||||
@@ -97,14 +145,17 @@ def process_one_file(row):
|
||||
conn.commit()
|
||||
c.close()
|
||||
conn.close()
|
||||
return (file_id, "updated", new_md5)
|
||||
|
||||
log_thread(f"UPDATE ID={file_id} (MD5={new_md5})")
|
||||
return file_id, "updated", new_md5
|
||||
|
||||
except Exception as e:
|
||||
return (file_id, "error", str(e))
|
||||
log_thread(f"ERROR ID={file_id} DB update failed: {e}")
|
||||
return file_id, "error", str(e)
|
||||
|
||||
|
||||
# ======================================================
|
||||
# MAIN LOGIC (single-threaded DB query + multi-threaded MD5)
|
||||
# MAIN LOGIC
|
||||
# ======================================================
|
||||
def run_md5_calculator(device_name=None,
|
||||
device_id=None,
|
||||
@@ -113,9 +164,7 @@ def run_md5_calculator(device_name=None,
|
||||
path_prefix=None,
|
||||
threads=8):
|
||||
|
||||
# ----------------------------
|
||||
# DEVICE filter resolution
|
||||
# ----------------------------
|
||||
# DEVICE resolution
|
||||
filter_by_device = True
|
||||
if device_name == "ANY" or device_id == "ANY":
|
||||
filter_by_device = False
|
||||
@@ -128,21 +177,17 @@ def run_md5_calculator(device_name=None,
|
||||
cur = conn.cursor(dictionary=True)
|
||||
cur.execute("SELECT id FROM devices WHERE name=%s", (device_name,))
|
||||
row = cur.fetchone()
|
||||
cur.close(); conn.close()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
if not row:
|
||||
raise RuntimeError(f"Device '{device_name}' not found")
|
||||
|
||||
device_id = row["id"]
|
||||
|
||||
# EXTENSION filter
|
||||
filter_by_extension = (extension != "ANY")
|
||||
|
||||
# SIZE filter
|
||||
filter_by_size = (max_size != "ANY")
|
||||
max_bytes = parse_size(max_size) if filter_by_size else None
|
||||
|
||||
# PATH filter
|
||||
filter_by_path = (path_prefix not in [None, "", "ANY"])
|
||||
cleaned_prefix = path_prefix.rstrip("\\/") if filter_by_path else None
|
||||
|
||||
@@ -152,9 +197,7 @@ def run_md5_calculator(device_name=None,
|
||||
f" max_size={max_size},"
|
||||
f" prefix={path_prefix}\n")
|
||||
|
||||
# ---------------------------------------
|
||||
# Fetch all rows in a single DB query
|
||||
# ---------------------------------------
|
||||
# Fetch rows
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
@@ -181,34 +224,38 @@ def run_md5_calculator(device_name=None,
|
||||
SELECT id, path, size, modified, content_md5, md5_calculated
|
||||
FROM files
|
||||
WHERE {" AND ".join(where)}
|
||||
AND NOT (
|
||||
content_md5 IS NOT NULL
|
||||
AND md5_calculated IS NOT NULL
|
||||
AND md5_calculated >= modified
|
||||
)
|
||||
"""
|
||||
|
||||
cursor.execute(sql, params)
|
||||
rows = cursor.fetchall()
|
||||
cursor.close(); conn.close()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
total = len(rows)
|
||||
print(f"📁 Files matching criteria: {total}\n")
|
||||
|
||||
# ======================================================
|
||||
# === MULTITHREADED MD5 CALCULATION BELOW ============
|
||||
# ======================================================
|
||||
updated = 0
|
||||
skipped = 0
|
||||
missing = 0
|
||||
errors = 0
|
||||
if total == 0:
|
||||
print("Nothing to do, exiting.")
|
||||
return
|
||||
|
||||
with ThreadPoolExecutor(max_workers=threads) as exe:
|
||||
# MULTITHREADED MD5
|
||||
updated = skipped = missing = errors = 0
|
||||
|
||||
with ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Worker") as exe:
|
||||
futures = {exe.submit(process_one_file, r): r["id"] for r in rows}
|
||||
|
||||
for future in as_completed(futures):
|
||||
for i, future in enumerate(as_completed(futures), start=1):
|
||||
file_id = futures[future]
|
||||
status, result = None, None
|
||||
|
||||
try:
|
||||
file_id, status, result = future.result()
|
||||
_id, status, result = future.result()
|
||||
except Exception as e:
|
||||
print(f"❌ Thread error for ID {file_id}: {e}")
|
||||
log_thread(f"FUTURE ERROR for ID={file_id}: {e}")
|
||||
errors += 1
|
||||
continue
|
||||
|
||||
@@ -220,11 +267,11 @@ def run_md5_calculator(device_name=None,
|
||||
missing += 1
|
||||
elif status == "error":
|
||||
errors += 1
|
||||
print(f"⚠️ DB update error: {result}")
|
||||
|
||||
# ======================================================
|
||||
if i % 100 == 0:
|
||||
print(f"… processed {i}/{total} files")
|
||||
|
||||
# SUMMARY
|
||||
# ======================================================
|
||||
print("\n============================")
|
||||
print("✅ Multithreaded MD5 finished")
|
||||
print("============================")
|
||||
@@ -235,7 +282,6 @@ def run_md5_calculator(device_name=None,
|
||||
print(f"Threads: {threads}\n")
|
||||
|
||||
|
||||
|
||||
# ======================================================
|
||||
# RUN EXAMPLE
|
||||
# ======================================================
|
||||
@@ -245,5 +291,5 @@ if __name__ == "__main__":
|
||||
extension="ANY",
|
||||
max_size="ANY",
|
||||
path_prefix="ANY",
|
||||
threads=12 # ← ADJUST THREAD COUNT HERE
|
||||
threads=6
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user