#!/usr/bin/env python3
"""
10_collect_metadata.py — Collect metadata from all photos.

For every image file the script extracts:
  • File-system information (path, size, mtime, ctime)
  • SHA256 hash of the file + pixel hash (EXIF-orientation-aware)
  • Perceptual hashes (pHash, dHash) for duplicate detection
  • All EXIF tags via ExifRead (primary parser)
  • GPS coordinates converted to decimal degrees
  • IPTC metadata (keywords, caption, author)
  • XMP metadata (incl. Apple faces, screenshots)

Output: a JSONL file — one line = one object = one photo
Errors: a separate .log file

Usage:
    python 10_collect_metadata.py
    python 10_collect_metadata.py --source //tower/photosnahrani
    python 10_collect_metadata.py --resume      # skip files that are already processed
    python 10_collect_metadata.py --no-resume   # re-process everything
    python 10_collect_metadata.py --limit 100   # only the first 100 (for testing)
    python 10_collect_metadata.py --dry-run     # only count the files, process nothing
    python 10_collect_metadata.py --workers 4   # parallel processing
"""

import argparse
import hashlib
import json
import os
import re
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path

# Force UTF-8 on the console so the Czech messages survive on legacy code pages.
if sys.stdout.encoding and sys.stdout.encoding.lower() != "utf-8":
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")

import logging

import exifread

# ExifRead logs a warning for every slightly malformed tag; keep the console quiet.
logging.getLogger("exifread").setLevel(logging.CRITICAL)

import imagehash
from PIL import Image, ImageOps, IptcImagePlugin

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", ".tiff", ".tif", ".webp", ".bmp"}

SOURCE = Path("//tower/photosnahrani")
OUTPUT_DIR = Path(__file__).parent / "output"
OUTPUT_JSONL = OUTPUT_DIR / "10_metadata.jsonl"
ERROR_LOG = OUTPUT_DIR / "10_errors.log"
RESUME = True
WORKERS = 2

# Human-readable names for IPTC (record, dataset) keys.
IPTC_TAG_NAMES = {
    (2, 5): "ObjectName",
    (2, 10): "Urgency",
    (2, 15): "Category",
    (2, 20): "SupplementalCategories",
    (2, 25): "Keywords",
    (2, 40): "SpecialInstructions",
    (2, 55): "DateCreated",
    (2, 60): "TimeCreated",
    (2, 80): "Byline",
    (2, 85): "BylineTitle",
    (2, 90): "City",
    (2, 92): "SubLocation",
    (2, 95): "ProvinceState",
    (2, 100): "CountryCode",
    (2, 101): "CountryName",
    (2, 103): "OriginalTransmissionReference",
    (2, 105): "Headline",
    (2, 110): "Credit",
    (2, 115): "Source",
    (2, 116): "Copyright",
    (2, 118): "Contact",
    (2, 120): "Caption",
    (2, 122): "WriterEditor",
}


# ---------------------------------------------------------------------------
# GPS
# ---------------------------------------------------------------------------

def _rational_to_float(r) -> float:
    """Convert a rational-like value to float; a zero denominator yields 0.0."""
    if hasattr(r, "numerator") and hasattr(r, "denominator"):
        return r.numerator / r.denominator if r.denominator != 0 else 0.0
    return float(r)


def _dms_to_decimal(vals) -> float:
    """Convert a (degrees, minutes, seconds) triple to decimal degrees."""
    d = _rational_to_float(vals[0])
    m = _rational_to_float(vals[1])
    s = _rational_to_float(vals[2])
    return d + m / 60.0 + s / 3600.0


def extract_gps(raw_tags: dict) -> dict:
    """Convert GPS DMS values from raw ExifRead tags to decimal degrees.

    Returns a dict that may contain ``gps_lat``, ``gps_lon`` and ``gps_alt``.
    Any failure is reported as ``gps_error`` instead of being raised.
    """
    result = {}
    try:
        lat_tag = raw_tags.get("GPS GPSLatitude")
        lat_ref = raw_tags.get("GPS GPSLatitudeRef")
        lon_tag = raw_tags.get("GPS GPSLongitude")
        lon_ref = raw_tags.get("GPS GPSLongitudeRef")

        if lat_tag and lon_tag:
            lat = _dms_to_decimal(lat_tag.values)
            lon = _dms_to_decimal(lon_tag.values)
            # Southern and western hemispheres are negative in decimal notation.
            if lat_ref and str(lat_ref).strip().upper().startswith("S"):
                lat = -lat
            if lon_ref and str(lon_ref).strip().upper().startswith("W"):
                lon = -lon
            result["gps_lat"] = round(lat, 7)
            result["gps_lon"] = round(lon, 7)

        alt_tag = raw_tags.get("GPS GPSAltitude")
        alt_ref = raw_tags.get("GPS GPSAltitudeRef")
        if alt_tag and alt_tag.values:
            alt = _rational_to_float(alt_tag.values[0])
            # ref == 1 means below sea level
            if alt_ref and alt_ref.values and alt_ref.values[0] == 1:
                alt = -alt
            result["gps_alt"] = round(alt, 2)
    except Exception as e:
        result["gps_error"] = str(e)
    return result


# ---------------------------------------------------------------------------
# JSON serialization
# ---------------------------------------------------------------------------

def _make_serializable(obj):
    """Recursively convert anything JSON cannot encode into basic types.

    Rational-like objects become floats, dict keys become strings, tuples
    become lists, bytes are decoded (first 200 bytes only) and everything
    else that ``json.dumps`` rejects is stringified.
    """
    # Plain JSON scalars pass through untouched.  This check must come first:
    # int and bool also expose ``numerator``/``denominator``, so the rational
    # branch below would turn True into 1.0 and lose precision on 64-bit ints.
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    if hasattr(obj, "numerator") and hasattr(obj, "denominator"):
        try:
            return float(obj)
        except Exception:
            return str(obj)
    if isinstance(obj, dict):
        return {str(k): _make_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_make_serializable(x) for x in obj]
    if isinstance(obj, bytes):
        return obj[:200].decode("utf-8", errors="replace")
    try:
        json.dumps(obj)
        return obj
    except (TypeError, ValueError):
        return str(obj)


# ---------------------------------------------------------------------------
# Hashes
# ---------------------------------------------------------------------------

def file_hash_sha256(path: Path, chunk: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, read in ``chunk``-sized pieces."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while data := f.read(chunk):
            h.update(data)
    return h.hexdigest()


# ---------------------------------------------------------------------------
# IPTC
# ---------------------------------------------------------------------------

def _parse_iptc(raw_iptc) -> dict:
    """Map raw IPTC (record, dataset) keys to names and decode byte values."""
    if not raw_iptc:
        return {}
    out = {}
    for key, value in raw_iptc.items():
        # Unknown datasets keep a synthetic name so no information is dropped.
        name = IPTC_TAG_NAMES.get(key, f"IPTC_{key[0]}_{key[1]}")
        if isinstance(value, bytes):
            value = value.decode("utf-8", errors="replace")
        elif isinstance(value, list):
            value = [
                v.decode("utf-8", errors="replace") if isinstance(v, bytes) else v
                for v in value
            ]
        out[name] = value
    return out


# ---------------------------------------------------------------------------
# XMP
# ---------------------------------------------------------------------------

# Simple attributes are matched directly; Dublin Core elements (title,
# description, creator, subject) hold their values in nested rdf:li items.
XMP_PATTERNS = {
    "creator_tool": r'xmp:CreatorTool="([^"]+)"',
    "create_date": r'xmp:CreateDate="([^"]+)"',
    "modify_date": r'xmp:ModifyDate="([^"]+)"',
    "rating": r'xmp:Rating="([^"]+)"',
    "label": r'xmp:Label="([^"]+)"',
    "title": r'<dc:title[^>]*>.*?<rdf:li[^>]*>([^<]+)',
    "description": r'<dc:description[^>]*>.*?<rdf:li[^>]*>([^<]+)',
    "creator": r'<dc:creator[^>]*>.*?<rdf:li[^>]*>([^<]+)',
    "subject_block": r'<dc:subject[^>]*>(.*?)</dc:subject>',
}


def _parse_xmp(xmp_raw) -> dict:
    """Extract selected fields from a raw XMP packet using regular expressions.

    Accepts ``bytes`` or ``str``.  Returns an empty dict for empty input;
    otherwise the result always contains ``_xmp_bytes`` (packet size).
    """
    if not xmp_raw:
        return {}
    # Measure before decoding so the value really is a byte count.
    xmp_size = len(xmp_raw)
    if isinstance(xmp_raw, bytes):
        xmp_raw = xmp_raw.decode("utf-8", errors="replace")

    out = {}
    for name, pat in XMP_PATTERNS.items():
        m = re.search(pat, xmp_raw, re.DOTALL)
        if m:
            out[name] = m.group(1).strip()

    # The subject block is only an intermediate: split it into keywords.
    if "subject_block" in out:
        kws = re.findall(r"<rdf:li[^>]*>([^<]+)", out.pop("subject_block"))
        if kws:
            out["keywords"] = kws

    face_count = len(re.findall(r'mwg-rs:Type="Face"', xmp_raw))
    if face_count:
        out["face_regions_count"] = face_count

    # Apple face names (if they were named in Photos)
    face_names = re.findall(r'mwg-rs:Name="([^"]+)"', xmp_raw)
    if face_names:
        out["face_names"] = face_names

    out["_xmp_bytes"] = xmp_size
    return out


# ---------------------------------------------------------------------------
# Main data collection for a single photo
# ---------------------------------------------------------------------------

def collect_photo(path: Path, base_path: Path) -> dict:
    """Return a dict with all collected data about one photo.

    Parser failures (SHA256, EXIF, Pillow, IPTC, pixel hashes) are recorded in
    ``*_error`` keys instead of being raised.  The initial ``path.stat()`` is
    not guarded, so an ``OSError`` from it propagates to the caller.
    """
    record: dict = {}
    stat = path.stat()

    # File system
    record["file_path"] = str(path)
    record["file_path_relative"] = (
        str(path.relative_to(base_path)) if path.is_relative_to(base_path) else None
    )
    record["file_name"] = path.name
    record["file_stem"] = path.stem
    record["file_ext"] = path.suffix.lower()
    record["file_size"] = stat.st_size
    record["mtime"] = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
    record["mtime_ts"] = stat.st_mtime
    record["ctime"] = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc).isoformat()

    # SHA256 of the file (streamed — no need to load the whole file into RAM)
    try:
        record["sha256_file"] = file_hash_sha256(path)
    except Exception as e:
        record["sha256_file"] = None
        record["sha256_file_error"] = str(e)

    # ExifRead — primary parser: all tags + GPS
    record["exif"] = {}
    try:
        with open(path, "rb") as f:
            raw_tags = exifread.process_file(f, details=True)
        record.update(extract_gps(raw_tags))
        for k, v in raw_tags.items():
            # Skip thumbnail tags, but keep the JPEGInterchangeFormat ones.
            if "Thumbnail" in k and "JPEGInterchangeFormat" not in k:
                continue
            record["exif"][k] = str(v)
    except Exception as e:
        record["exif_error"] = str(e)

    # Pillow — a single open for everything: dimensions, pixel hash,
    # perceptual hashes, IPTC, XMP
    try:
        with Image.open(path) as img:
            record["format"] = img.format
            record["mode"] = img.mode
            record["width"] = img.width
            record["height"] = img.height
            record["megapixels"] = round((img.width * img.height) / 1_000_000, 2)
            record["has_transparency"] = img.mode in ("RGBA", "LA") or "transparency" in img.info
            dpi = img.info.get("dpi")
            record["dpi"] = list(dpi) if isinstance(dpi, tuple) else dpi
            record["icc_profile"] = "icc_profile" in img.info
            record["embedded_thumbnail"] = "thumbnail" in img.info

            # IPTC
            try:
                record["iptc"] = _parse_iptc(IptcImagePlugin.getiptcinfo(img))
            except Exception as e:
                record["iptc"] = {"_error": str(e)}

            # XMP
            record["xmp"] = _parse_xmp(img.info.get("xmp"))

            # Pixel hash + perceptual hashes (EXIF orientation aware)
            try:
                img_r = ImageOps.exif_transpose(img)
                if img_r.mode != "RGB":
                    img_r = img_r.convert("RGB")
                pixels = img_r.tobytes()
                record["sha256_pixels"] = hashlib.sha256(pixels).hexdigest()
                ph = imagehash.phash(img_r)
                dh = imagehash.dhash(img_r)
                record["phash"] = str(ph)
                record["dhash"] = str(dh)
                # Integer form for the DB (BIGINT, signed): fold the unsigned
                # 64-bit value into the signed two's-complement range.
                ph_int = int(str(ph), 16)
                record["phash_int"] = ph_int if ph_int < 2**63 else ph_int - 2**64
            except Exception as e:
                record["pixel_hash_error"] = str(e)
                record["sha256_pixels"] = None
                record["phash"] = None
                record["dhash"] = None
                record["phash_int"] = None
    except Exception as e:
        record["pil_error"] = str(e)

    record["collected_at"] = datetime.now(tz=timezone.utc).isoformat()
    return record


# ---------------------------------------------------------------------------
# Directory traversal
# ---------------------------------------------------------------------------

def iter_photos(source: Path):
    """Generator: recursively yield paths of image files under ``source``."""
    for root, dirs, files in os.walk(source):
        # Ignore hidden directories (in-place edit prunes the walk)
        dirs[:] = [d for d in dirs if not d.startswith(".")]
        for fname in files:
            if Path(fname).suffix.lower() in SUPPORTED_EXTENSIONS:
                p = Path(root) / fname
                # Skip symlinks that point outside the share
                # (WinError 3 - \\mnt\user\...)
                try:
                    p.stat()
                except OSError:
                    continue
                yield p


def count_photos(source: Path) -> int:
    """Count the image files that ``iter_photos`` would yield."""
    return sum(1 for _ in iter_photos(source))


def load_processed_paths(jsonl_path: Path) -> set:
    """Load the set of ``file_path`` values from an existing JSONL (for resume).

    Blank lines, lines that are not valid JSON and lines that are not JSON
    objects are ignored.
    """
    processed = set()
    if not jsonl_path.exists():
        return processed
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # e.g. a truncated last line after an interrupted run
                continue
            if not isinstance(obj, dict):
                continue
            fp = obj.get("file_path")
            if fp:
                processed.add(fp)
    return processed


# ---------------------------------------------------------------------------
# Progress
# ---------------------------------------------------------------------------

class Progress:
    """Counters and a one-line status formatter for the processing loop."""

    def __init__(self, total: int | None):
        self.total = total
        self.done = 0
        self.errors = 0
        self.skipped = 0
        self.start = time.monotonic()

    def tick(self, ok: bool = True):
        """Count one processed file as a success or as an error."""
        if ok:
            self.done += 1
        else:
            self.errors += 1

    def skip(self):
        """Count one file that was skipped (already processed)."""
        self.skipped += 1

    def report(self, current_file: str = "") -> str:
        """Return a carriage-return-prefixed status line for in-place printing."""
        elapsed = time.monotonic() - self.start
        rate = self.done / elapsed if elapsed > 0 else 0
        # Failed files have been handled too; without them the position,
        # the percentage and the ETA would never reach the end.
        handled = self.done + self.errors + self.skipped
        eta_str = ""
        if self.total and rate > 0:
            remaining = max(self.total - handled, 0) / rate
            h, r = divmod(int(remaining), 3600)
            m, s = divmod(r, 60)
            eta_str = f" ETA {h:02d}:{m:02d}:{s:02d}"
        total_str = f"/{self.total}" if self.total else ""
        pct = f" ({100*handled/self.total:.1f}%)" if self.total else ""
        name = Path(current_file).name[:40] if current_file else ""
        return (
            f"\r {handled}{total_str}{pct}"
            f" ok={self.done} err={self.errors} skip={self.skipped}"
            f" {rate:.1f} f/s{eta_str} {name:<40}"
        )


# ---------------------------------------------------------------------------
# Shutdown handler
# ---------------------------------------------------------------------------

_shutdown = False


def _handle_sigint(sig, frame):
    """SIGINT handler: set the flag that makes the main loops stop."""
    global _shutdown
    print("\n\n[!] Přerušeno uživatelem — dočišťuji...")
    _shutdown = True


# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------

def main():
    """Parse the command line, walk the source tree and write the JSONL output."""
    parser = argparse.ArgumentParser(
        description="10_collect_metadata.py — Sběr metadat ze všech fotek",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--source", type=Path, default=SOURCE,
                        help=f"Zdrojová složka (default: {SOURCE})")
    parser.add_argument("--output", type=Path, default=OUTPUT_JSONL,
                        help=f"Výstupní JSONL soubor (default: {OUTPUT_JSONL})")
    parser.add_argument("--resume", dest="resume", action="store_true", default=RESUME,
                        help=f"Přeskočit soubory, které jsou již v JSONL (default: {RESUME})")
    # With RESUME defaulting to True, --resume alone could never be turned off.
    parser.add_argument("--no-resume", dest="resume", action="store_false",
                        help="Zpracovat znovu i soubory, které již v JSONL jsou")
    parser.add_argument("--dry-run", action="store_true",
                        help="Jen spočítat soubory, nic nezpracovat")
    parser.add_argument("--limit", type=int, default=0,
                        help="Zpracovat maximálně N fotek (0 = vše)")
    parser.add_argument("--workers", type=int, default=WORKERS,
                        help=f"Počet paralelních vláken (default: {WORKERS})")
    args = parser.parse_args()

    source: Path = args.source
    output: Path = args.output
    error_log: Path = output.parent / (output.stem + "_errors.log")
    # Zero or a negative number means "no limit"; a negative slice bound
    # would otherwise silently drop files from the end of the list.
    limit: int = args.limit if args.limit > 0 else 0

    print("[10_collect_metadata]")
    print(f" Zdroj: {source}")
    print(f" Výstup: {output}")
    print(f" Resume: {args.resume}")
    print(f" Limit: {limit or 'vše'}")
    print(f" Workers: {args.workers}")
    print()

    if not source.exists():
        print(f"[ERROR] Zdrojová složka neexistuje: {source}")
        sys.exit(1)

    # Dry run — only count
    if args.dry_run:
        print("Dry run — procházím a počítám...", end=" ", flush=True)
        n = count_photos(source)
        print(f"{n} fotek nalezeno v {source}")
        return

    # Prepare the output directory
    output.parent.mkdir(parents=True, exist_ok=True)

    # Resume: load the paths that are already processed
    processed = set()
    if args.resume and output.exists():
        print(f" Načítám již zpracované záznamy z {output.name}...", end=" ", flush=True)
        processed = load_processed_paths(output)
        print(f"{len(processed)} souborů")

    # Count the total (needed for the ETA)
    print(" Počítám soubory...", end=" ", flush=True)
    all_files = list(iter_photos(source))
    total = len(all_files)
    print(f"{total} fotek")

    if limit:
        all_files = all_files[:limit]

    signal.signal(signal.SIGINT, _handle_sigint)

    progress = Progress(total=min(total, limit) if limit else total)
    flush_every = 50  # write after every N processed records
    batch: list[str] = []

    def process_one(path: Path) -> tuple[dict | None, bool]:
        """Return (record, skipped); never raises for ordinary exceptions."""
        if str(path) in processed:
            return None, True
        try:
            record = collect_photo(path, source)
            return record, False
        except Exception as e:
            return {"file_path": str(path), "fatal_error": str(e),
                    "collected_at": datetime.now(tz=timezone.utc).isoformat()}, False

    print("\n Zpracovávám...\n")

    with open(output, "a", encoding="utf-8", buffering=1) as out_f:
        with open(error_log, "a", encoding="utf-8", buffering=1) as err_f:
            err_f.write(f"\n--- Session {datetime.now().isoformat()} ---\n")

            def flush_batch() -> None:
                """Write the buffered JSON lines to the output file."""
                if batch:
                    out_f.write("\n".join(batch) + "\n")
                    batch.clear()

            def handle_result(path: Path, record: dict | None, skipped: bool) -> None:
                """Update counters, log errors, buffer the record, print status."""
                if skipped:
                    progress.skip()
                else:
                    has_error = "fatal_error" in record or "pil_error" in record
                    progress.tick(ok=not has_error)
                    if has_error:
                        err_f.write(
                            f"{path}\t{record.get('fatal_error') or record.get('pil_error')}\n"
                        )
                    batch.append(json.dumps(_make_serializable(record), ensure_ascii=False))
                    if len(batch) >= flush_every:
                        flush_batch()
                print(progress.report(str(path)), end="", flush=True)

            try:
                if args.workers > 1:
                    with ThreadPoolExecutor(max_workers=args.workers) as pool:
                        futures = {pool.submit(process_one, p): p for p in all_files}
                        for future in as_completed(futures):
                            if _shutdown:
                                pool.shutdown(wait=False, cancel_futures=True)
                                break
                            # Drop the finished future from the map so its
                            # record can be freed instead of living until the
                            # end of the run.
                            path = futures.pop(future)
                            try:
                                record, skipped = future.result()
                            except Exception as e:
                                progress.tick(ok=False)
                                err_f.write(f"{path}\t{e}\n")
                                print(progress.report(str(path)), end="", flush=True)
                                continue
                            handle_result(path, record, skipped)
                else:
                    for path in all_files:
                        if _shutdown:
                            break
                        record, skipped = process_one(path)
                        handle_result(path, record, skipped)
            finally:
                # Flush the remainder — also when an unexpected error aborts
                # the loop, so records already collected are not lost.
                flush_batch()

    elapsed = time.monotonic() - progress.start
    print(f"\n\n{'='*60}")
    print(f" Dokončeno za {elapsed:.1f}s")
    print(f" Zpracováno: {progress.done}")
    print(f" Přeskočeno: {progress.skipped}")
    print(f" Chyby: {progress.errors}")
    print(f" Výstup: {output}")
    if progress.errors:
        print(f" Chybový log: {error_log}")
    print(f"{'='*60}")


if __name__ == "__main__":
    main()