""" collect_pictures.py ------------------- Prochází všechny shares pod /mnt/user/ (kromě ZalohaVsechObrazku), najde JPG/JPEG soubory, zkopíruje je (deduplikace BLAKE3) do /mnt/user/ZalohaVsechObrazku/ a zapíše záznamy do PostgreSQL. Tabulky: zaloha_obrazku – jedna fyzická záloha na unikátní BLAKE3 hash zdrojove_soubory – všechny nalezené zdrojové soubory (i duplikáty) Bezpečné pro opakované spuštění (pokračuje tam, kde skončilo). Bezpečné pro souběžný běh na více strojích (ON CONFLICT v SQL). """ import os import sys import time import shutil import socket import logging from pathlib import Path from datetime import datetime import blake3 import psycopg2 from psycopg2.extras import execute_values # ── Konfigurace ────────────────────────────────────────────────────────────── DB_CONFIG = { "host": "192.168.1.76", "port": 5432, "user": "vladimir.buzalka", "password": "Vlado7309208104++", "database": "fotky_buzalkovi", } SOURCE_BASE = Path("/mnt/user") JPEG_EXTENSIONS = {".jpg", ".jpeg"} ZALOHA_DIR_MAP = { "Tower1": Path("/mnt/user/ZalohaVsechObrazku"), "tower": Path("/mnt/remotes/TOWER1.LAN_ZalohaVsechObrazku"), } ZALOHA_DIR_DEFAULT = Path("/mnt/remotes/TOWER1.LAN_ZalohaVsechObrazku") EXCLUDED_DIR_NAMES_LOWER = {"zalohavsechobrazku", "zálohavsechobrázku", "zálohavsechobrazku"} BATCH_SIZE = 500 LOG_FILE = Path(__file__).parent / "collect_pictures.log" # ── Logging ─────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(LOG_FILE, encoding="utf-8"), ], ) log = logging.getLogger(__name__) # ── SQL ─────────────────────────────────────────────────────────────────────── SQL_CREATE_TABLES = """ CREATE TABLE IF NOT EXISTS zaloha_obrazku ( id SERIAL PRIMARY KEY, blake3_hash VARCHAR(64) UNIQUE NOT NULL, cesta_zalohy TEXT NOT NULL, nazev_souboru VARCHAR(512) NOT NULL, velikost BIGINT, datum_kopirovani TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS zdrojove_soubory ( id SERIAL PRIMARY KEY, hostname VARCHAR(255) NOT NULL, cesta_zdroje TEXT NOT NULL, nazev_souboru VARCHAR(512) NOT NULL, velikost BIGINT, datum_nalezeni TIMESTAMP DEFAULT NOW(), blake3_hash VARCHAR(64) NOT NULL, zaloha_id INTEGER REFERENCES zaloha_obrazku(id), UNIQUE (hostname, cesta_zdroje) ); CREATE INDEX IF NOT EXISTS idx_zaloha_hash ON zaloha_obrazku (blake3_hash); CREATE INDEX IF NOT EXISTS idx_zdroj_hash ON zdrojove_soubory (blake3_hash); CREATE INDEX IF NOT EXISTS idx_zdroj_zaloha ON zdrojove_soubory (zaloha_id); CREATE INDEX IF NOT EXISTS idx_zdroj_host ON zdrojove_soubory (hostname); """ SQL_INSERT_ZALOHA = """ INSERT INTO zaloha_obrazku (blake3_hash, cesta_zalohy, nazev_souboru, velikost) VALUES (%s, %s, %s, %s) ON CONFLICT (blake3_hash) DO NOTHING RETURNING id """ SQL_INSERT_ZDROJ = """ INSERT INTO zdrojove_soubory (hostname, cesta_zdroje, nazev_souboru, velikost, blake3_hash, zaloha_id) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (hostname, cesta_zdroje) DO NOTHING """ SQL_GET_ZALOHA_ID = "SELECT id FROM zaloha_obrazku WHERE blake3_hash = %s" # ── Pomocné funkce ──────────────────────────────────────────────────────────── def compute_blake3(path: Path, chunk: int = 1 << 20) -> str: h = blake3.blake3() with open(path, "rb") as f: while data := f.read(chunk): h.update(data) return h.hexdigest() def dest_path_for(source: Path, hostname: str) -> Path: try: relative = source.relative_to(SOURCE_BASE) except ValueError: relative = Path(source.name) return ZALOHA_DIR / hostname / relative def copy_to_backup(source: Path, dest: Path) -> None: dest.parent.mkdir(parents=True, exist_ok=True) if dest.exists(): return shutil.copy2(source, dest) def iter_jpeg_files(base: Path): for root, dirs, files in os.walk(base, followlinks=False): root_path = Path(root) if ZALOHA_DIR in (root_path, *root_path.parents) or root_path == ZALOHA_DIR: dirs.clear() continue dirs[:] = [ d for d in dirs if root_path / d != ZALOHA_DIR and d.lower() not in EXCLUDED_DIR_NAMES_LOWER ] for fname in files: if Path(fname).suffix.lower() in JPEG_EXTENSIONS: yield root_path / fname def load_known_sources(conn, hostname: str) -> set[str]: with conn.cursor("src_cursor") as cur: cur.itersize = 10000 cur.execute("SELECT cesta_zdroje FROM zdrojove_soubory WHERE hostname = %s", (hostname,)) return {row[0] for row in cur} def load_known_hashes(conn) -> dict[str, int]: with conn.cursor("hash_cursor") as cur: cur.itersize = 10000 cur.execute("SELECT blake3_hash, id FROM zaloha_obrazku") return {row[0]: row[1] for row in cur} # ── Hlavní logika ───────────────────────────────────────────────────────────── def process(conn, hostname): log.info(f"Hostname zdroje: {hostname}") log.info("Načítám známé zdroje z DB...") known_sources = load_known_sources(conn, hostname) log.info(f"Známých zdrojů v DB: {len(known_sources)}") log.info("Načítám známé hashe z DB...") known_hashes = load_known_hashes(conn) log.info(f"Známých hashů v DB: {len(known_hashes)}") stats = {"nalezeno": 0, "kopirovano": 0, "duplicit": 0, "chyb": 0, "preskoceno": 0} pending_zdroje = [] def flush_zdroje(): if not pending_zdroje: return cur = conn.cursor() execute_values( cur, """INSERT INTO zdrojove_soubory (hostname, cesta_zdroje, nazev_souboru, velikost, blake3_hash, zaloha_id) VALUES %s ON CONFLICT (hostname, cesta_zdroje) DO NOTHING""", pending_zdroje, ) conn.commit() cur.close() pending_zdroje.clear() for source in iter_jpeg_files(SOURCE_BASE): stats["nalezeno"] += 1 src_str = str(source) if src_str in known_sources: stats["preskoceno"] += 1 if stats["preskoceno"] % 5000 == 0: log.info(f"Přeskočeno (již v DB): {stats['preskoceno']}") continue t_start = time.perf_counter() try: velikost = source.stat().st_size hash_val = compute_blake3(source) except (OSError, PermissionError) as e: log.warning(f"CHYBA čtení: {source} → {e}") stats["chyb"] += 1 continue t_hash = time.perf_counter() zaloha_id = known_hashes.get(hash_val) if zaloha_id is not None: stats["duplicit"] += 1 vel_mb = velikost / (1024 * 1024) log.info( f"DUPLIKÁT {source.name} " f"({vel_mb:.1f} MB, hash={t_hash - t_start:.2f}s)" ) else: dest = dest_path_for(source, hostname) try: copy_to_backup(source, dest) except (OSError, shutil.Error) as e: log.warning(f"CHYBA kopírování: {source} → {e}") stats["chyb"] += 1 continue t_copy = time.perf_counter() cur = conn.cursor() cur.execute(SQL_INSERT_ZALOHA, (hash_val, str(dest), source.name, velikost)) row = cur.fetchone() if row: zaloha_id = row[0] else: cur.execute(SQL_GET_ZALOHA_ID, (hash_val,)) zaloha_id = cur.fetchone()[0] cur.close() conn.commit() t_db = time.perf_counter() known_hashes[hash_val] = zaloha_id stats["kopirovano"] += 1 vel_mb = velikost / (1024 * 1024) log.info( f"ZKOPÍROVÁNO [{stats['kopirovano']:>6}] {source.name} " f"({vel_mb:.1f} MB, hash={t_hash - t_start:.2f}s " f"copy={t_copy - t_hash:.2f}s db={t_db - t_copy:.2f}s " f"celkem={t_db - t_start:.2f}s)" ) pending_zdroje.append((hostname, src_str, source.name, velikost, hash_val, zaloha_id)) known_sources.add(src_str) if len(pending_zdroje) >= BATCH_SIZE: flush_zdroje() if stats["nalezeno"] % 5000 == 0: log.info( f"Průběh: nalezeno={stats['nalezeno']} " f"nových={stats['kopirovano']} duplikátů={stats['duplicit']} " f"chyb={stats['chyb']} přeskočeno={stats['preskoceno']}" ) flush_zdroje() return stats def main(): global ZALOHA_DIR hostname = socket.gethostname() ZALOHA_DIR = ZALOHA_DIR_MAP.get(hostname, ZALOHA_DIR_DEFAULT) log.info("=" * 60) log.info(f"Spuštění: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") log.info(f"Hostname: {hostname}") log.info(f"Zdroj: {SOURCE_BASE}") log.info(f"Záloha: {ZALOHA_DIR}/{hostname}/") try: conn = psycopg2.connect(**DB_CONFIG) conn.autocommit = False log.info("PostgreSQL: připojeno") except Exception as e: log.error(f"Nelze se připojit k DB: {e}") sys.exit(1) with conn.cursor() as cur: cur.execute(SQL_CREATE_TABLES) conn.commit() log.info("Tabulky: OK") stats = {"nalezeno": 0, "kopirovano": 0, "duplicit": 0, "chyb": 0, "preskoceno": 0} try: stats = process(conn, hostname) except KeyboardInterrupt: log.warning("Přerušeno uživatelem (Ctrl+C) — dosavadní záznamy jsou uloženy.") conn.rollback() except Exception as e: log.error(f"Neočekávaná chyba: {e}", exc_info=True) conn.rollback() finally: conn.close() log.info("-" * 60) log.info(f"Nalezeno JPG/JPEG: {stats['nalezeno']}") log.info(f"Zkopírováno nových: {stats['kopirovano']}") log.info(f"Duplikátů (hash): {stats['duplicit']}") log.info(f"Přeskočeno (v DB): {stats['preskoceno']}") log.info(f"Chyb: {stats['chyb']}") log.info("Hotovo.") if __name__ == "__main__": main()