diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..8441fd8 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,7 @@ +{ + "permissions": { + "allow": [ + "Bash(.venv\\\\Scripts\\\\python.exe 10_collect_metadata.py --source demo_fotky --limit 3 --dry-run)" + ] + } +} diff --git a/10_collect_metadata.py b/10_collect_metadata.py new file mode 100644 index 0000000..227e362 --- /dev/null +++ b/10_collect_metadata.py @@ -0,0 +1,562 @@ +#!/usr/bin/env python3 +""" +10_collect_metadata.py — Sběr metadat ze všech fotek + +Pro každý soubor obrázku extrahuje: + • Informace ze souborového systému (cesta, velikost, mtime, ctime) + • SHA256 hash souboru + pixel hash (EXIF-orientation-aware) + • Perceptuální hashe (pHash, dHash) pro detekci duplikátů + • Všechny EXIF tagy přes ExifRead (primární parser) + • GPS souřadnice přepočtené na decimal degrees + • IPTC metadata (keywords, popis, autor) + • XMP metadata (incl. Apple obličeje, screenshoty) + +Výstup: JSONL soubor — jeden řádek = jeden objekt = jedna fotka +Chyby: samostatný .log soubor + +Použití: + python 10_collect_metadata.py + python 10_collect_metadata.py --source //tower/photosnahrani + python 10_collect_metadata.py --resume # přeskočí již zpracované soubory + python 10_collect_metadata.py --limit 100 # jen prvních 100 (pro test) + python 10_collect_metadata.py --dry-run # jen spočítá soubory, nic nezpracuje + python 10_collect_metadata.py --workers 4 # paralelní zpracování +""" + +import argparse +import hashlib +import json +import os +import re +import signal +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone +from pathlib import Path + +if sys.stdout.encoding and sys.stdout.encoding.lower() != "utf-8": + sys.stdout.reconfigure(encoding="utf-8") + sys.stderr.reconfigure(encoding="utf-8") + +import exifread +import imagehash +from PIL import Image, ImageOps, IptcImagePlugin + 
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# NOTE(review): ".heic" is listed here, but Pillow presumably needs an extra
# plugin to decode HEIC — confirm one is registered, otherwise those files
# will only ever produce a "pil_error" record.
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", ".tiff", ".tif", ".webp", ".bmp"}
SOURCE = Path("//tower/photosnahrani")
OUTPUT_DIR = Path(__file__).parent / "output"
OUTPUT_JSONL = OUTPUT_DIR / "10_metadata.jsonl"
ERROR_LOG = OUTPUT_DIR / "10_errors.log"

# Maps IPTC (record, dataset) keys to readable field names; keys not listed
# here are emitted as "IPTC_<record>_<dataset>" by _parse_iptc().
IPTC_TAG_NAMES = {
    (2, 5): "ObjectName",
    (2, 10): "Urgency",
    (2, 15): "Category",
    (2, 20): "SupplementalCategories",
    (2, 25): "Keywords",
    (2, 40): "SpecialInstructions",
    (2, 55): "DateCreated",
    (2, 60): "TimeCreated",
    (2, 80): "Byline",
    (2, 85): "BylineTitle",
    (2, 90): "City",
    (2, 92): "SubLocation",
    (2, 95): "ProvinceState",
    (2, 100): "CountryCode",
    (2, 101): "CountryName",
    (2, 103): "OriginalTransmissionReference",
    (2, 105): "Headline",
    (2, 110): "Credit",
    (2, 115): "Source",
    (2, 116): "Copyright",
    (2, 118): "Contact",
    (2, 120): "Caption",
    (2, 122): "WriterEditor",
}


# ---------------------------------------------------------------------------
# GPS
# ---------------------------------------------------------------------------

def _rational_to_float(r) -> float:
    """Convert a rational-like object (or a plain number) to ``float``.

    Objects exposing ``numerator``/``denominator`` are divided; a zero
    denominator yields 0.0 instead of raising. Anything else goes through
    ``float()``.
    """
    if hasattr(r, "numerator") and hasattr(r, "denominator"):
        return r.numerator / r.denominator if r.denominator != 0 else 0.0
    return float(r)


def _is_undefined_rational(r) -> bool:
    """Return True when *r* is a rational with a zero denominator."""
    return getattr(r, "denominator", 1) == 0


def _dms_to_decimal(vals) -> float:
    """Convert degrees / minutes / seconds components to decimal degrees.

    Accepts one to three components (degrees, optional minutes, optional
    seconds); components beyond the third are ignored.

    Raises:
        ValueError: if *vals* is empty.
    """
    parts = [_rational_to_float(v) for v in list(vals)[:3]]
    if not parts:
        raise ValueError("empty GPS coordinate")
    total = parts[0]
    for divisor, part in zip((60.0, 3600.0), parts[1:]):
        total += part / divisor
    return total


def extract_gps(raw_tags: dict) -> dict:
    """Convert GPS tags from the raw EXIF tag mapping to decimal degrees.

    Args:
        raw_tags: mapping of tag name to tag object; each tag object is
            expected to expose ``.values`` and a meaningful ``str()``.

    Returns:
        A dict with any of ``gps_lat``, ``gps_lon`` (rounded to 7 places),
        ``gps_alt`` (rounded to 2 places), or ``gps_error`` with the message
        of whatever went wrong. Never raises.
    """
    result = {}
    try:
        lat_tag = raw_tags.get("GPS GPSLatitude")
        lat_ref = raw_tags.get("GPS GPSLatitudeRef")
        lon_tag = raw_tags.get("GPS GPSLongitude")
        lon_ref = raw_tags.get("GPS GPSLongitudeRef")

        if lat_tag and lon_tag:
            # A zero denominator means "no value"; converting it to 0.0 would
            # store a bogus position at (0, 0), so reject it explicitly.
            components = list(lat_tag.values) + list(lon_tag.values)
            if any(_is_undefined_rational(c) for c in components):
                raise ValueError("GPS coordinate has a zero denominator")

            lat = _dms_to_decimal(lat_tag.values)
            lon = _dms_to_decimal(lon_tag.values)

            if lat_ref and str(lat_ref).strip().upper().startswith("S"):
                lat = -lat
            if lon_ref and str(lon_ref).strip().upper().startswith("W"):
                lon = -lon

            if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
                raise ValueError(f"GPS coordinate out of range: lat={lat}, lon={lon}")

            result["gps_lat"] = round(lat, 7)
            result["gps_lon"] = round(lon, 7)

        alt_tag = raw_tags.get("GPS GPSAltitude")
        alt_ref = raw_tags.get("GPS GPSAltitudeRef")
        if alt_tag and alt_tag.values and not _is_undefined_rational(alt_tag.values[0]):
            alt = _rational_to_float(alt_tag.values[0])
            # ref == 1 means below sea level
            if alt_ref and alt_ref.values and alt_ref.values[0] == 1:
                alt = -alt
            result["gps_alt"] = round(alt, 2)

    except Exception as e:
        result["gps_error"] = str(e)

    return result


# ---------------------------------------------------------------------------
# Hashes
# ---------------------------------------------------------------------------

def file_hash_sha256(path: Path, chunk: int = 65536) -> str:
    """Return the hex SHA-256 of the file at *path*, read in *chunk*-byte pieces."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while data := f.read(chunk):
            h.update(data)
    return h.hexdigest()


# ---------------------------------------------------------------------------
# IPTC
# ---------------------------------------------------------------------------

def _parse_iptc(raw_iptc) -> dict:
    """Turn a raw IPTC mapping into a dict keyed by readable field names.

    ``bytes`` values (and ``bytes`` items inside list values) are decoded as
    UTF-8 with replacement of undecodable bytes. A falsy input gives ``{}``.
    """
    if not raw_iptc:
        return {}
    out = {}
    for key, value in raw_iptc.items():
        name = IPTC_TAG_NAMES.get(key, f"IPTC_{key[0]}_{key[1]}")
        if isinstance(value, bytes):
            value = value.decode("utf-8", errors="replace")
        elif isinstance(value, list):
            value = [
                v.decode("utf-8", errors="replace") if isinstance(v, bytes) else v
                for v in value
            ]
        out[name] = value
    return out


# ---------------------------------------------------------------------------
# XMP
# ---------------------------------------------------------------------------

# Regexes applied to the raw XMP packet (with re.DOTALL). The dc:* entries
# capture the first rdf:li inside the named element; the negative lookahead
# keeps the search from running past that element's closing tag into an
# unrelated rdf:li further down the packet.
XMP_PATTERNS = {
    "creator_tool": r'xmp:CreatorTool="([^"]+)"',
    "create_date": r'xmp:CreateDate="([^"]+)"',
    "modify_date": r'xmp:ModifyDate="([^"]+)"',
    "rating": r'xmp:Rating="([^"]+)"',
    "label": r'xmp:Label="([^"]+)"',
    "title": r'<dc:title[^>]*>(?:(?!</dc:title>).)*?<rdf:li[^>]*>([^<]+)',
    "description": r'<dc:description[^>]*>(?:(?!</dc:description>).)*?<rdf:li[^>]*>([^<]+)',
    "creator": r'<dc:creator[^>]*>(?:(?!</dc:creator>).)*?<rdf:li[^>]*>([^<]+)',
    "subject_block": r'<dc:subject[^>]*>(.*?)</dc:subject>',
}


def _parse_xmp(xmp_raw) -> dict:
    """Extract selected fields from a raw XMP packet.

    Args:
        xmp_raw: the packet as ``bytes`` or ``str``; falsy input gives ``{}``.

    Returns:
        A dict with whichever XMP_PATTERNS fields matched, plus ``keywords``
        (list, taken from the dc:subject block), ``face_regions_count``,
        ``face_names`` when present, and ``_xmp_bytes`` (packet size in bytes).
    """
    if not xmp_raw:
        return {}
    if isinstance(xmp_raw, bytes):
        # Measure before decoding: the decoded length counts characters.
        size_bytes = len(xmp_raw)
        xmp_raw = xmp_raw.decode("utf-8", errors="replace")
    else:
        size_bytes = len(xmp_raw.encode("utf-8", errors="replace"))

    out = {}
    for name, pat in XMP_PATTERNS.items():
        m = re.search(pat, xmp_raw, re.DOTALL)
        if m:
            out[name] = m.group(1).strip()

    if "subject_block" in out:
        kws = re.findall(r"<rdf:li[^>]*>([^<]+)", out.pop("subject_block"))
        if kws:
            out["keywords"] = kws

    face_count = len(re.findall(r'mwg-rs:Type="Face"', xmp_raw))
    if face_count:
        out["face_regions_count"] = face_count

    # Face names (present when the regions were named in the photo app)
    face_names = re.findall(r'mwg-rs:Name="([^"]+)"', xmp_raw)
    if face_names:
        out["face_names"] = face_names

    out["_xmp_bytes"] = size_bytes
    return out


# ---------------------------------------------------------------------------
# Main data collection for a single photo
# ---------------------------------------------------------------------------

def _json_safe_number(value):
    """Return *value* in a form ``json.dumps`` accepts.

    ints and floats pass through unchanged; other numeric-like objects are
    converted with ``float()`` (non-finite results become ``None``); anything
    that cannot be converted is stringified.
    """
    if isinstance(value, (int, float)):
        return value
    try:
        number = float(value)
    except (TypeError, ValueError, ZeroDivisionError):
        return str(value)
    if number != number or number in (float("inf"), float("-inf")):
        return None
    return number


def _json_safe_dpi(dpi):
    """Convert a DPI tuple to a JSON-serializable list; pass other values through."""
    if isinstance(dpi, tuple):
        return [_json_safe_number(v) for v in dpi]
    return dpi


def collect_photo(path: Path, base_path: Path) -> dict:
    """Return a dict with everything collected about one photo.

    Failures of the individual stages (file hash, EXIF, Pillow, IPTC, pixel
    hashes) are caught and stored under ``*_error`` keys. The initial
    ``path.stat()`` is not guarded, so an unreadable or vanished file raises
    ``OSError`` to the caller.

    Args:
        path: image file to inspect.
        base_path: root used to compute ``file_path_relative``.
    """
    record: dict = {}
    stat = path.stat()

    # File system
    record["file_path"] = str(path)
    record["file_path_relative"] = str(path.relative_to(base_path)) if path.is_relative_to(base_path) else None
    record["file_name"] = path.name
    record["file_stem"] = path.stem
    record["file_ext"] = path.suffix.lower()
    record["file_size"] = stat.st_size
    record["mtime"] = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
    record["mtime_ts"] = stat.st_mtime
    record["ctime"] = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc).isoformat()

    # SHA-256 of the file (streamed, so the file is never fully in RAM)
    try:
        record["sha256_file"] = file_hash_sha256(path)
    except Exception as e:
        record["sha256_file"] = None
        record["sha256_file_error"] = str(e)

    # ExifRead — primary parser: all tags + GPS
    record["exif"] = {}
    try:
        with open(path, "rb") as f:
            raw_tags = exifread.process_file(f, details=True)

        record.update(extract_gps(raw_tags))

        for k, v in raw_tags.items():
            # Drop thumbnail tags, except the JPEGInterchangeFormat ones.
            if "Thumbnail" in k and "JPEGInterchangeFormat" not in k:
                continue
            record["exif"][k] = str(v)
    except Exception as e:
        record["exif_error"] = str(e)

    # Pillow — a single open for dimensions, pixel hash, perceptual hashes, IPTC, XMP
    try:
        with Image.open(path) as img:
            record["format"] = img.format
            record["mode"] = img.mode
            record["width"] = img.width
            record["height"] = img.height
            record["megapixels"] = round((img.width * img.height) / 1_000_000, 2)
            record["has_transparency"] = img.mode in ("RGBA", "LA") or "transparency" in img.info
            # DPI components may be rational objects that json.dumps rejects.
            record["dpi"] = _json_safe_dpi(img.info.get("dpi"))
            record["icc_profile"] = "icc_profile" in img.info
            record["embedded_thumbnail"] = "thumbnail" in img.info

            # IPTC
            try:
                record["iptc"] = _parse_iptc(IptcImagePlugin.getiptcinfo(img))
            except Exception as e:
                record["iptc"] = {"_error": str(e)}

            # XMP
            record["xmp"] = _parse_xmp(img.info.get("xmp"))

            # Pixel hash + perceptual hashes (EXIF-orientation aware)
            try:
                img_r = ImageOps.exif_transpose(img)
                if img_r.mode != "RGB":
                    img_r = img_r.convert("RGB")
                pixels = img_r.tobytes()
                record["sha256_pixels"] = hashlib.sha256(pixels).hexdigest()

                ph = imagehash.phash(img_r)
                dh = imagehash.dhash(img_r)
                record["phash"] = str(ph)
                record["dhash"] = str(dh)
                # Integer form for a DB column (BIGINT, signed): fold the
                # unsigned 64-bit value into the signed range.
                ph_int = int(str(ph), 16)
                record["phash_int"] = ph_int if ph_int < 2**63 else ph_int - 2**64
            except Exception as e:
                record["pixel_hash_error"] = str(e)
                record["sha256_pixels"] = None
                record["phash"] = None
                record["dhash"] = None
                record["phash_int"] = None

    except Exception as e:
        record["pil_error"] = str(e)

    record["collected_at"] = datetime.now(tz=timezone.utc).isoformat()
    return record


# ---------------------------------------------------------------------------
# Directory traversal
# ---------------------------------------------------------------------------

def iter_photos(source: Path):
    """Yield paths of supported image files under *source*, recursively.

    Directories whose name starts with "." are not descended into. Directory
    and file names are visited in sorted order so that repeated runs (and
    therefore ``--limit``) see the files in the same sequence.
    """
    for root, dirs, files in os.walk(source):
        # Prune hidden directories in place so os.walk skips them.
        dirs[:] = sorted(d for d in dirs if not d.startswith("."))
        for fname in sorted(files):
            if Path(fname).suffix.lower() in SUPPORTED_EXTENSIONS:
                yield Path(root) / fname


def count_photos(source: Path) -> int:
    """Return the number of paths iter_photos() yields for *source*."""
    return sum(1 for _ in iter_photos(source))


def load_processed_paths(jsonl_path: Path) -> set:
    """Load the set of ``file_path`` values from an existing JSONL file (for resume).

    Blank lines, lines that are not valid JSON (e.g. a record truncated by an
    interrupted run), and JSON values that are not objects are skipped.
    A missing file gives an empty set.
    """
    processed = set()
    if not jsonl_path.exists():
        return processed
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Deliberate best effort: a damaged line just means that file
                # gets processed again.
                continue
            if not isinstance(obj, dict):
                continue
            fp = obj.get("file_path")
            if fp:
                processed.add(fp)
    return processed


# ---------------------------------------------------------------------------
+# Progress +# --------------------------------------------------------------------------- + +class Progress: + def __init__(self, total: int | None): + self.total = total + self.done = 0 + self.errors = 0 + self.skipped = 0 + self.start = time.monotonic() + + def tick(self, ok: bool = True): + if ok: + self.done += 1 + else: + self.errors += 1 + + def skip(self): + self.skipped += 1 + + def report(self, current_file: str = "") -> str: + elapsed = time.monotonic() - self.start + rate = self.done / elapsed if elapsed > 0 else 0 + eta_str = "" + if self.total and rate > 0: + remaining = (self.total - self.done - self.skipped) / rate + h, r = divmod(int(remaining), 3600) + m, s = divmod(r, 60) + eta_str = f" ETA {h:02d}:{m:02d}:{s:02d}" + + total_str = f"/{self.total}" if self.total else "" + pct = f" ({100*(self.done+self.skipped)/self.total:.1f}%)" if self.total else "" + name = Path(current_file).name[:40] if current_file else "" + return ( + f"\r {self.done+self.skipped}{total_str}{pct}" + f" ok={self.done} err={self.errors} skip={self.skipped}" + f" {rate:.1f} f/s{eta_str} {name:<40}" + ) + + +# --------------------------------------------------------------------------- +# Shutdown handler +# --------------------------------------------------------------------------- + +_shutdown = False + +def _handle_sigint(sig, frame): + global _shutdown + print("\n\n[!] 
Přerušeno uživatelem — dočišťuji...") + _shutdown = True + + +# --------------------------------------------------------------------------- +# main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description="10_collect_metadata.py — Sběr metadat ze všech fotek", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--source", type=Path, default=SOURCE, + help=f"Zdrojová složka (default: {SOURCE})") + parser.add_argument("--output", type=Path, default=OUTPUT_JSONL, + help=f"Výstupní JSONL soubor (default: {OUTPUT_JSONL})") + parser.add_argument("--resume", action="store_true", + help="Přeskočit soubory, které jsou již v JSONL") + parser.add_argument("--dry-run", action="store_true", + help="Jen spočítat soubory, nic nezpracovat") + parser.add_argument("--limit", type=int, default=0, + help="Zpracovat maximálně N fotek (0 = vše)") + parser.add_argument("--workers", type=int, default=1, + help="Počet paralelních vláken (default: 1)") + args = parser.parse_args() + + source: Path = args.source + output: Path = args.output + error_log: Path = output.parent / (output.stem + "_errors.log") + + print(f"[10_collect_metadata]") + print(f" Zdroj: {source}") + print(f" Výstup: {output}") + print(f" Resume: {args.resume}") + print(f" Limit: {args.limit or 'vše'}") + print(f" Workers: {args.workers}") + print() + + if not source.exists(): + print(f"[ERROR] Zdrojová složka neexistuje: {source}") + sys.exit(1) + + # Dry run — jen spočítat + if args.dry_run: + print("Dry run — procházím a počítám...", end=" ", flush=True) + n = count_photos(source) + print(f"{n} fotek nalezeno v {source}") + return + + # Připravit výstupní adresář + output.parent.mkdir(parents=True, exist_ok=True) + + # Resume: načíst již zpracované cesty + processed = set() + if args.resume and output.exists(): + print(f" Načítám již zpracované záznamy z {output.name}...", end=" ", flush=True) + 
processed = load_processed_paths(output) + print(f"{len(processed)} souborů") + + # Spočítat celkový počet (pro ETA) + print(" Počítám soubory...", end=" ", flush=True) + all_files = list(iter_photos(source)) + total = len(all_files) + print(f"{total} fotek") + + if args.limit: + all_files = all_files[: args.limit] + + signal.signal(signal.SIGINT, _handle_sigint) + + progress = Progress(total=min(total, args.limit) if args.limit else total) + flush_every = 50 # zápis po N zpracovaných + + out_f = open(output, "a", encoding="utf-8", buffering=1) + err_f = open(error_log, "a", encoding="utf-8", buffering=1) + err_f.write(f"\n--- Session {datetime.now().isoformat()} ---\n") + + def process_one(path: Path) -> tuple[dict | None, bool]: + """Vrátí (record, skipped).""" + if str(path) in processed: + return None, True + try: + record = collect_photo(path, source) + return record, False + except Exception as e: + return {"file_path": str(path), "fatal_error": str(e), + "collected_at": datetime.now(tz=timezone.utc).isoformat()}, False + + print(f"\n Zpracovávám...\n") + + batch: list[str] = [] + processed_count = 0 + + if args.workers > 1: + with ThreadPoolExecutor(max_workers=args.workers) as pool: + futures = {pool.submit(process_one, p): p for p in all_files} + for future in as_completed(futures): + if _shutdown: + pool.shutdown(wait=False, cancel_futures=True) + break + path = futures[future] + try: + record, skipped = future.result() + except Exception as e: + progress.tick(ok=False) + err_f.write(f"{path}\t{e}\n") + print(progress.report(str(path)), end="", flush=True) + continue + + if skipped: + progress.skip() + else: + has_error = "fatal_error" in record or "pil_error" in record + progress.tick(ok=not has_error) + if has_error: + err_f.write(f"{path}\t{record.get('fatal_error') or record.get('pil_error')}\n") + batch.append(json.dumps(record, ensure_ascii=False)) + if len(batch) >= flush_every: + out_f.write("\n".join(batch) + "\n") + batch.clear() + + 
processed_count += 1 + print(progress.report(str(path)), end="", flush=True) + else: + for path in all_files: + if _shutdown: + break + record, skipped = process_one(path) + + if skipped: + progress.skip() + else: + has_error = "fatal_error" in record or "pil_error" in record + progress.tick(ok=not has_error) + if has_error: + err_f.write(f"{path}\t{record.get('fatal_error') or record.get('pil_error')}\n") + batch.append(json.dumps(record, ensure_ascii=False)) + if len(batch) >= flush_every: + out_f.write("\n".join(batch) + "\n") + batch.clear() + + processed_count += 1 + print(progress.report(str(path)), end="", flush=True) + + # Flush zbytku + if batch: + out_f.write("\n".join(batch) + "\n") + + out_f.close() + err_f.close() + + elapsed = time.monotonic() - progress.start + print(f"\n\n{'='*60}") + print(f" Dokončeno za {elapsed:.1f}s") + print(f" Zpracováno: {progress.done}") + print(f" Přeskočeno: {progress.skipped}") + print(f" Chyby: {progress.errors}") + print(f" Výstup: {output}") + if progress.errors: + print(f" Chybový log: {error_log}") + print(f"{'='*60}") + + +if __name__ == "__main__": + main()