Files
walkfiles/22 WalkandSave.py
2025-12-10 14:05:31 +01:00

457 lines
14 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
from datetime import datetime
import mysql.connector
from mysql.connector import Error
from dotenv import load_dotenv
from pathlib import Path
# ======================================================
# Load .env from the script directory
# ======================================================
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(env_path)
# ======================================================
# Helper: MD5 of full file path string
# ======================================================
def md5_path(path: str) -> str:
    """Return the hexadecimal MD5 digest of ``path`` encoded as UTF-8.

    The digest is computed over the path *string* itself, not over the
    contents of the file the path points to.
    """
    hasher = hashlib.md5()
    hasher.update(path.encode("utf8"))
    return hasher.hexdigest()
# ======================================================
# MySQL CONNECTIONS
# ======================================================
def get_server_connection():
    """Open a MySQL connection to the server without selecting a database.

    Settings are read from the environment: ``DB_MYSQL_HOST``,
    ``DB_MYSQL_ROOT`` (user name), ``DB_MYSQL_ROOT_PASS`` (password) and
    ``DB_MYSQL_PORT``.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
    """
    # An unset or empty DB_MYSQL_PORT previously crashed in int(None) /
    # int(""); fall back to MySQL's standard port 3306 instead.
    port = int(os.getenv("DB_MYSQL_PORT") or "3306")
    return mysql.connector.connect(
        host=os.getenv("DB_MYSQL_HOST"),
        user=os.getenv("DB_MYSQL_ROOT"),
        password=os.getenv("DB_MYSQL_ROOT_PASS"),
        port=port,
        auth_plugin="mysql_native_password",
    )
def get_db_connection():
    """Open a MySQL connection to the ``walkfiles`` database.

    Settings are read from the environment: ``DB_MYSQL_HOST``,
    ``DB_MYSQL_ROOT`` (user name), ``DB_MYSQL_ROOT_PASS`` (password) and
    ``DB_MYSQL_PORT``. After connecting, the session character set is
    switched to ``utf8mb4`` with the ``utf8mb4_general_ci`` collation.

    Returns:
        The open connection; the caller is responsible for closing it.
    """
    # An unset or empty DB_MYSQL_PORT previously crashed in int(None) /
    # int(""); fall back to MySQL's standard port 3306 instead.
    port = int(os.getenv("DB_MYSQL_PORT") or "3306")
    conn = mysql.connector.connect(
        host=os.getenv("DB_MYSQL_HOST"),
        user=os.getenv("DB_MYSQL_ROOT"),
        password=os.getenv("DB_MYSQL_ROOT_PASS"),
        port=port,
        database="walkfiles",
        auth_plugin="mysql_native_password",
    )
    try:
        c = conn.cursor()
        try:
            c.execute("SET NAMES utf8mb4 COLLATE utf8mb4_general_ci")
        finally:
            # Release the cursor even when SET NAMES fails.
            c.close()
    except Exception:
        # Do not hand back (or leak) a half-configured connection.
        conn.close()
        raise
    return conn
# ======================================================
# DATABASE INITIALIZATION
# ======================================================
def init_db():
    """Create the ``walkfiles`` database and its tables when they are missing.

    First connects without a database to issue ``CREATE DATABASE IF NOT
    EXISTS``, then connects to ``walkfiles`` and creates the ``devices``,
    ``folders`` and ``files`` tables (in that order, because of the foreign
    keys between them).

    Returns:
        tuple: ``(conn, cursor)`` — an open connection to ``walkfiles`` and a
        cursor on it. The caller is responsible for closing both.
    """
    # Ensure DB exists; always release the server-level connection, even
    # when the statement fails.
    server = get_server_connection()
    try:
        cur = server.cursor()
        try:
            cur.execute("""
                CREATE DATABASE IF NOT EXISTS walkfiles
                DEFAULT CHARACTER SET utf8mb4
                COLLATE utf8mb4_general_ci
            """)
            server.commit()
        finally:
            cur.close()
    finally:
        server.close()

    # Connect to the database itself.
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # DEVICES
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS devices (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(255) UNIQUE,
                scanned_at DATETIME NULL
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """)

        # FOLDERS
        # NOTE(review): uniq_folder_path only covers the first 255 characters
        # of `path`, so two longer paths on one device that share that prefix
        # would collide — confirm this is acceptable for the scanned trees.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS folders (
                id INT AUTO_INCREMENT PRIMARY KEY,
                path VARCHAR(2048) NOT NULL,
                parent_id INT NULL,
                device_id INT NOT NULL,
                first_seen DATETIME NOT NULL,
                last_seen DATETIME NOT NULL,
                deleted TINYINT(1) NOT NULL DEFAULT 0,
                CONSTRAINT fk_folder_device
                FOREIGN KEY (device_id) REFERENCES devices(id)
                ON DELETE CASCADE,
                UNIQUE KEY uniq_folder_path (device_id, path(255)),
                INDEX idx_folder_dev (device_id)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """)

        # FILES
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS files (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                path VARCHAR(2048) NOT NULL,
                path_md5 CHAR(32) NOT NULL,
                size BIGINT NULL,
                modified DATETIME NULL,
                type VARCHAR(255) NULL,
                folder_id INT NULL,
                device_id INT NOT NULL,
                deleted TINYINT(1) NOT NULL DEFAULT 0,
                first_seen DATETIME NOT NULL,
                last_seen DATETIME NOT NULL,
                CONSTRAINT fk_file_folder
                FOREIGN KEY (folder_id) REFERENCES folders(id)
                ON DELETE SET NULL,
                CONSTRAINT fk_file_device
                FOREIGN KEY (device_id) REFERENCES devices(id)
                ON DELETE CASCADE,
                UNIQUE KEY uniq_file_path_md5 (device_id, path_md5),
                INDEX idx_file_folder (folder_id),
                INDEX idx_file_deleted (device_id, deleted)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """)
        conn.commit()
    except Exception:
        # Schema setup failed: do not leak the connection to the caller-less void.
        conn.close()
        raise
    return conn, cursor
# ======================================================
# HELPERS — DEVICES & FOLDERS
# ======================================================
def get_or_create_device(cursor, conn, name: str) -> int:
    """Return the id of the device called ``name``, inserting it first if new.

    The insert uses ``INSERT IGNORE``, so an already existing device row is
    left untouched; the id is then read back with a SELECT by name.
    """
    scanned_at = datetime.now()
    insert_params = (name, scanned_at)
    cursor.execute(
        "INSERT IGNORE INTO devices (name, scanned_at) VALUES (%s,%s)",
        insert_params,
    )
    conn.commit()

    cursor.execute("SELECT id FROM devices WHERE name=%s", (name,))
    device_row = cursor.fetchone()
    return device_row[0]
def load_folder_state(cursor, device_id: int):
    """Load every folder row of ``device_id`` into a dict keyed by path.

    Returns:
        dict: ``{path: {"id": <folder id>, "deleted": <0 or 1 as int>}}``.
    """
    query = (
        "\n"
        "SELECT id, path, deleted\n"
        "FROM folders\n"
        "WHERE device_id=%s\n"
    )
    cursor.execute(query, (device_id,))
    return {
        row_path: {"id": row_id, "deleted": int(row_deleted)}
        for row_id, row_path, row_deleted in cursor.fetchall()
    }
import unicodedata
import unicodedata
from datetime import datetime
def get_or_create_folder(cursor, conn, folder_state, device_id, folder_path, parent_id):
    """Return the id of ``folder_path`` on ``device_id``, creating the row if needed.

    The lookup order is: in-memory cache, then a SELECT, then an INSERT.
    A folder that was previously flagged ``deleted=1`` is un-deleted, because
    this function is only called for folders that are being visited again.

    Args:
        cursor: Open database cursor.
        conn: Connection owning ``cursor``; committed after every write.
        folder_state: Cache ``{path: {"id": ..., "deleted": ...}}``; it is
            updated in place.
        device_id: Id of the device the folder belongs to.
        folder_path: Folder path; it is normalized to Unicode NFC first.
        parent_id: Id of the parent folder row, or ``None``. Only used when a
            new row is inserted.

    Returns:
        The id of the folder row.
    """
    # Normalize Unicode so that a precomposed name (e.g. "Č" as U+010C) and
    # its decomposed form ("C" + U+030C) map to the same folder.
    folder_path = unicodedata.normalize("NFC", folder_path)
    # Cache key is the normalized path.
    key = folder_path

    def revive(folder_id):
        # The folder exists again: clear the deleted flag left by an earlier scan.
        cursor.execute(
            "UPDATE folders SET deleted=0, last_seen=%s WHERE id=%s",
            (datetime.now(), folder_id),
        )
        conn.commit()

    # 1) Already known in memory -> return the cached id.
    cached = folder_state.get(key)
    if cached is not None:
        if cached["deleted"]:
            revive(cached["id"])
            cached["deleted"] = 0
        return cached["id"]

    # 2) Try to SELECT an existing record (also read its deleted flag, so the
    #    cache entry written below reflects the database).
    cursor.execute("""
        SELECT id, deleted
        FROM folders
        WHERE device_id = %s AND path = %s
        LIMIT 1
    """, (device_id, folder_path))
    row = cursor.fetchone()
    if row:
        folder_id = row[0]
        if row[1]:
            revive(folder_id)
        folder_state[key] = {"id": folder_id, "deleted": 0}
        return folder_id

    # 3) INSERT a new folder. On a unique-key collision the existing row is
    #    refreshed instead, and LAST_INSERT_ID(id) makes lastrowid report it.
    now = datetime.now()
    cursor.execute("""
        INSERT INTO folders (path, parent_id, device_id, first_seen, last_seen)
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        id = LAST_INSERT_ID(id),
        last_seen = VALUES(last_seen),
        deleted = 0
    """, (folder_path, parent_id, device_id, now, now))
    conn.commit()
    folder_id = cursor.lastrowid

    # 4) Save to the memory cache.
    folder_state[key] = {"id": folder_id, "deleted": 0}
    return folder_id
# ======================================================
# LOAD LAST FILE STATE
# ======================================================
def load_last_file_state(cursor, device_id: int):
    """Load the most recent ``files`` row per ``path_md5`` for ``device_id``.

    "Most recent" means the row with the highest ``id`` within each
    ``path_md5`` group.

    Returns:
        dict: ``{path_md5: {"id", "deleted", "size", "modified"}}`` where
        ``deleted`` is converted to ``int``.
    """
    query = (
        "\n"
        "SELECT f.id, f.path_md5, f.deleted, f.size, f.modified\n"
        "FROM files f\n"
        "JOIN (\n"
        "SELECT MAX(id) AS mx\n"
        "FROM files\n"
        "WHERE device_id=%s\n"
        "GROUP BY path_md5\n"
        ") t ON f.id = t.mx\n"
    )
    cursor.execute(query, (device_id,))

    state = {}
    for row in cursor.fetchall():
        file_id, path_hash, deleted_flag, file_size, mtime = row
        state[path_hash] = dict(
            id=file_id,
            deleted=int(deleted_flag),
            size=file_size,
            modified=mtime,
        )
    return state
# ======================================================
# MAIN SCANNER WITH BATCHING
# ======================================================
def walk_and_store_bulk(
    target_dir=r"\\tower\dedup",
    device_name="TOWER",
    batch_size=10000,
):
    """Scan ``target_dir`` and synchronize the ``files``/``folders`` tables.

    New files are inserted, changed files (different size or modification
    time) are updated, and files/folders known for the device but not seen
    during this walk are flagged ``deleted=1``. Writes are sent in batches.

    Args:
        target_dir: Directory tree to walk. Earlier runs used
            r"\\tower1\#colddata", r"z:" and r"\\tower\ebooks".
        device_name: Name of the ``devices`` row the scan is recorded under
            (an earlier run used "TW22").
        batch_size: Number of pending inserts/updates that triggers a flush.

    Returns:
        None. Progress and a summary are printed to stdout.
    """
    if not os.path.isdir(target_dir):
        print("Invalid directory:", target_dir)
        return

    conn, cursor = init_db()
    try:
        now = datetime.now()
        device_id = get_or_create_device(cursor, conn, device_name)
        folder_state = load_folder_state(cursor, device_id)
        file_state = load_last_file_state(cursor, device_id)

        seen_folders = set()
        seen_files = set()
        files_to_insert = []
        files_to_update = []
        total_files = 0
        # Running totals: the pending lists are emptied on every flush, so
        # their length at the end is NOT the number of rows written.
        inserted_total = 0
        updated_total = 0

        print(f"🔍 Scanning: {target_dir} (device {device_id})")

        # -------------------------------------------------
        # WALK FILESYSTEM
        # -------------------------------------------------
        for root, _dirs, files in os.walk(target_dir):
            # get_or_create_folder() stores and caches paths in NFC form, so
            # the "seen" set must use the same form; otherwise a folder whose
            # name is decomposed on disk would be flagged deleted below.
            folder_path = unicodedata.normalize("NFC", os.path.normpath(root))

            # 1) Determine parent_id (the scan root has no parent).
            if root == target_dir:
                parent_id = None
            else:
                parent_folder_path = os.path.normpath(os.path.dirname(root))
                parent_id = get_or_create_folder(cursor, conn, folder_state,
                                                 device_id, parent_folder_path,
                                                 None)

            # 2) Insert/look up the current folder with that parent_id.
            seen_folders.add(folder_path)
            folder_id = get_or_create_folder(cursor, conn, folder_state,
                                             device_id, folder_path,
                                             parent_id)

            # -------------------------------------------------
            # FILE LOOP
            # -------------------------------------------------
            for name in files:
                total_files += 1
                filepath = os.path.normpath(os.path.join(root, name))
                md5 = md5_path(filepath)
                # Recorded before stat() so an unreadable file is not
                # mistaken for a deleted one.
                seen_files.add(md5)

                try:
                    st = os.stat(filepath)
                except FileNotFoundError:
                    # Vanished between listing and stat(); skip quietly.
                    continue
                except OSError as exc:
                    # E.g. permission or network errors: skip this file
                    # instead of aborting the whole scan.
                    print(f"⚠️ Cannot stat {filepath}: {exc}")
                    continue

                # Whole seconds only, to compare equal with the stored DATETIME.
                modified = datetime.fromtimestamp(st.st_mtime).replace(microsecond=0)
                size = st.st_size
                ext = os.path.splitext(name)[1][:250]

                prev = file_state.get(md5)
                if prev is None:
                    files_to_insert.append(
                        (name, filepath, md5, size, modified, ext,
                         folder_id, device_id, 0, now, now)
                    )
                elif prev["deleted"] == 1:
                    # The file is back. (device_id, path_md5) is UNIQUE, so a
                    # second INSERT would fail; revive the existing row — the
                    # UPDATE statement also resets deleted to 0.
                    files_to_update.append((size, modified, now, prev["id"]))
                elif prev["size"] != size or prev["modified"] != modified:
                    files_to_update.append((size, modified, now, prev["id"]))

                # -------------------------------------------------
                # BATCH FLUSHING
                # -------------------------------------------------
                if len(files_to_insert) >= batch_size:
                    print(f"💾 Flushing {len(files_to_insert)} inserts...")
                    inserted_total += _run_batch(cursor, conn, _INSERT_FILES_SQL,
                                                 files_to_insert)
                    files_to_insert.clear()

                if len(files_to_update) >= batch_size:
                    print(f"💾 Flushing {len(files_to_update)} updates...")
                    updated_total += _run_batch(cursor, conn, _UPDATE_FILES_SQL,
                                                files_to_update)
                    files_to_update.clear()

                # PROGRESS
                if total_files % 1000 == 0:
                    print(f" ... processed {total_files} files")

        # -------------------------------------------------
        # MARK DELETED FILES / FOLDERS
        # -------------------------------------------------
        # NOTE(review): both states cover the whole device, not just
        # target_dir, so scanning a different tree under the same device name
        # flags the other tree's entries as deleted — confirm this is intended.
        files_to_mark_deleted = [
            (now, info["id"])
            for md5, info in file_state.items()
            if info["deleted"] == 0 and md5 not in seen_files
        ]
        folders_to_mark_deleted = [
            (now, info["id"])
            for path, info in folder_state.items()
            if info["deleted"] == 0 and path not in seen_folders
        ]

        # -------------------------------------------------
        # FINAL FLUSH (REMAINING BATCHES)
        # -------------------------------------------------
        if files_to_insert:
            print(f"💾 Final flush: {len(files_to_insert)} inserts")
            inserted_total += _run_batch(cursor, conn, _INSERT_FILES_SQL,
                                         files_to_insert)

        if files_to_update:
            print(f"💾 Final flush: {len(files_to_update)} updates")
            updated_total += _run_batch(cursor, conn, _UPDATE_FILES_SQL,
                                        files_to_update)

        if files_to_mark_deleted:
            print(f"💾 Final flush: {len(files_to_mark_deleted)} deletions")
            _run_batch(cursor, conn, _MARK_FILES_DELETED_SQL,
                       files_to_mark_deleted)

        if folders_to_mark_deleted:
            _run_batch(cursor, conn, _MARK_FOLDERS_DELETED_SQL,
                       folders_to_mark_deleted)

        # -------------------------------------------------
        # Update device timestamp
        # -------------------------------------------------
        cursor.execute("UPDATE devices SET scanned_at=%s WHERE id=%s", (now, device_id))
        conn.commit()
    finally:
        # Release the connection even when the scan aborts with an error.
        cursor.close()
        conn.close()

    print("")
    print("✅ Scan completed.")
    print(" Total files:", total_files)
    print(" Inserted:", inserted_total)
    print(" Updated:", updated_total)
    print(" Files deleted:", len(files_to_mark_deleted))
    print(" Folders deleted:", len(folders_to_mark_deleted))


# SQL statements shared by the periodic and the final flushes.
_INSERT_FILES_SQL = """
    INSERT INTO files (
        name, path, path_md5, size, modified, type,
        folder_id, device_id, deleted,
        first_seen, last_seen
    )
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""

_UPDATE_FILES_SQL = """
    UPDATE files
    SET size=%s, modified=%s, last_seen=%s, deleted=0
    WHERE id=%s
"""

_MARK_FILES_DELETED_SQL = """
    UPDATE files
    SET deleted=1, last_seen=%s
    WHERE id=%s
"""

_MARK_FOLDERS_DELETED_SQL = """
    UPDATE folders
    SET deleted=1, last_seen=%s
    WHERE id=%s
"""


def _run_batch(cursor, conn, sql, rows):
    """Run ``sql`` once per parameter tuple in ``rows``, commit, return the row count."""
    cursor.executemany(sql, rows)
    conn.commit()
    return len(rows)
# ======================================================
# MAIN ENTRY
# ======================================================
# Start a scan only when the file is executed as a script, not on import.
if __name__ == "__main__":
    walk_and_store_bulk()