Compare commits
2 Commits
main
...
b61a8a5473
| Author | SHA1 | Date | |
|---|---|---|---|
| b61a8a5473 | |||
| 83f2d0dafc |
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
.venv/
|
||||||
|
.idea/
|
||||||
@@ -258,8 +258,7 @@ def walk_and_store_bulk():
|
|||||||
BATCH_SIZE = 10000
|
BATCH_SIZE = 10000
|
||||||
# target_dir = r"\\tower1\#colddata"
|
# target_dir = r"\\tower1\#colddata"
|
||||||
# target_dir = r"z:"
|
# target_dir = r"z:"
|
||||||
# target_dir = r"\\tower\ebooks"
|
target_dir = r"\\tower\ebooks"
|
||||||
target_dir = r"\\tower\dedup"
|
|
||||||
# device_name = "TW22"
|
# device_name = "TW22"
|
||||||
device_name = "TOWER"
|
device_name = "TOWER"
|
||||||
|
|
||||||
|
|||||||
@@ -1,485 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
import os
|
|
||||||
import hashlib
|
|
||||||
from datetime import datetime
|
|
||||||
import mysql.connector
|
|
||||||
from mysql.connector import Error
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
from pathlib import Path
|
|
||||||
import unicodedata
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# Load .env from the script directory
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
env_path = Path(__file__).resolve().parent / ".env"
|
|
||||||
load_dotenv(env_path)
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# Helper: MD5 of full file path string
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
def md5_path(path: str) -> str:
    """Return the 32-character hex MD5 digest of the path string.

    The path is UTF-8 encoded before hashing; the digest serves as a
    compact fixed-width database key for file paths.
    """
    digest = hashlib.md5(path.encode("utf8"))
    return digest.hexdigest()
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# MySQL CONNECTIONS
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
def get_server_connection():
    """Open a server-level MySQL connection with no database selected.

    Credentials come from environment variables (loaded from .env):
    DB_MYSQL_HOST, DB_MYSQL_ROOT, DB_MYSQL_ROOT_PASS, DB_MYSQL_PORT.
    Used only for administrative statements such as CREATE DATABASE.
    """
    return mysql.connector.connect(
        host=os.getenv("DB_MYSQL_HOST"),
        user=os.getenv("DB_MYSQL_ROOT"),
        password=os.getenv("DB_MYSQL_ROOT_PASS"),
        # Fall back to MySQL's standard port when DB_MYSQL_PORT is unset,
        # instead of crashing with TypeError on int(None).
        port=int(os.getenv("DB_MYSQL_PORT", "3306")),
        auth_plugin="mysql_native_password",
    )
|
|
||||||
|
|
||||||
|
|
||||||
def get_db_connection():
    """Open a connection to the `walkfiles` database.

    The session character set is forced to utf8mb4 so non-ASCII file
    paths round-trip correctly.

    Returns:
        An open mysql.connector connection bound to `walkfiles`.
    """
    conn = mysql.connector.connect(
        host=os.getenv("DB_MYSQL_HOST"),
        user=os.getenv("DB_MYSQL_ROOT"),
        password=os.getenv("DB_MYSQL_ROOT_PASS"),
        # Default to MySQL's standard port when DB_MYSQL_PORT is unset,
        # instead of crashing with TypeError on int(None).
        port=int(os.getenv("DB_MYSQL_PORT", "3306")),
        database="walkfiles",
        auth_plugin="mysql_native_password",
    )

    c = conn.cursor()
    try:
        c.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci")
    finally:
        # Close the cursor even if SET NAMES fails, so it is not leaked.
        c.close()
    return conn
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# DATABASE INITIALIZATION
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
def init_db():
    """Create the `walkfiles` database and its schema if missing.

    Returns:
        (conn, cursor): an open connection to `walkfiles` plus a cursor
        on it, ready for use by the scanner.
    """
    # Ensure DB exists (server-level connection, no database selected)
    server = get_server_connection()
    cur = server.cursor()
    cur.execute("""
        CREATE DATABASE IF NOT EXISTS walkfiles
        DEFAULT CHARACTER SET utf8mb4
        COLLATE utf8mb4_general_ci
    """)
    server.commit()
    cur.close()
    server.close()

    # Connect to the (now existing) database
    conn = get_db_connection()
    cursor = conn.cursor()

    # DEVICES — one row per scanned machine/share; scanned_at records
    # when the most recent scan of the device finished.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS devices (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(255) UNIQUE,
            scanned_at DATETIME NULL
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """)

    # FOLDERS — soft-deleted via the `deleted` flag rather than removed.
    # The unique key uses a 255-char prefix of `path` — presumably to fit
    # InnoDB's index key length limit for VARCHAR(2048); TODO confirm.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS folders (
            id INT AUTO_INCREMENT PRIMARY KEY,
            path VARCHAR(2048) NOT NULL,
            parent_id INT NULL,
            device_id INT NOT NULL,
            first_seen DATETIME NOT NULL,
            last_seen DATETIME NOT NULL,
            deleted TINYINT(1) NOT NULL DEFAULT 0,

            CONSTRAINT fk_folder_device
                FOREIGN KEY (device_id) REFERENCES devices(id)
                ON DELETE CASCADE,

            UNIQUE KEY uniq_folder_path (device_id, path(255)),
            INDEX idx_folder_dev (device_id)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """)

    # FILES — uniquely identified per device by path_md5 (see md5_path),
    # a fixed-width CHAR(32) key where the raw 2048-char path cannot be
    # fully indexed. Also soft-deleted via `deleted`.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS files (
            id INT AUTO_INCREMENT PRIMARY KEY,

            name VARCHAR(255) NOT NULL,
            path VARCHAR(2048) NOT NULL,
            path_md5 CHAR(32) NOT NULL,

            size BIGINT NULL,
            modified DATETIME NULL,
            type VARCHAR(255) NULL,

            folder_id INT NULL,
            device_id INT NOT NULL,
            deleted TINYINT(1) NOT NULL DEFAULT 0,

            first_seen DATETIME NOT NULL,
            last_seen DATETIME NOT NULL,

            CONSTRAINT fk_file_folder
                FOREIGN KEY (folder_id) REFERENCES folders(id)
                ON DELETE SET NULL,

            CONSTRAINT fk_file_device
                FOREIGN KEY (device_id) REFERENCES devices(id)
                ON DELETE CASCADE,

            UNIQUE KEY uniq_file_path_md5 (device_id, path_md5),
            INDEX idx_file_folder (folder_id),
            INDEX idx_file_deleted (device_id, deleted)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """)

    conn.commit()
    return conn, cursor
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# HELPERS — DEVICES & FOLDERS
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
def get_or_create_device(cursor, conn, name: str) -> int:
    """Ensure a device row named *name* exists and return its id.

    INSERT IGNORE makes repeated calls idempotent: scanned_at is set only
    when the row is first created, and existing rows are left untouched.
    """
    created_at = datetime.now()
    cursor.execute(
        "INSERT IGNORE INTO devices (name, scanned_at) VALUES (%s,%s)",
        (name, created_at),
    )
    conn.commit()

    cursor.execute("SELECT id FROM devices WHERE name=%s", (name,))
    (device_id,) = cursor.fetchone()
    return device_id
|
|
||||||
|
|
||||||
|
|
||||||
def load_folder_state(cursor, device_id: int):
    """Load every folder row for a device into an in-memory map.

    Returns:
        dict keyed by os.path.normpath(path):
        {normalized_path: {"id": folder_id, "deleted": 0/1}}
    """
    cursor.execute("""
        SELECT id, path, deleted
        FROM folders
        WHERE device_id=%s
    """, (device_id,))

    return {
        os.path.normpath(path): {"id": folder_id, "deleted": int(deleted)}
        for folder_id, path, deleted in cursor.fetchall()
    }
|
|
||||||
|
|
||||||
|
|
||||||
def get_or_create_folder(cursor, conn, folder_state, device_id, folder_path, parent_id):
    """
    Create or look up a folder row and return its id. Handles:
      - Unicode normalization (NFC, so composed vs. decomposed forms of
        the same visible path match)
      - an in-memory cache (folder_state) to avoid repeated queries
      - an idempotent INSERT (ON DUPLICATE KEY UPDATE)
    """
    # Normalize Unicode + path form so the cache key and the DB value
    # are canonical.
    folder_path = unicodedata.normalize("NFC", folder_path)
    folder_path = os.path.normpath(folder_path)

    key = folder_path

    # 1) Cache hit — no DB round-trip needed.
    if key in folder_state:
        return folder_state[key]["id"]

    now = datetime.now()

    # 2) Try a SELECT first (row may exist from a previous run that is
    #    not yet in the cache).
    cursor.execute("""
        SELECT id
        FROM folders
        WHERE device_id = %s AND path = %s
        LIMIT 1
    """, (device_id, folder_path))
    row = cursor.fetchone()

    if row:
        folder_id = row[0]
        folder_state[key] = {"id": folder_id, "deleted": 0}
        return folder_id

    # 3) INSERT (idempotent): on a duplicate key, LAST_INSERT_ID(id)
    #    makes cursor.lastrowid report the existing row's id.
    cursor.execute("""
        INSERT INTO folders (path, parent_id, device_id, first_seen, last_seen)
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            id = LAST_INSERT_ID(id),
            last_seen = VALUES(last_seen)
    """, (folder_path, parent_id, device_id, now, now))

    conn.commit()

    folder_id = cursor.lastrowid
    folder_state[key] = {"id": folder_id, "deleted": 0}
    return folder_id
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# LOAD LAST FILE STATE
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
def load_last_file_state(cursor, device_id: int):
    """Load the last known state of every file on a device, keyed by path_md5.

    The MAX(id) sub-select keeps only the newest row per path_md5 — kept
    for historical reasons, even though (device_id, path_md5) is UNIQUE.
    """
    cursor.execute("""
        SELECT f.id, f.path_md5, f.deleted, f.size, f.modified
        FROM files f
        JOIN (
            SELECT MAX(id) AS mx
            FROM files
            WHERE device_id=%s
            GROUP BY path_md5
        ) t ON f.id = t.mx
    """, (device_id,))

    return {
        md5: {
            "id": fid,
            "deleted": int(deleted),
            "size": size,
            "modified": modified,
        }
        for fid, md5, deleted, size, modified in cursor.fetchall()
    }
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# MAIN SCANNER WITH BATCHING
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
def walk_and_store_bulk(target_dir=r"\\tower\ebooks",
                        device_name="TOWER",
                        batch_size=10000):
    """Walk *target_dir* and synchronize its contents into the database.

    For every folder/file under the root:
      - new files are batch-INSERTed,
      - files whose size or mtime changed are batch-UPDATEd,
      - files/folders no longer present in this subtree are soft-deleted
        (deleted=1); rows outside the subtree are left untouched.

    Args:
        target_dir:  root directory to scan (UNC paths supported).
                     Defaults preserve the previously hard-coded config.
        device_name: logical device name the scan is recorded under.
        batch_size:  number of rows buffered before each bulk flush.
    """
    # Upsert: a file that was marked deleted and later reappears would
    # otherwise violate uniq_file_path_md5 (device_id, path_md5) and
    # raise IntegrityError; instead, revive/refresh the existing row.
    insert_sql = """
        INSERT INTO files (
            name, path, path_md5, size, modified, type,
            folder_id, device_id, deleted,
            first_seen, last_seen
        )
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON DUPLICATE KEY UPDATE
            size = VALUES(size),
            modified = VALUES(modified),
            type = VALUES(type),
            folder_id = VALUES(folder_id),
            deleted = 0,
            last_seen = VALUES(last_seen)
    """
    update_sql = """
        UPDATE files
        SET size=%s, modified=%s, last_seen=%s, deleted=0
        WHERE id=%s
    """

    # Normalized root used for comparisons and the SQL LIKE prefix.
    target_dir_norm = os.path.normpath(target_dir)

    if not os.path.isdir(target_dir):
        print("Invalid directory:", target_dir)
        return

    conn, cursor = init_db()
    now = datetime.now()

    device_id = get_or_create_device(cursor, conn, device_name)
    folder_state = load_folder_state(cursor, device_id)
    file_state = load_last_file_state(cursor, device_id)

    seen_folders = set()
    seen_files = set()  # MD5 of every path encountered in this scan

    files_to_insert = []
    files_to_update = []

    total_files = 0
    # Running totals so the final summary is accurate even after the
    # buffers have been flushed (previously it always printed 0).
    inserted_count = 0
    updated_count = 0

    print(f"🔍 Scanning: {target_dir} (device {device_id})")

    def _flush_inserts():
        # Bulk-write and clear the pending INSERT buffer.
        nonlocal inserted_count
        if files_to_insert:
            inserted_count += len(files_to_insert)
            cursor.executemany(insert_sql, files_to_insert)
            conn.commit()
            files_to_insert.clear()

    def _flush_updates():
        # Bulk-write and clear the pending UPDATE buffer.
        nonlocal updated_count
        if files_to_update:
            updated_count += len(files_to_update)
            cursor.executemany(update_sql, files_to_update)
            conn.commit()
            files_to_update.clear()

    # -------------------------------------------------
    # WALK FILESYSTEM
    # -------------------------------------------------
    for root, dirs, files in os.walk(target_dir):
        folder_path = os.path.normpath(root)

        # 1) Determine parent_id first so the current folder links to it.
        if folder_path == target_dir_norm:
            parent_id = None
        else:
            parent_folder_path = os.path.normpath(os.path.dirname(folder_path))
            parent_id = get_or_create_folder(cursor, conn, folder_state,
                                             device_id, parent_folder_path,
                                             None)

        # 2) Insert/look up the current folder with the correct parent_id.
        seen_folders.add(folder_path)
        folder_id = get_or_create_folder(cursor, conn, folder_state,
                                         device_id, folder_path,
                                         parent_id)

        # -------------------------------------------------
        # FILE LOOP
        # -------------------------------------------------
        for name in files:
            total_files += 1

            filepath = os.path.normpath(os.path.join(folder_path, name))
            md5 = md5_path(filepath)
            seen_files.add(md5)

            try:
                st = os.stat(filepath)
            except FileNotFoundError:
                # File vanished between listing and stat — skip it.
                continue

            modified = datetime.fromtimestamp(st.st_mtime).replace(microsecond=0)
            size = st.st_size
            ext = os.path.splitext(name)[1][:250]

            prev = file_state.get(md5)

            if prev is None or prev["deleted"] == 1:
                # New file, or a previously deleted file that reappeared
                # (the upsert in insert_sql revives the existing row).
                files_to_insert.append(
                    (name, filepath, md5, size, modified, ext,
                     folder_id, device_id, 0, now, now)
                )
            elif prev["size"] != size or prev["modified"] != modified:
                # Known live file whose size or mtime changed.
                files_to_update.append((size, modified, now, prev["id"]))

            # -------------------------------------------------
            # BATCH FLUSHING — keeps memory bounded on huge trees
            # -------------------------------------------------
            if len(files_to_insert) >= batch_size:
                print(f"💾 Flushing {len(files_to_insert)} inserts...")
                _flush_inserts()

            if len(files_to_update) >= batch_size:
                print(f"💾 Flushing {len(files_to_update)} updates...")
                _flush_updates()

            # PROGRESS
            if total_files % 1000 == 0:
                print(f" ... processed {total_files} files")

    # -------------------------------------------------
    # FINAL FLUSH (REMAINING INSERTS/UPDATES)
    # -------------------------------------------------
    if files_to_insert:
        print(f"💾 Final flush: {len(files_to_insert)} inserts")
    _flush_inserts()

    if files_to_update:
        print(f"💾 Final flush: {len(files_to_update)} updates")
    _flush_updates()

    # -------------------------------------------------
    # MARK DELETED FILES — ONLY IN THIS SUBTREE
    # -------------------------------------------------
    files_deleted_count = 0
    like_prefix = target_dir_norm.rstrip("\\/") + "%"

    cursor.execute("""
        SELECT id, path_md5
        FROM files
        WHERE device_id = %s
          AND deleted = 0
          AND path LIKE %s
    """, (device_id, like_prefix))

    candidates = cursor.fetchall()
    ids_to_delete = [fid for (fid, md5) in candidates if md5 not in seen_files]

    if ids_to_delete:
        print(f"💾 Marking {len(ids_to_delete)} files as deleted in subtree")
        cursor.executemany("""
            UPDATE files
            SET deleted=1, last_seen=%s
            WHERE id=%s
        """, [(now, fid) for fid in ids_to_delete])
        conn.commit()
        files_deleted_count = len(ids_to_delete)

    # -------------------------------------------------
    # MARK DELETED FOLDERS — ONLY IN THIS SUBTREE
    # -------------------------------------------------
    folders_to_mark_deleted = []
    for path, info in folder_state.items():
        # Restrict to this subtree (including the root folder itself).
        norm_path = os.path.normpath(path)
        if not norm_path.startswith(target_dir_norm):
            continue
        if info["deleted"] == 0 and norm_path not in seen_folders:
            folders_to_mark_deleted.append((now, info["id"]))

    folders_deleted_count = 0
    if folders_to_mark_deleted:
        cursor.executemany("""
            UPDATE folders
            SET deleted=1, last_seen=%s
            WHERE id=%s
        """, folders_to_mark_deleted)
        conn.commit()
        folders_deleted_count = len(folders_to_mark_deleted)

    # -------------------------------------------------
    # Update device timestamp
    # -------------------------------------------------
    cursor.execute("UPDATE devices SET scanned_at=%s WHERE id=%s", (now, device_id))
    conn.commit()

    cursor.close()
    conn.close()

    print("")
    print("✅ Scan completed.")
    print(" Total files scanned:", total_files)
    print(" Files inserted:", inserted_count)
    print(" Files updated:", updated_count)
    print(" Files deleted in subtree:", files_deleted_count)
    print(" Folders deleted in subtree:", folders_deleted_count)
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# MAIN ENTRY
|
|
||||||
# ======================================================
|
|
||||||
|
|
||||||
if __name__ == '__main__':
    # Entry point: run a full filesystem scan when executed as a script.
    walk_and_store_bulk()
|
|
||||||
@@ -8,7 +8,7 @@ import mysql.connector
|
|||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
import threading
|
|
||||||
|
|
||||||
# ======================================================
|
# ======================================================
|
||||||
# Load environment
|
# Load environment
|
||||||
@@ -16,47 +16,6 @@ import threading
|
|||||||
env_path = Path(__file__).resolve().parent / ".env"
|
env_path = Path(__file__).resolve().parent / ".env"
|
||||||
load_dotenv(env_path)
|
load_dotenv(env_path)
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# LOGGING TOGGLE (ON/OFF)
|
|
||||||
# ======================================================
|
|
||||||
LOGGING_ENABLED = False # ← NEW: Set to False to silence all thread debug logs
|
|
||||||
|
|
||||||
# ======================================================
|
|
||||||
# Colors & logging helpers
|
|
||||||
# ======================================================
|
|
||||||
RESET = "\033[0m"
|
|
||||||
COLORS = [
|
|
||||||
"\033[92m", # green
|
|
||||||
"\033[94m", # blue
|
|
||||||
"\033[93m", # yellow
|
|
||||||
"\033[91m", # red
|
|
||||||
"\033[95m", # magenta
|
|
||||||
"\033[96m", # cyan
|
|
||||||
"\033[90m", # gray
|
|
||||||
]
|
|
||||||
|
|
||||||
print_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def thread_color():
|
|
||||||
name = threading.current_thread().name
|
|
||||||
idx = 0
|
|
||||||
if "_" in name:
|
|
||||||
suffix = name.split("_")[-1]
|
|
||||||
if suffix.isdigit():
|
|
||||||
idx = int(suffix)
|
|
||||||
return COLORS[idx % len(COLORS)]
|
|
||||||
|
|
||||||
|
|
||||||
def log_thread(msg: str):
|
|
||||||
"""Thread-safe, colored log with thread name prefix."""
|
|
||||||
if not LOGGING_ENABLED: # ← NEW
|
|
||||||
return
|
|
||||||
with print_lock:
|
|
||||||
name = threading.current_thread().name
|
|
||||||
color = thread_color()
|
|
||||||
print(f"{color}[{name}] {msg}{RESET}")
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
# ======================================================
|
||||||
# MySQL connection (each thread gets its own)
|
# MySQL connection (each thread gets its own)
|
||||||
@@ -82,10 +41,7 @@ def get_db_connection():
|
|||||||
def file_md5(path, chunk_size=1024 * 1024):
    """Return the hex MD5 digest of a file's contents.

    The file is read in *chunk_size* pieces (1 MiB by default) so memory
    stays bounded regardless of file size.
    """
    hasher = hashlib.md5()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            hasher.update(chunk)
    return hasher.hexdigest()
|
||||||
|
|
||||||
@@ -108,28 +64,24 @@ def process_one_file(row):
|
|||||||
file_id = row["id"]
|
file_id = row["id"]
|
||||||
path = row["path"]
|
path = row["path"]
|
||||||
modified = row["modified"]
|
modified = row["modified"]
|
||||||
prev_md5 = row.get("content_md5")
|
prev_md5 = row["content_md5"]
|
||||||
prev_calc = row.get("md5_calculated")
|
prev_calc = row["md5_calculated"]
|
||||||
|
|
||||||
log_thread(f"START ID={file_id} → {path}")
|
|
||||||
|
|
||||||
# --- Skip if file does not exist ---
|
# --- Skip if file does not exist ---
|
||||||
if not os.path.isfile(path):
|
if not os.path.isfile(path):
|
||||||
log_thread(f"MISS ID={file_id} (file not found)")
|
return (file_id, "missing", None)
|
||||||
return file_id, "missing", None
|
|
||||||
|
|
||||||
# --- Decide if MD5 calculation is needed ---
|
# --- Decide if MD5 needed ---
|
||||||
if prev_md5 and prev_calc and prev_calc >= modified:
|
need_md5 = (
|
||||||
log_thread(f"SKIP ID={file_id} (md5 up-to-date)")
|
prev_md5 is None or
|
||||||
return file_id, "skip", None
|
prev_calc is None or
|
||||||
|
prev_calc < modified
|
||||||
|
)
|
||||||
|
if not need_md5:
|
||||||
|
return (file_id, "skip", None)
|
||||||
|
|
||||||
# --- Calculate MD5 ---
|
# --- Calculate MD5 ---
|
||||||
try:
|
|
||||||
new_md5 = file_md5(path)
|
new_md5 = file_md5(path)
|
||||||
except Exception as e:
|
|
||||||
log_thread(f"ERROR ID={file_id} while reading file: {e}")
|
|
||||||
return file_id, "error", str(e)
|
|
||||||
|
|
||||||
now = datetime.now().replace(microsecond=0)
|
now = datetime.now().replace(microsecond=0)
|
||||||
|
|
||||||
# --- Update DB inside thread ---
|
# --- Update DB inside thread ---
|
||||||
@@ -145,17 +97,14 @@ def process_one_file(row):
|
|||||||
conn.commit()
|
conn.commit()
|
||||||
c.close()
|
c.close()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
return (file_id, "updated", new_md5)
|
||||||
log_thread(f"UPDATE ID={file_id} (MD5={new_md5})")
|
|
||||||
return file_id, "updated", new_md5
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log_thread(f"ERROR ID={file_id} DB update failed: {e}")
|
return (file_id, "error", str(e))
|
||||||
return file_id, "error", str(e)
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================
|
# ======================================================
|
||||||
# MAIN LOGIC
|
# MAIN LOGIC (single-threaded DB query + multi-threaded MD5)
|
||||||
# ======================================================
|
# ======================================================
|
||||||
def run_md5_calculator(device_name=None,
|
def run_md5_calculator(device_name=None,
|
||||||
device_id=None,
|
device_id=None,
|
||||||
@@ -164,7 +113,9 @@ def run_md5_calculator(device_name=None,
|
|||||||
path_prefix=None,
|
path_prefix=None,
|
||||||
threads=8):
|
threads=8):
|
||||||
|
|
||||||
# DEVICE resolution
|
# ----------------------------
|
||||||
|
# DEVICE filter resolution
|
||||||
|
# ----------------------------
|
||||||
filter_by_device = True
|
filter_by_device = True
|
||||||
if device_name == "ANY" or device_id == "ANY":
|
if device_name == "ANY" or device_id == "ANY":
|
||||||
filter_by_device = False
|
filter_by_device = False
|
||||||
@@ -177,17 +128,21 @@ def run_md5_calculator(device_name=None,
|
|||||||
cur = conn.cursor(dictionary=True)
|
cur = conn.cursor(dictionary=True)
|
||||||
cur.execute("SELECT id FROM devices WHERE name=%s", (device_name,))
|
cur.execute("SELECT id FROM devices WHERE name=%s", (device_name,))
|
||||||
row = cur.fetchone()
|
row = cur.fetchone()
|
||||||
cur.close()
|
cur.close(); conn.close()
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if not row:
|
if not row:
|
||||||
raise RuntimeError(f"Device '{device_name}' not found")
|
raise RuntimeError(f"Device '{device_name}' not found")
|
||||||
|
|
||||||
device_id = row["id"]
|
device_id = row["id"]
|
||||||
|
|
||||||
|
# EXTENSION filter
|
||||||
filter_by_extension = (extension != "ANY")
|
filter_by_extension = (extension != "ANY")
|
||||||
|
|
||||||
|
# SIZE filter
|
||||||
filter_by_size = (max_size != "ANY")
|
filter_by_size = (max_size != "ANY")
|
||||||
max_bytes = parse_size(max_size) if filter_by_size else None
|
max_bytes = parse_size(max_size) if filter_by_size else None
|
||||||
|
|
||||||
|
# PATH filter
|
||||||
filter_by_path = (path_prefix not in [None, "", "ANY"])
|
filter_by_path = (path_prefix not in [None, "", "ANY"])
|
||||||
cleaned_prefix = path_prefix.rstrip("\\/") if filter_by_path else None
|
cleaned_prefix = path_prefix.rstrip("\\/") if filter_by_path else None
|
||||||
|
|
||||||
@@ -197,7 +152,9 @@ def run_md5_calculator(device_name=None,
|
|||||||
f" max_size={max_size},"
|
f" max_size={max_size},"
|
||||||
f" prefix={path_prefix}\n")
|
f" prefix={path_prefix}\n")
|
||||||
|
|
||||||
# Fetch rows
|
# ---------------------------------------
|
||||||
|
# Fetch all rows in a single DB query
|
||||||
|
# ---------------------------------------
|
||||||
conn = get_db_connection()
|
conn = get_db_connection()
|
||||||
cursor = conn.cursor(dictionary=True)
|
cursor = conn.cursor(dictionary=True)
|
||||||
|
|
||||||
@@ -224,38 +181,34 @@ def run_md5_calculator(device_name=None,
|
|||||||
SELECT id, path, size, modified, content_md5, md5_calculated
|
SELECT id, path, size, modified, content_md5, md5_calculated
|
||||||
FROM files
|
FROM files
|
||||||
WHERE {" AND ".join(where)}
|
WHERE {" AND ".join(where)}
|
||||||
AND NOT (
|
|
||||||
content_md5 IS NOT NULL
|
|
||||||
AND md5_calculated IS NOT NULL
|
|
||||||
AND md5_calculated >= modified
|
|
||||||
)
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
cursor.execute(sql, params)
|
cursor.execute(sql, params)
|
||||||
rows = cursor.fetchall()
|
rows = cursor.fetchall()
|
||||||
cursor.close()
|
cursor.close(); conn.close()
|
||||||
conn.close()
|
|
||||||
|
|
||||||
total = len(rows)
|
total = len(rows)
|
||||||
print(f"📁 Files matching criteria: {total}\n")
|
print(f"📁 Files matching criteria: {total}\n")
|
||||||
|
|
||||||
if total == 0:
|
# ======================================================
|
||||||
print("Nothing to do, exiting.")
|
# === MULTITHREADED MD5 CALCULATION BELOW ============
|
||||||
return
|
# ======================================================
|
||||||
|
updated = 0
|
||||||
|
skipped = 0
|
||||||
|
missing = 0
|
||||||
|
errors = 0
|
||||||
|
|
||||||
# MULTITHREADED MD5
|
with ThreadPoolExecutor(max_workers=threads) as exe:
|
||||||
updated = skipped = missing = errors = 0
|
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Worker") as exe:
|
|
||||||
futures = {exe.submit(process_one_file, r): r["id"] for r in rows}
|
futures = {exe.submit(process_one_file, r): r["id"] for r in rows}
|
||||||
|
|
||||||
for i, future in enumerate(as_completed(futures), start=1):
|
for future in as_completed(futures):
|
||||||
file_id = futures[future]
|
file_id = futures[future]
|
||||||
|
status, result = None, None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_id, status, result = future.result()
|
file_id, status, result = future.result()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log_thread(f"FUTURE ERROR for ID={file_id}: {e}")
|
print(f"❌ Thread error for ID {file_id}: {e}")
|
||||||
errors += 1
|
errors += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -267,11 +220,11 @@ def run_md5_calculator(device_name=None,
|
|||||||
missing += 1
|
missing += 1
|
||||||
elif status == "error":
|
elif status == "error":
|
||||||
errors += 1
|
errors += 1
|
||||||
|
print(f"⚠️ DB update error: {result}")
|
||||||
|
|
||||||
if i % 100 == 0:
|
# ======================================================
|
||||||
print(f"… processed {i}/{total} files")
|
|
||||||
|
|
||||||
# SUMMARY
|
# SUMMARY
|
||||||
|
# ======================================================
|
||||||
print("\n============================")
|
print("\n============================")
|
||||||
print("✅ Multithreaded MD5 finished")
|
print("✅ Multithreaded MD5 finished")
|
||||||
print("============================")
|
print("============================")
|
||||||
@@ -282,6 +235,7 @@ def run_md5_calculator(device_name=None,
|
|||||||
print(f"Threads: {threads}\n")
|
print(f"Threads: {threads}\n")
|
||||||
|
|
||||||
|
|
||||||
|
v
|
||||||
# ======================================================
|
# ======================================================
|
||||||
# RUN EXAMPLE
|
# RUN EXAMPLE
|
||||||
# ======================================================
|
# ======================================================
|
||||||
@@ -291,5 +245,5 @@ if __name__ == "__main__":
|
|||||||
extension="ANY",
|
extension="ANY",
|
||||||
max_size="ANY",
|
max_size="ANY",
|
||||||
path_prefix="ANY",
|
path_prefix="ANY",
|
||||||
threads=6
|
threads=12 # ← ADJUST THREAD COUNT HERE
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user