#!/opt/bin/python3
# -*- coding: utf-8 -*-
"""Read-only audit of the paths and path hashes stored in ``file_md5_index``.

For every selected row the script recomputes the canonical form of
``full_path`` and the MD5 digest of that canonical form, compares both with
what is stored, and prints a summary followed by a few example rows.
Only a SELECT is issued; nothing is written back to the database.
"""

import hashlib
import posixpath
import unicodedata
from binascii import hexlify

# ============================================================
# CONFIG
# ============================================================

# NOTE(review): credentials are hard-coded here (and use the root account).
# Consider loading them from the environment or an untracked config file.
DB_CONFIG = {
    "host": "192.168.1.50",
    "port": 3306,
    "user": "root",
    "password": "Vlado9674+",
    "database": "torrents",
    "charset": "utf8mb4",
}

HOST_FILTER = "tower"  # None = all hosts
LIMIT = None  # e.g. 50000 for testing
SHOW_EXAMPLES = 20


# ============================================================
# CANONICAL PATH
# ============================================================

def canonical_path(path_str: str) -> str:
    """Return the canonical spelling of *path_str*.

    Backslashes become forward slashes, the result is passed through
    ``posixpath.normpath`` and finally Unicode-normalized to NFC.
    Falsy input (``""`` or ``None``) is returned unchanged.
    """
    if not path_str:
        return path_str

    path_str = path_str.replace("\\", "/")
    path_str = posixpath.normpath(path_str)
    path_str = unicodedata.normalize("NFC", path_str)
    return path_str


def md5_bytes(path_str: str) -> bytes:
    """Return the raw 16-byte MD5 digest of *path_str* encoded as UTF-8."""
    return hashlib.md5(path_str.encode("utf-8")).digest()


# ============================================================
# AUDIT LOGIC
# ============================================================

def build_query(host_filter=HOST_FILTER, limit=LIMIT):
    """Build the SELECT statement and its parameter list.

    A falsy *host_filter* selects all hosts; a falsy *limit* means no LIMIT
    clause.  Returns ``(sql, params)`` ready for ``cursor.execute``.
    """
    sql = """
        SELECT id, full_path, path_hash
        FROM file_md5_index
    """
    params = []

    if host_filter:
        sql += " WHERE host_name = %s"
        params.append(host_filter)

    if limit:
        sql += " LIMIT %s"
        params.append(limit)

    return sql, params


def audit_rows(rows, show_examples=SHOW_EXAMPLES, progress_every=100000):
    """Classify every ``(id, full_path, stored_hash)`` tuple in *rows*.

    Returns a dict with the counters ``total``, ``ok``, ``path_change``,
    ``hash_change`` and ``null_path`` plus the lists ``examples_path`` and
    ``examples_hash`` (each capped at *show_examples* entries).

    A row whose ``full_path`` is NULL cannot be hashed; it is counted under
    ``null_path`` and otherwise skipped.  A NULL ``stored_hash`` counts as a
    hash change and is shown as ``"NULL"`` in the examples.
    """
    stats = {
        "total": 0,
        "ok": 0,
        "path_change": 0,
        "hash_change": 0,
        "null_path": 0,
        "examples_path": [],
        "examples_hash": [],
    }

    for rec_id, full_path, stored_hash in rows:
        stats["total"] += 1

        if full_path is None:
            # Nothing to canonicalize or hash; report instead of crashing.
            stats["null_path"] += 1
        else:
            canonical = canonical_path(full_path)
            canonical_hash = md5_bytes(canonical)
            path_differs = full_path != canonical
            hash_differs = stored_hash != canonical_hash

            # ---------------------------------------------------
            # CASE 1: fully OK
            # ---------------------------------------------------
            if not path_differs and not hash_differs:
                stats["ok"] += 1

            # ---------------------------------------------------
            # CASE 2: path string would change
            # ---------------------------------------------------
            if path_differs:
                stats["path_change"] += 1
                if len(stats["examples_path"]) < show_examples:
                    stats["examples_path"].append(
                        (rec_id, full_path, canonical)
                    )

            # ---------------------------------------------------
            # CASE 3: hash would change
            # ---------------------------------------------------
            if hash_differs:
                stats["hash_change"] += 1
                if len(stats["examples_hash"]) < show_examples:
                    # hexlify() rejects None, so render a NULL column as text.
                    if stored_hash is None:
                        stored_hex = "NULL"
                    else:
                        stored_hex = hexlify(stored_hash).decode()
                    stats["examples_hash"].append(
                        (
                            rec_id,
                            full_path,
                            stored_hex,
                            hexlify(canonical_hash).decode(),
                        )
                    )

        if progress_every and stats["total"] % progress_every == 0:
            print(f"Checked {stats['total']:,} rows...")

    return stats


# ============================================================
# REPORT
# ============================================================

def print_report(stats):
    """Print the summary and example rows collected by ``audit_rows``."""
    print("\n" + "=" * 70)
    print("AUDIT SUMMARY")
    print("=" * 70)
    print(f"Total rows checked : {stats['total']:,}")
    print(f"OK (already canonical + hash OK) : {stats['ok']:,}")
    print(f"Paths that would change : {stats['path_change']:,}")
    print(f"Hashes that would change : {stats['hash_change']:,}")
    if stats["null_path"]:
        # Only shown when such rows exist, so the usual output is unchanged.
        print(f"Rows skipped (NULL full_path) : {stats['null_path']:,}")
    print("=" * 70)

    # ------------------------------------------------------------
    # SHOW EXAMPLES
    # ------------------------------------------------------------
    if stats["examples_path"]:
        print("\n⚠ PATH CHANGE EXAMPLES:")
        for rec_id, old, new in stats["examples_path"]:
            print(f"[id={rec_id}]")
            print(" DB :", old)
            print(" NEW:", new)
            print()

    if stats["examples_hash"]:
        print("\n❌ HASH CHANGE EXAMPLES:")
        for rec_id, path, old_hash, new_hash in stats["examples_hash"]:
            print(f"[id={rec_id}] {path}")
            print(" Stored :", old_hash)
            print(" New :", new_hash)
            print()


# ============================================================
# MAIN
# ============================================================

def main():
    """Connect to the database, audit the selected rows, print the report."""
    # Imported here so the pure helpers above can be imported (and tested)
    # on machines that do not have the MySQL driver installed.
    import pymysql

    sql, params = build_query()

    db = pymysql.connect(**DB_CONFIG)
    try:
        # SSCursor is PyMySQL's unbuffered cursor: rows are fetched from the
        # server as the loop consumes them rather than all at once.
        cur = db.cursor(pymysql.cursors.SSCursor)
        try:
            cur.execute(sql, params)
            stats = audit_rows(cur)
        finally:
            cur.close()
    finally:
        # Release the connection even if the query or the audit loop fails.
        db.close()

    print_report(stats)


if __name__ == "__main__":
    main()