diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 8441fd8..47c1964 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,7 +1,10 @@ { "permissions": { "allow": [ - "Bash(.venv\\\\Scripts\\\\python.exe 10_collect_metadata.py --source demo_fotky --limit 3 --dry-run)" + "Bash(.venv\\\\Scripts\\\\python.exe 10_collect_metadata.py --source demo_fotky --limit 3 --dry-run)", + "Bash(PGPASSWORD=\"\" psql -h 192.168.1.76 -p 5432 -U vladimir.buzalka -d fotky_buzalkovi -c \"\\\\dt\")", + "PowerShell($env:PGPASSWORD = \"\"; psql -h 192.168.1.76 -p 5432 -U vladimir.buzalka -d fotky_buzalkovi -c \"\\\\dt\" 2>&1)", + "PowerShell(python -c \"import psycopg2; conn = psycopg2.connect\\(host='192.168.1.76', port=5432, user='vladimir.buzalka', password='', dbname='fotky_buzalkovi'\\); cur = conn.cursor\\(\\); cur.execute\\(\\\\\"SELECT table_name FROM information_schema.tables WHERE table_schema='public' ORDER BY table_name\\\\\"\\); rows = cur.fetchall\\(\\); print\\([r[0] for r in rows]\\); conn.close\\(\\)\" 2>&1)" ] } } diff --git a/30 SběrDat/collect_and_import.py b/30 SběrDat/collect_and_import.py new file mode 100644 index 0000000..773e1f3 --- /dev/null +++ b/30 SběrDat/collect_and_import.py @@ -0,0 +1,756 @@ +#!/usr/bin/env python3 +""" +collect_and_import.py — Sběr metadat ze zálohy a přímý import do PostgreSQL + +Čte záznamy z tabulky zaloha_obrazku, pro každý soubor extrahuje metadata +(EXIF, GPS, IPTC, XMP, hashe, rozměry) a ukládá přímo do tabulky photos. +Nevytváří žádný mezilehlý soubor. + +Resume: přeskočí záznamy, kde photos.zaloha_id už existuje. + +Použití: + python collect_and_import.py + python collect_and_import.py --workers 4 + python collect_and_import.py --limit 500 # jen prvních N (pro test) + python collect_and_import.py --dry-run # jen spočítá, nic nezpracuje + python collect_and_import.py --batch-size 200 +""" + +import argparse +import hashlib +import json +import logging +import os +import re +import signal +import struct +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone, timedelta +from pathlib import Path +from typing import Optional + +if sys.stdout.encoding and sys.stdout.encoding.lower() != "utf-8": + sys.stdout.reconfigure(encoding="utf-8") + sys.stderr.reconfigure(encoding="utf-8") + +import exifread +logging.getLogger("exifread").setLevel(logging.CRITICAL) + +import imagehash +import psycopg2 +import psycopg2.extras +from psycopg2.extras import execute_values +from PIL import Image, ImageOps, IptcImagePlugin + +try: + from dotenv import load_dotenv + load_dotenv() +except ImportError: + pass + +# --------------------------------------------------------------------------- +# Konfigurace +# --------------------------------------------------------------------------- + +DB_CONFIG = { + "host": os.getenv("DB_HOST", "192.168.1.76"), + "port": int(os.getenv("DB_PORT", "5432")), + "user": os.getenv("DB_USER", "vladimir.buzalka"), + "password": os.getenv("DB_PASSWORD", ""), + "dbname": os.getenv("DB_NAME", "fotky_buzalkovi"), +} + +# Překlad: Linux NFS cesta (jak je uložena v DB) → Windows UNC +LINUX_PREFIX = "/mnt/remotes/TOWER1.LAN_ZalohaVsechObrazku" +WINDOWS_UNC = "//Tower1/ZalohaVsechObrazku" + +SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", ".tiff", ".tif", ".webp", ".bmp"} + +DEFAULT_WORKERS = 2 +DEFAULT_BATCH_SIZE = 100 + +IPTC_TAG_NAMES = { + (2, 5): "ObjectName", (2, 25): "Keywords", + (2, 55): "DateCreated", (2, 60): "TimeCreated", + (2, 80): "Byline", (2, 90): "City", + (2, 95): "ProvinceState", (2, 101): "CountryName", + (2, 105): "Headline", (2, 110): "Credit", + (2, 116): "Copyright", (2, 120): "Caption", +} + +MIME_MAP = { + "JPEG": "image/jpeg", "JPG": "image/jpeg", + "PNG": "image/png", "WEBP": "image/webp", + "HEIF": "image/heif", "HEIC": "image/heif", + "TIFF": "image/tiff", "TIF": "image/tiff", + "BMP": "image/bmp", "GIF": "image/gif", +} + +# --------------------------------------------------------------------------- +# Překlad cesty: Linux NFS → Windows UNC +# --------------------------------------------------------------------------- + +def linux_to_windows(linux_path: str) -> Path: + """Převede /mnt/remotes/TOWER1.LAN_ZalohaVsechObrazku/... na //Tower1/ZalohaVsechObrazku/...""" + if linux_path.startswith(LINUX_PREFIX): + rel = linux_path[len(LINUX_PREFIX):] # začíná / + return Path(WINDOWS_UNC + rel) + return Path(linux_path) + +# --------------------------------------------------------------------------- +# GPS +# --------------------------------------------------------------------------- + +def _rational_to_float(r) -> float: + if hasattr(r, "numerator") and hasattr(r, "denominator"): + return r.numerator / r.denominator if r.denominator != 0 else 0.0 + return float(r) + + +def _dms_to_decimal(vals) -> float: + d = _rational_to_float(vals[0]) + m = _rational_to_float(vals[1]) + s = _rational_to_float(vals[2]) + return d + m / 60.0 + s / 3600.0 + + +def extract_gps(raw_tags: dict) -> dict: + result = {} + try: + lat_tag = raw_tags.get("GPS GPSLatitude") + lat_ref = raw_tags.get("GPS GPSLatitudeRef") + lon_tag = raw_tags.get("GPS GPSLongitude") + lon_ref = raw_tags.get("GPS GPSLongitudeRef") + + if lat_tag and lon_tag: + lat = _dms_to_decimal(lat_tag.values) + lon = _dms_to_decimal(lon_tag.values) + if lat_ref and str(lat_ref).strip().upper().startswith("S"): + lat = -lat + if lon_ref and str(lon_ref).strip().upper().startswith("W"): + lon = -lon + result["gps_lat"] = round(lat, 7) + result["gps_lon"] = round(lon, 7) + + alt_tag = raw_tags.get("GPS GPSAltitude") + alt_ref = raw_tags.get("GPS GPSAltitudeRef") + if alt_tag and alt_tag.values: + alt = _rational_to_float(alt_tag.values[0]) + if alt_ref and alt_ref.values and alt_ref.values[0] == 1: + alt = -alt + result["gps_alt"] = round(alt, 2) + except Exception as e: + result["gps_error"] = str(e) + return result + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_serializable(obj): + if hasattr(obj, "numerator") and hasattr(obj, "denominator"): + try: + return float(obj) + except Exception: + return str(obj) + if isinstance(obj, dict): + return {str(k): _make_serializable(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple)): + return [_make_serializable(x) for x in obj] + if isinstance(obj, bytes): + return obj[:200].decode("utf-8", errors="replace") + try: + json.dumps(obj) + return obj + except (TypeError, ValueError): + return str(obj) + + +def file_hash_sha256(path: Path, chunk: int = 65536) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + while data := f.read(chunk): + h.update(data) + return h.hexdigest() + + +def clean_nullbytes(obj): + if isinstance(obj, str): + return obj.replace("\x00", "") + if isinstance(obj, dict): + return {k: clean_nullbytes(v) for k, v in obj.items()} + if isinstance(obj, list): + return [clean_nullbytes(v) for v in obj] + return obj + + +def hex_to_int64(hex_str) -> Optional[int]: + if not hex_str: + return None + try: + unsigned = int(str(hex_str).strip(), 16) & 0xFFFFFFFFFFFFFFFF + return struct.unpack("q", struct.pack("Q", unsigned))[0] + except Exception: + return None + + +def parse_fraction(s) -> Optional[float]: + if s is None: + return None + try: + s = str(s).strip() + if "/" in s: + num, den = s.split("/", 1) + d = float(den) + return float(num) / d if d != 0 else None + return float(s) + except Exception: + return None + + +def parse_exif_datetime(dt_str, offset_str=None) -> Optional[datetime]: + if not dt_str: + return None + try: + s = str(dt_str).strip() + date_part = s[:10].replace(":", "-") + time_part = s[11:19] if len(s) >= 19 else "00:00:00" + dt = datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S") + if offset_str: + m = re.match(r"([+-])(\d{2}):(\d{2})", str(offset_str).strip()) + if m: + sign = 1 if m.group(1) == "+" else -1 + tz = timezone(timedelta(hours=sign * int(m.group(2)), + minutes=sign * int(m.group(3)))) + return dt.replace(tzinfo=tz) + return dt.replace(tzinfo=timezone.utc) + except Exception: + return None + + +def parse_iso(raw) -> Optional[int]: + if raw is None: + return None + try: + return int(float(str(raw).strip().strip("[]").split(",")[0].strip())) + except Exception: + return None + +# --------------------------------------------------------------------------- +# IPTC + XMP +# --------------------------------------------------------------------------- + +def _parse_iptc(raw_iptc) -> dict: + if not raw_iptc: + return {} + out = {} + for key, value in raw_iptc.items(): + name = IPTC_TAG_NAMES.get(key, f"IPTC_{key[0]}_{key[1]}") + if isinstance(value, bytes): + value = value.decode("utf-8", errors="replace") + elif isinstance(value, list): + value = [v.decode("utf-8", errors="replace") if isinstance(v, bytes) else v for v in value] + out[name] = value + return out + + +XMP_PATTERNS = { + "creator_tool": r'xmp:CreatorTool="([^"]+)"', + "create_date": r'xmp:CreateDate="([^"]+)"', + "modify_date": r'xmp:ModifyDate="([^"]+)"', + "rating": r'xmp:Rating="([^"]+)"', + "title": r']*>.*?]*>([^<]+)', + "description": r']*>.*?]*>([^<]+)', + "creator": r']*>.*?]*>([^<]+)', + "subject_block": r']*>(.*?)', +} + + +def _parse_xmp(xmp_raw) -> dict: + if not xmp_raw: + return {} + if isinstance(xmp_raw, bytes): + xmp_raw = xmp_raw.decode("utf-8", errors="replace") + out = {} + for name, pat in XMP_PATTERNS.items(): + m = re.search(pat, xmp_raw, re.DOTALL) + if m: + out[name] = m.group(1).strip() + if "subject_block" in out: + kws = re.findall(r"]*>([^<]+)", out.pop("subject_block")) + if kws: + out["keywords"] = kws + face_count = len(re.findall(r'mwg-rs:Type="Face"', xmp_raw)) + if face_count: + out["face_regions_count"] = face_count + face_names = re.findall(r'mwg-rs:Name="([^"]+)"', xmp_raw) + if face_names: + out["face_names"] = face_names + out["_xmp_bytes"] = len(xmp_raw) + return out + +# --------------------------------------------------------------------------- +# Sběr metadat jedné fotky +# --------------------------------------------------------------------------- + +def collect_photo(path: Path) -> dict: + """Vrátí dict se všemi daty o jedné fotce. Nikdy nevyhodí výjimku.""" + record: dict = {} + stat = path.stat() + + record["file_path"] = str(path) + record["file_name"] = path.name + record["file_stem"] = path.stem + record["file_ext"] = path.suffix.lower() + record["file_size"] = stat.st_size + record["mtime"] = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat() + record["mtime_ts"] = stat.st_mtime + record["ctime"] = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc).isoformat() + + try: + record["sha256_file"] = file_hash_sha256(path) + except Exception as e: + record["sha256_file"] = None + record["sha256_file_error"] = str(e) + + # EXIF + GPS + record["exif"] = {} + try: + with open(path, "rb") as f: + raw_tags = exifread.process_file(f, details=True) + record.update(extract_gps(raw_tags)) + for k, v in raw_tags.items(): + if "Thumbnail" in k and "JPEGInterchangeFormat" not in k: + continue + record["exif"][k] = str(v) + except Exception as e: + record["exif_error"] = str(e) + + # Pillow: rozměry, hashe, IPTC, XMP + try: + with Image.open(path) as img: + record["format"] = img.format + record["mode"] = img.mode + record["width"] = img.width + record["height"] = img.height + record["megapixels"] = round((img.width * img.height) / 1_000_000, 2) + record["has_transparency"] = img.mode in ("RGBA", "LA") or "transparency" in img.info + dpi = img.info.get("dpi") + record["dpi"] = list(dpi) if isinstance(dpi, tuple) else dpi + record["icc_profile"] = "icc_profile" in img.info + record["embedded_thumbnail"] = "thumbnail" in img.info + + try: + record["iptc"] = _parse_iptc(IptcImagePlugin.getiptcinfo(img)) + except Exception as e: + record["iptc"] = {"_error": str(e)} + + record["xmp"] = _parse_xmp(img.info.get("xmp")) + + try: + img_r = ImageOps.exif_transpose(img) + if img_r.mode != "RGB": + img_r = img_r.convert("RGB") + pixels = img_r.tobytes() + record["sha256_pixels"] = hashlib.sha256(pixels).hexdigest() + ph = imagehash.phash(img_r) + dh = imagehash.dhash(img_r) + record["phash"] = str(ph) + record["dhash"] = str(dh) + record["phash_int"] = hex_to_int64(str(ph)) + record["dhash_int"] = hex_to_int64(str(dh)) + except Exception as e: + record["pixel_hash_error"] = str(e) + record["sha256_pixels"] = None + record["phash"] = record["dhash"] = None + record["phash_int"] = record["dhash_int"] = None + + except Exception as e: + record["pil_error"] = str(e) + + record["collected_at"] = datetime.now(tz=timezone.utc).isoformat() + return record + +# --------------------------------------------------------------------------- +# Mapování záznamu → DB sloupce +# --------------------------------------------------------------------------- + +COLUMNS = [ + "zaloha_id", + "sha256_file", "sha256_pixels", "phash", "dhash", + "file_path", "file_name", "file_stem", "file_ext", + "file_size", "mime_type", "format", "mode", "width", "height", "megapixels", + "has_transparency", "icc_profile", "embedded_thumbnail", + "taken_at", "taken_at_source", "mtime", "collected_at", + "camera_make", "camera_model", "lens_model", + "iso", "aperture", "exposure_time", "focal_length_mm", + "gps_lat", "gps_lon", "gps_altitude", + "is_screenshot", "face_count", + "exif_raw", "iptc_raw", "xmp_raw", +] + +INSERT_SQL = f""" +INSERT INTO photos ({", ".join(COLUMNS)}) +VALUES %s +ON CONFLICT (sha256_file) DO NOTHING +""" + + +def extract_fields(zaloha_id: int, rec: dict) -> Optional[tuple]: + """Vrátí tuple pro INSERT nebo None pokud sha256_file chybí (soubor nečitelný).""" + if not rec.get("sha256_file"): + return None + + exif = clean_nullbytes(rec.get("exif") or {}) + iptc = clean_nullbytes(rec.get("iptc") or {}) + xmp = clean_nullbytes(rec.get("xmp") or {}) + + # taken_at — z EXIF, fallback na mtime + dt_orig = exif.get("EXIF DateTimeOriginal") or exif.get("Image DateTime") + dt_offset = exif.get("EXIF OffsetTimeOriginal") or exif.get("EXIF OffsetTime") + taken_at = parse_exif_datetime(dt_orig, dt_offset) + taken_at_source = "exif" if taken_at else None + + mtime = None + if rec.get("mtime"): + try: + mtime = datetime.fromisoformat(rec["mtime"]) + except Exception: + pass + + if not taken_at and mtime: + taken_at = mtime + taken_at_source = "mtime" + + collected_at = None + if rec.get("collected_at"): + try: + collected_at = datetime.fromisoformat(rec["collected_at"]) + except Exception: + pass + + camera_make = (str(exif.get("Image Make", "") or "").strip()) or None + camera_model = (str(exif.get("Image Model", "") or "").strip()) or None + lens_model = (str(exif.get("EXIF LensModel", "") or "").strip()) or None + + iso = parse_iso(exif.get("EXIF ISOSpeedRatings")) + + _ap = parse_fraction(exif.get("EXIF FNumber")) + aperture = round(_ap, 2) if _ap is not None else None + + exposure_raw = exif.get("EXIF ExposureTime") + exposure_time = str(exposure_raw).strip() if exposure_raw else None + + _fl_raw = exif.get("EXIF FocalLength") + _fl = parse_fraction(str(_fl_raw).split()[0]) if _fl_raw else None + focal_length_mm = round(_fl, 2) if _fl is not None else None + + # GPS — použijeme předpočítané hodnoty z collect_photo + gps_lat = rec.get("gps_lat") + gps_lon = rec.get("gps_lon") + gps_altitude = rec.get("gps_alt") + + xmp_desc = str(xmp.get("description") or "").lower() + is_screenshot = "screenshot" in xmp_desc + + face_count = None + if "face_regions_count" in xmp: + try: + face_count = int(xmp["face_regions_count"]) + except Exception: + pass + + fmt = (rec.get("format") or "").strip() + mime_type = MIME_MAP.get(fmt.upper(), f"image/{fmt.lower()}" if fmt else None) + + fields = { + "zaloha_id": zaloha_id, + "sha256_file": rec["sha256_file"], + "sha256_pixels": rec.get("sha256_pixels"), + "phash": rec.get("phash_int"), + "dhash": rec.get("dhash_int"), + "file_path": rec.get("file_path", ""), + "file_name": rec.get("file_name", ""), + "file_stem": rec.get("file_stem"), + "file_ext": (rec.get("file_ext") or "").lower().strip() or None, + "file_size": int(rec["file_size"]) if rec.get("file_size") else None, + "mime_type": mime_type, + "format": fmt or None, + "mode": rec.get("mode"), + "width": int(rec["width"]) if rec.get("width") else None, + "height": int(rec["height"]) if rec.get("height") else None, + "megapixels": rec.get("megapixels"), + "has_transparency": bool(rec.get("has_transparency")), + "icc_profile": bool(rec.get("icc_profile")), + "embedded_thumbnail": bool(rec.get("embedded_thumbnail")), + "taken_at": taken_at, + "taken_at_source": taken_at_source, + "mtime": mtime, + "collected_at": collected_at, + "camera_make": camera_make, + "camera_model": camera_model, + "lens_model": lens_model, + "iso": iso, + "aperture": aperture, + "exposure_time": exposure_time, + "focal_length_mm": focal_length_mm, + "gps_lat": gps_lat, + "gps_lon": gps_lon, + "gps_altitude": gps_altitude, + "is_screenshot": is_screenshot, + "face_count": face_count, + "exif_raw": json.dumps(_make_serializable(exif), ensure_ascii=False) if exif else None, + "iptc_raw": json.dumps(iptc, ensure_ascii=False) if iptc else None, + "xmp_raw": json.dumps(xmp, ensure_ascii=False) if xmp else None, + } + return tuple(fields[c] for c in COLUMNS) + +# --------------------------------------------------------------------------- +# Progress +# --------------------------------------------------------------------------- + +class Progress: + def __init__(self, total: int): + self.total = total + self.done = 0 + self.errors = 0 + self.skipped = 0 + self.start = time.monotonic() + + def tick(self, ok: bool = True): + if ok: + self.done += 1 + else: + self.errors += 1 + + def skip(self): + self.skipped += 1 + + def report(self, current_file: str = "") -> str: + elapsed = time.monotonic() - self.start + processed = self.done + self.errors + self.skipped + rate = processed / elapsed if elapsed > 0 else 0 + eta_str = "" + if self.total and rate > 0: + remaining = (self.total - processed) / rate + h, r = divmod(int(remaining), 3600) + m, s = divmod(r, 60) + eta_str = f" ETA {h:02d}:{m:02d}:{s:02d}" + pct = f" ({100 * processed / self.total:.1f}%)" if self.total else "" + name = Path(current_file).name[:40] if current_file else "" + return ( + f"\r {processed}/{self.total}{pct}" + f" ok={self.done} err={self.errors} skip={self.skipped}" + f" {rate:.1f} f/s{eta_str} {name:<40}" + ) + +# --------------------------------------------------------------------------- +# Shutdown +# --------------------------------------------------------------------------- + +_shutdown = False + +def _handle_sigint(sig, frame): + global _shutdown + print("\n\n[!] Preruseno — dokoncuji probihaici davku...") + _shutdown = True + +# --------------------------------------------------------------------------- +# DB — načtení nezpracovaných záznamů +# --------------------------------------------------------------------------- + +def count_unprocessed(conn) -> int: + with conn.cursor() as cur: + cur.execute(""" + SELECT COUNT(*) FROM zaloha_obrazku z + LEFT JOIN photos p ON p.zaloha_id = z.id + WHERE p.id IS NULL + """) + return cur.fetchone()[0] + + +def iter_unprocessed(conn, limit: int = 0): + """Generátor: streamuje nezpracované záznamy ze zaloha_obrazku.""" + sql = """ + SELECT z.id, z.cesta_zalohy, z.nazev_souboru + FROM zaloha_obrazku z + LEFT JOIN photos p ON p.zaloha_id = z.id + WHERE p.id IS NULL + ORDER BY z.id + """ + if limit: + sql += f" LIMIT {limit}" + # Named cursor pro serverové streamování (nenahrává vše do RAM) + with conn.cursor("unprocessed_cursor") as cur: + cur.itersize = 2000 + cur.execute(sql) + for row in cur: + yield row # (id, cesta_zalohy, nazev_souboru) + +# --------------------------------------------------------------------------- +# DB — flush dávky +# --------------------------------------------------------------------------- + +def flush_batch(conn, batch: list) -> int: + """Vloží dávku do DB, commitne. Vrátí počet skutečně vložených řádků.""" + if not batch: + return 0 + with conn.cursor() as cur: + execute_values(cur, INSERT_SQL, batch) + conn.commit() + return len(batch) + +# --------------------------------------------------------------------------- +# Zpracování jednoho záznamu +# --------------------------------------------------------------------------- + +def process_one(zaloha_id: int, linux_path: str) -> tuple[Optional[tuple], str]: + """ + Vrátí (db_row, error_msg). + db_row je None pokud soubor nejde číst nebo chybí sha256. + """ + ext = Path(linux_path).suffix.lower() + if ext not in SUPPORTED_EXTENSIONS: + return None, f"nepodporovana pripona: {ext}" + + win_path = linux_to_windows(linux_path) + + try: + rec = collect_photo(win_path) + except Exception as e: + return None, str(e) + + if not rec.get("sha256_file"): + err = rec.get("sha256_file_error") or rec.get("pil_error") or "sha256 chybi" + return None, err + + try: + row = extract_fields(zaloha_id, rec) + except Exception as e: + return None, f"extract_fields: {e}" + + return row, "" + +# --------------------------------------------------------------------------- +# main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description="collect_and_import.py — Sběr metadat ze zálohy a import do DB" + ) + parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS, + help=f"Pocet paralelnich vlaken (default: {DEFAULT_WORKERS})") + parser.add_argument("--batch-size", type=int, default=DEFAULT_BATCH_SIZE, + help=f"Velikost davky pro INSERT (default: {DEFAULT_BATCH_SIZE})") + parser.add_argument("--limit", type=int, default=0, + help="Zpracovat maximalne N fotek (0 = vse)") + parser.add_argument("--dry-run", action="store_true", + help="Jen spocita nezpracovane zaznamy, nic neimportuje") + args = parser.parse_args() + + print("[collect_and_import]") + print(f" DB: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}") + print(f" Workers: {args.workers}") + print(f" Batch size: {args.batch_size}") + print(f" Limit: {args.limit or 'vse'}") + print() + + conn = psycopg2.connect(**DB_CONFIG) + conn.autocommit = False + + print(" Pocitam nezpracovane zaznamy...", end=" ", flush=True) + total = count_unprocessed(conn) + effective_total = min(total, args.limit) if args.limit else total + print(f"{total} (zpracovano bude: {effective_total})") + + if args.dry_run: + conn.close() + return + + if total == 0: + print(" Vse jiz zpracovano.") + conn.close() + return + + signal.signal(signal.SIGINT, _handle_sigint) + progress = Progress(total=effective_total) + + error_log_path = Path(__file__).parent / "output" / "collect_errors.log" + error_log_path.parent.mkdir(parents=True, exist_ok=True) + err_f = open(error_log_path, "a", encoding="utf-8", buffering=1) + err_f.write(f"\n--- Session {datetime.now().isoformat()} ---\n") + + batch: list[tuple] = [] + + def handle_result(zaloha_id: int, linux_path: str, row: Optional[tuple], err: str): + nonlocal batch + if row is None: + if err: + progress.tick(ok=False) + err_f.write(f"{zaloha_id}\t{linux_path}\t{err}\n") + else: + progress.skip() + else: + progress.tick(ok=True) + batch.append(row) + if len(batch) >= args.batch_size: + flush_batch(conn, batch) + batch = [] + print(progress.report(linux_path), end="", flush=True) + + print(f"\n Zpracovavam...\n") + + # Samostatné DB spojení pro čtení (generátor) a zápis (conn) nemohou sdílet transakci + conn_read = psycopg2.connect(**DB_CONFIG) + conn_read.autocommit = False # named cursor potřebuje transakci + + if args.workers > 1: + with ThreadPoolExecutor(max_workers=args.workers) as pool: + futures = {} + for zid, cesta, _ in iter_unprocessed(conn_read, args.limit): + if _shutdown: + pool.shutdown(wait=False, cancel_futures=True) + break + f = pool.submit(process_one, zid, cesta) + futures[f] = (zid, cesta) + + for future in as_completed(futures): + if _shutdown: + break + zid, cesta = futures[future] + try: + row, err = future.result() + except Exception as e: + row, err = None, str(e) + handle_result(zid, cesta, row, err) + else: + for zid, cesta, _ in iter_unprocessed(conn_read, args.limit): + if _shutdown: + break + row, err = process_one(zid, cesta) + handle_result(zid, cesta, row, err) + + # Flush zbytku + if batch: + flush_batch(conn, batch) + + conn_read.close() + conn.close() + err_f.close() + + elapsed = time.monotonic() - progress.start + print(f"\n\n{'='*60}") + print(f" Dokonceno za {elapsed:.1f}s") + print(f" Ulozeno: {progress.done}") + print(f" Preskoceno: {progress.skipped} (nepodporovana pripona)") + print(f" Chyby: {progress.errors}") + if progress.errors: + print(f" Chybovy log: {error_log_path}") + print(f"{'='*60}") + + +if __name__ == "__main__": + main() diff --git a/CONTEXT.md b/CONTEXT.md index b965b0e..5e144aa 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -39,11 +39,11 @@ Systém pro zálohu, organizaci a tagování ~200 000 rodinných fotek. Lokáln ## DB tabulky v fotky_buzalkovi -### Starší (z předchozí fáze) -- `photos` — sha256_file UNIQUE, phash, EXIF JSONB, indexy +### Skupina 1: Zpracovaná metadata fotek +- `photos` — naparsované EXIF/IPTC/XMP, hashe, rozměry, GPS. Sloupec `zaloha_id` → FK na `zaloha_obrazku`. - `tags`, `photo_tags` — tagování -### Zálohovací pipeline (vytvořeno 2026-05-24) +### Skupina 2: Zálohovací pipeline (vytvořeno 2026-05-24) ```sql zaloha_obrazku id SERIAL PK, blake3_hash VARCHAR(64) UNIQUE, cesta_zalohy TEXT, @@ -56,6 +56,10 @@ zdrojove_soubory UNIQUE(hostname, cesta_zdroje) ``` +### Propojení (vytvořeno 2026-05-26) +`photos.zaloha_id` → `zaloha_obrazku.id` — každá zpracovaná fotka v `photos` je svázána +se zdrojovou zálohou. Resume importu funguje přes LEFT JOIN na tomto sloupci. + --- ## Zálohovací skripty @@ -116,6 +120,39 @@ zdrojove_soubory - Tower1 spuštěn, přidal 104 souborů - Windows skript připraven, zatím nespuštěn na žádném PC +## Stav k 2026-05-26 + +- `zaloha_obrazku` má 1 502 539 záznamů, `zdrojove_soubory` 2 192 650 +- Tabulka `photos` vyprázdněna (původně 85 833 testovacích záznamů z filesystémového skenu) +- Přidán sloupec `photos.zaloha_id` (FK → `zaloha_obrazku.id`) pro propojení tabulek +- Nový skript `30 SběrDat/collect_and_import.py` čte z `zaloha_obrazku`, zpracuje metadata + a ukládá přímo do `photos` (bez mezilehlého JSONL). Resume přes `zaloha_id`. + Překlad cesty: `/mnt/remotes/TOWER1.LAN_ZalohaVsechObrazku/...` → `//Tower1/ZalohaVsechObrazku/...` +- Starší skripty `10_collect_metadata.py` + `import_to_db.py` zůstávají jako reference, + ale produkce běží přes `collect_and_import.py`. + +--- + +## Zpracování metadat (Skupina 1) + +**Soubor:** `30 SběrDat/collect_and_import.py` + +**Spuštění (Windows, lokální .venv):** +``` +.\.venv\Scripts\python.exe "30 SběrDat\collect_and_import.py" --workers 4 +``` + +**Co dělá:** +1. Načte z `zaloha_obrazku` jen záznamy bez odpovídajícího řádku v `photos` (LEFT JOIN přes `zaloha_id`) +2. Pro každou cestu přeloží Linux NFS → Windows UNC +3. Spočítá: SHA-256 souboru i pixelů, pHash, dHash, EXIF (ExifRead), IPTC, XMP, GPS, rozměry +4. Vloží do `photos` po dávkách s `ON CONFLICT (sha256_file) DO NOTHING` (per-batch commit) +5. Resume = bezpečné, znovuspuštění pokračuje kde skončilo + +**Argumenty:** `--workers N`, `--batch-size N`, `--limit N`, `--dry-run` + +**Závislosti:** psycopg2, python-dotenv, exifread, imagehash, Pillow + --- ## Otevřené otázky diff --git a/SCHEMA.md b/SCHEMA.md index 557d1ae..ade4ae3 100644 --- a/SCHEMA.md +++ b/SCHEMA.md @@ -9,13 +9,14 @@ PostgreSQL 192.168.1.76:5432, databáze `fotky_buzalkovi`. Tyto tabulky obsahují naparsované informace o fotkách — EXIF, hashe, metadata, tagy. Jsou základem pro veškerou další práci (vyhledávání, deduplikace, organizace). -### photos (85 833 záznamů) +### photos (vyprázdněno 2026-05-26, naplňuje se z `zaloha_obrazku`) Hlavní tabulka. Každý řádek = jedna unikátní fotka identifikovaná hashem `sha256_file`. | Sloupec | Typ | Nullable | Default | Popis | |---------|-----|----------|---------|-------| | **id** | BIGSERIAL | NO | autoincrement | PK | +| zaloha_id | INTEGER | YES | — | FK → zaloha_obrazku(id) ON DELETE SET NULL (propojení s nasbíranou zálohou) | | **sha256_file** | CHAR(64) | NO | — | SHA-256 celého souboru (UNIQUE) | | sha256_pixels | CHAR(64) | YES | — | SHA-256 pixelových dat (odhalí změnu jen v metadatech) | | phash | BIGINT | YES | — | Perceptuální hash (vizuální podobnost) | @@ -67,6 +68,7 @@ Hlavní tabulka. Každý řádek = jedna unikátní fotka identifikovaná hashem - `idx_photos_file_name` — (file_name) - `idx_photos_file_ext` — (file_ext) - `idx_photos_exif_gin` — GIN (exif_raw) +- `idx_photos_zaloha_id` — (zaloha_id) — pro rychlý resume při importu --- @@ -168,4 +170,6 @@ BLAKE3 hash) může mít více záznamů, pokud existuje na různých místech/p - Tabulka `cameras` z původního `create_schema.py` v DB neexistuje — informace o kameře jsou přímo ve sloupcích `camera_make` / `camera_model` v tabulce `photos`. - EXIF parser: ExifRead (Pillow má bug v GPS). -- Tabulky skupiny 1 a skupiny 2 zatím nejsou propojené (žádný FK mezi `photos` a `zaloha_obrazku`). +- Tabulky skupiny 1 a skupiny 2 jsou propojené přes `photos.zaloha_id` → `zaloha_obrazku.id` (přidáno 2026-05-26). + Skript `30 SběrDat/collect_and_import.py` čte záznamy z `zaloha_obrazku`, zpracuje metadata + a uloží do `photos` včetně FK na zdrojovou zálohu.