""" collect_pictures_windows.py --------------------------- Windows verze zálohovacího skriptu. Prochází všechny lokální disky (ne síťové, ne CD), hledá JPG/JPEG, kopíruje (deduplikace BLAKE3) na Tower1 přes UNC cestu a zapisuje do PostgreSQL. Cesta zálohy: \\Tower1\ZalohaVsechObrazku\{hostname}\{písmeno_disku}\{cesta} Příklad: \\Tower1\ZalohaVsechObrazku\JMENO-PC\D\Foto\2023\img.jpg Bezpečné pro opakované spuštění (pokračuje tam kde skončilo). """ import os import sys import shutil import socket import string import ctypes import logging from pathlib import Path from datetime import datetime import blake3 import psycopg2 # ── Konfigurace ─────────────────────────────────────────────────────────────── DB_CONFIG = { "host": "192.168.1.76", "port": 5432, "user": "vladimir.buzalka", "password": "Vlado7309208104++", "database": "fotky_buzalkovi", } # Záloha vždy na Tower1 přes UNC ZALOHA_DIR = Path(r"\\Tower1\ZalohaVsechObrazku") JPEG_EXTENSIONS = {".jpg", ".jpeg"} # Adresáře přeskočené podle JMÉNA (kdekoliv ve stromě, case-insensitive) EXCLUDED_DIR_NAMES = { "ZalohaVsechObrazku", "ZalohaVšechObrázků", "$Recycle.Bin", "$RECYCLE.BIN", "System Volume Information", "Recovery", } # Celé cesty které se přeskočí (včetně podadresářů) EXCLUDED_FULL_PATHS = { Path(os.environ.get("WINDIR", r"C:\Windows")), # C:\Windows } LOG_FILE = Path(__file__).parent / "collect_pictures_windows.log" # ── Logging ─────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(LOG_FILE, encoding="utf-8"), ], ) log = logging.getLogger(__name__) # ── SQL (stejné jako Linux verze) ───────────────────────────────────────────── SQL_HASH_EXISTS = "SELECT id FROM zaloha_obrazku WHERE blake3_hash = %s" SQL_SOURCE_EXISTS = "SELECT id FROM zdrojove_soubory WHERE hostname = %s AND cesta_zdroje = %s" SQL_INSERT_ZALOHA = """ INSERT INTO zaloha_obrazku (blake3_hash, cesta_zalohy, nazev_souboru, velikost) VALUES (%s, %s, %s, %s) RETURNING id """ SQL_INSERT_ZDROJ = """ INSERT INTO zdrojove_soubory (hostname, cesta_zdroje, nazev_souboru, velikost, blake3_hash, zaloha_id) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (hostname, cesta_zdroje) DO NOTHING """ # ── Pomocné funkce ──────────────────────────────────────────────────────────── def get_local_drives() -> list[Path]: """Vrátí seznam lokálních pevných disků (typ FIXED), přeskočí síťové a CD.""" DRIVE_FIXED = 3 drives = [] bitmask = ctypes.windll.kernel32.GetLogicalDrives() for letter in string.ascii_uppercase: if bitmask & 1: drive = Path(f"{letter}:\\") drive_type = ctypes.windll.kernel32.GetDriveTypeW(str(drive)) if drive_type == DRIVE_FIXED: drives.append(drive) bitmask >>= 1 return drives def is_excluded_dir(name: str) -> bool: """Vrátí True pokud jméno adresáře patří do vyloučených (case-insensitive).""" return name.lower() in {n.lower() for n in EXCLUDED_DIR_NAMES} def is_excluded_path(path: Path) -> bool: """Vrátí True pokud cesta začíná některou z EXCLUDED_FULL_PATHS.""" for excl in EXCLUDED_FULL_PATHS: try: path.relative_to(excl) return True except ValueError: pass return False def compute_blake3(path: Path, chunk: int = 1 << 20) -> str: h = blake3.blake3() with open(path, "rb") as f: while data := f.read(chunk): h.update(data) return h.hexdigest() def dest_path_for(source: Path, hostname: str) -> Path: """ Zkonstruuje cílovou cestu v záloze. D:\Foto\2023\img.jpg → \\Tower1\ZalohaVsechObrazku\JMENO-PC\D\Foto\2023\img.jpg """ drive_letter = source.drive.rstrip(":") # "D:" → "D" relative = source.relative_to(source.drive + "\\") # "Foto\2023\img.jpg" return ZALOHA_DIR / hostname / drive_letter / relative def copy_to_backup(source: Path, dest: Path) -> None: dest.parent.mkdir(parents=True, exist_ok=True) if dest.exists(): return shutil.copy2(source, dest) def iter_jpeg_files(drives: list[Path]): """Generátor: prochází všechny lokální disky, vrací JPG/JPEG soubory.""" for drive in drives: log.info(f"Skenuji disk: {drive}") for root, dirs, files in os.walk(drive, followlinks=False): root_path = Path(root) # Přeskočit celé vyloučené cesty (C:\Windows apod.) if is_excluded_path(root_path): dirs.clear() continue # Odfiltrovat vyloučené adresáře podle jména dirs[:] = [ d for d in dirs if not is_excluded_dir(d) and not is_excluded_path(root_path / d) ] for fname in files: if Path(fname).suffix.lower() in JPEG_EXTENSIONS: yield root_path / fname # ── Hlavní logika ───────────────────────────────────────────────────────────── def process(conn, hostname, drives): cur = conn.cursor() stats = {"nalezeno": 0, "kopirovano": 0, "duplicit": 0, "chyb": 0, "preskoceno": 0} for source in iter_jpeg_files(drives): stats["nalezeno"] += 1 src_str = str(source) cur.execute(SQL_SOURCE_EXISTS, (hostname, src_str)) if cur.fetchone(): stats["preskoceno"] += 1 if stats["preskoceno"] % 500 == 0: log.info(f"Přeskočeno (již v DB): {stats['preskoceno']}") continue try: velikost = source.stat().st_size hash_val = compute_blake3(source) except (OSError, PermissionError) as e: log.warning(f"CHYBA čtení: {source} → {e}") stats["chyb"] += 1 continue cur.execute(SQL_HASH_EXISTS, (hash_val,)) row = cur.fetchone() if row: zaloha_id = row[0] cur.execute(SQL_INSERT_ZDROJ, (hostname, src_str, source.name, velikost, hash_val, zaloha_id)) conn.commit() stats["duplicit"] += 1 log.debug(f"DUPLIKÁT {source.name} (zaloha_id={zaloha_id})") else: dest = dest_path_for(source, hostname) try: copy_to_backup(source, dest) except (OSError, shutil.Error) as e: log.warning(f"CHYBA kopírování: {source} → {e}") stats["chyb"] += 1 continue cur.execute(SQL_INSERT_ZALOHA, (hash_val, str(dest), source.name, velikost)) zaloha_id = cur.fetchone()[0] cur.execute(SQL_INSERT_ZDROJ, (hostname, src_str, source.name, velikost, hash_val, zaloha_id)) conn.commit() stats["kopirovano"] += 1 log.info(f"ZKOPÍROVÁNO [{stats['kopirovano']:>6}] {source}") if stats["nalezeno"] % 1000 == 0: log.info( f"Průběh: nalezeno={stats['nalezeno']} " f"nových={stats['kopirovano']} duplikátů={stats['duplicit']} " f"chyb={stats['chyb']} přeskočeno={stats['preskoceno']}" ) cur.close() return stats def main(): hostname = socket.gethostname() drives = get_local_drives() log.info("=" * 60) log.info(f"Spuštění: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") log.info(f"Hostname: {hostname}") log.info(f"Disky: {[str(d) for d in drives]}") log.info(f"Záloha: {ZALOHA_DIR}\\{hostname}\\") try: conn = psycopg2.connect(**DB_CONFIG) conn.autocommit = False log.info("PostgreSQL: připojeno") except Exception as e: log.error(f"Nelze se připojit k DB: {e}") sys.exit(1) try: stats = process(conn, hostname, drives) except KeyboardInterrupt: log.warning("Přerušeno uživatelem — dosavadní záznamy jsou uloženy.") conn.rollback() stats = {} except Exception as e: log.error(f"Neočekávaná chyba: {e}", exc_info=True) conn.rollback() stats = {} finally: conn.close() if stats: log.info("-" * 60) log.info(f"Nalezeno JPG/JPEG: {stats['nalezeno']}") log.info(f"Zkopírováno nových: {stats['kopirovano']}") log.info(f"Duplikátů (hash): {stats['duplicit']}") log.info(f"Přeskočeno (v DB): {stats['preskoceno']}") log.info(f"Chyb: {stats['chyb']}") log.info("Hotovo.") if __name__ == "__main__": main()