Files
fotkyBuzalkovi/10_collect_metadata.py
T
2026-05-22 06:22:32 +02:00

563 lines
20 KiB
Python

#!/usr/bin/env python3
"""
10_collect_metadata.py — Sběr metadat ze všech fotek
Pro každý soubor obrázku extrahuje:
• Informace ze souborového systému (cesta, velikost, mtime, ctime)
• SHA256 hash souboru + pixel hash (EXIF-orientation-aware)
• Perceptuální hashe (pHash, dHash) pro detekci duplikátů
• Všechny EXIF tagy přes ExifRead (primární parser)
• GPS souřadnice přepočtené na decimal degrees
• IPTC metadata (keywords, popis, autor)
• XMP metadata (incl. Apple obličeje, screenshoty)
Výstup: JSONL soubor — jeden řádek = jeden objekt = jedna fotka
Chyby: samostatný .log soubor
Použití:
python 10_collect_metadata.py
python 10_collect_metadata.py --source //tower/photosnahrani
python 10_collect_metadata.py --resume # přeskočí již zpracované soubory
python 10_collect_metadata.py --limit 100 # jen prvních 100 (pro test)
python 10_collect_metadata.py --dry-run # jen spočítá soubory, nic nezpracuje
python 10_collect_metadata.py --workers 4 # paralelní zpracování
"""
import argparse
import hashlib
import json
import os
import re
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
# Force UTF-8 on the console streams (the script prints Czech text and file
# names).  Each stream is inspected on its own: stdout and stderr can be
# redirected independently, and a missing or replaced stream (pythonw, IDE,
# test harness) may be None or may not offer ``reconfigure`` at all.
for _stream in (sys.stdout, sys.stderr):
    _encoding = getattr(_stream, "encoding", None)
    if _encoding and _encoding.lower() != "utf-8" and hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8")
import exifread
import imagehash
from PIL import Image, ImageOps, IptcImagePlugin
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# File suffixes (lower-case, leading dot included) that count as photos.
SUPPORTED_EXTENSIONS = {
    ".bmp",
    ".heic",
    ".jpeg",
    ".jpg",
    ".png",
    ".tif",
    ".tiff",
    ".webp",
}

# Default scan root and output locations; the output folder sits next to
# this script.
SOURCE = Path("//tower/photosnahrani")
OUTPUT_DIR = Path(__file__).parent.joinpath("output")
OUTPUT_JSONL = OUTPUT_DIR.joinpath("10_metadata.jsonl")
ERROR_LOG = OUTPUT_DIR.joinpath("10_errors.log")

# Readable names for IPTC (record, dataset) keys; every listed dataset
# belongs to record 2.
IPTC_TAG_NAMES = {
    (2, dataset): label
    for dataset, label in (
        (5, "ObjectName"),
        (10, "Urgency"),
        (15, "Category"),
        (20, "SupplementalCategories"),
        (25, "Keywords"),
        (40, "SpecialInstructions"),
        (55, "DateCreated"),
        (60, "TimeCreated"),
        (80, "Byline"),
        (85, "BylineTitle"),
        (90, "City"),
        (92, "SubLocation"),
        (95, "ProvinceState"),
        (100, "CountryCode"),
        (101, "CountryName"),
        (103, "OriginalTransmissionReference"),
        (105, "Headline"),
        (110, "Credit"),
        (115, "Source"),
        (116, "Copyright"),
        (118, "Contact"),
        (120, "Caption"),
        (122, "WriterEditor"),
    )
}
# ---------------------------------------------------------------------------
# GPS
# ---------------------------------------------------------------------------
def _rational_to_float(r) -> float:
    """Convert a rational-like value (or anything ``float()`` accepts) to a float.

    Objects exposing both ``numerator`` and ``denominator`` are divided out;
    a zero denominator yields 0.0 instead of raising.  Every other value is
    passed to ``float()``.
    """
    looks_rational = hasattr(r, "numerator") and hasattr(r, "denominator")
    if not looks_rational:
        return float(r)
    if r.denominator == 0:
        return 0.0
    return r.numerator / r.denominator
def _dms_to_decimal(vals) -> float:
    """Fold the first three items of *vals* (degrees, minutes, seconds) into decimal degrees."""
    degrees, minutes, seconds = (_rational_to_float(vals[i]) for i in range(3))
    return degrees + minutes / 60.0 + seconds / 3600.0
def extract_gps(raw_tags: dict) -> dict:
    """Turn the GPS entries of an ExifRead tag mapping into signed decimal values.

    Returns a dict that may contain ``gps_lat`` / ``gps_lon`` (rounded to 7
    decimals) and ``gps_alt`` (rounded to 2 decimals).  Any exception is
    caught and its text stored under ``gps_error``; values written before the
    failure stay in the result.
    """
    gps = {}

    def points_to(ref_tag, letter):
        # True when the reference tag's text starts with *letter* (case-insensitive).
        return bool(ref_tag) and str(ref_tag).strip().upper().startswith(letter)

    try:
        latitude = raw_tags.get("GPS GPSLatitude")
        latitude_ref = raw_tags.get("GPS GPSLatitudeRef")
        longitude = raw_tags.get("GPS GPSLongitude")
        longitude_ref = raw_tags.get("GPS GPSLongitudeRef")
        if latitude and longitude:
            lat_value = _dms_to_decimal(latitude.values)
            lon_value = _dms_to_decimal(longitude.values)
            # Southern and western references flip the sign.
            if points_to(latitude_ref, "S"):
                lat_value = -lat_value
            if points_to(longitude_ref, "W"):
                lon_value = -lon_value
            gps["gps_lat"] = round(lat_value, 7)
            gps["gps_lon"] = round(lon_value, 7)

        altitude = raw_tags.get("GPS GPSAltitude")
        altitude_ref = raw_tags.get("GPS GPSAltitudeRef")
        if altitude and altitude.values:
            height = _rational_to_float(altitude.values[0])
            # A reference value of 1 means below sea level.
            below_sea_level = bool(
                altitude_ref and altitude_ref.values and altitude_ref.values[0] == 1
            )
            gps["gps_alt"] = round(-height if below_sea_level else height, 2)
    except Exception as exc:
        gps["gps_error"] = str(exc)
    return gps
# ---------------------------------------------------------------------------
# Hashe
# ---------------------------------------------------------------------------
def file_hash_sha256(path: Path, chunk: int = 65536) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in pieces of *chunk* bytes, so it never has to fit in
    memory as a whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for piece in iter(lambda: stream.read(chunk), b""):
            digest.update(piece)
    return digest.hexdigest()
# ---------------------------------------------------------------------------
# IPTC
# ---------------------------------------------------------------------------
def _parse_iptc(raw_iptc) -> dict:
    """Map a raw IPTC dict keyed by (record, dataset) tuples to readable names.

    Keys found in ``IPTC_TAG_NAMES`` get their listed name; others become
    ``IPTC_<record>_<dataset>``.  ``bytes`` values, and ``bytes`` items inside
    list values, are decoded as UTF-8 with replacement characters; all other
    values pass through unchanged.  Falsy input yields an empty dict.
    """
    if not raw_iptc:
        return {}

    def as_text(item):
        return item.decode("utf-8", errors="replace") if isinstance(item, bytes) else item

    parsed = {}
    for tag, payload in raw_iptc.items():
        label = IPTC_TAG_NAMES.get(tag, f"IPTC_{tag[0]}_{tag[1]}")
        if isinstance(payload, list):
            payload = [as_text(entry) for entry in payload]
        else:
            payload = as_text(payload)
        parsed[label] = payload
    return parsed
# ---------------------------------------------------------------------------
# XMP
# ---------------------------------------------------------------------------
# Regular expressions applied to the raw XMP packet; group(1) of each match
# is the extracted value.  The dc:title / dc:description / dc:creator
# patterns skip self-closing tags and refuse to scan past their own closing
# tag, so an element without an <rdf:li> cannot pick up the first <rdf:li>
# of some later element.
XMP_PATTERNS = {
    "creator_tool": r'xmp:CreatorTool="([^"]+)"',
    "create_date": r'xmp:CreateDate="([^"]+)"',
    "modify_date": r'xmp:ModifyDate="([^"]+)"',
    "rating": r'xmp:Rating="([^"]+)"',
    "label": r'xmp:Label="([^"]+)"',
    "title": r'<dc:title[^>]*(?<!/)>(?:(?!</dc:title>).)*?<rdf:li[^>]*>([^<]+)</rdf:li>',
    "description": r'<dc:description[^>]*(?<!/)>(?:(?!</dc:description>).)*?<rdf:li[^>]*>([^<]+)</rdf:li>',
    "creator": r'<dc:creator[^>]*(?<!/)>(?:(?!</dc:creator>).)*?<rdf:li[^>]*>([^<]+)</rdf:li>',
    "subject_block": r'<dc:subject[^>]*>(.*?)</dc:subject>',
}


def _parse_xmp(xmp_raw) -> dict:
    """Extract selected fields from a raw XMP packet using regular expressions.

    Args:
        xmp_raw: The packet as ``bytes`` (decoded as UTF-8 with replacement
            characters) or ``str``.  Falsy input yields an empty dict.

    Returns:
        A dict with whichever of these keys could be found: the keys of
        ``XMP_PATTERNS`` except ``subject_block`` (stripped strings),
        ``keywords`` (list of dc:subject items), ``face_regions_count``,
        ``face_names``, plus ``_xmp_bytes`` — the size of the packet in
        bytes (for ``str`` input, the size of its UTF-8 encoding).
    """
    if not xmp_raw:
        return {}
    # Measure the packet before decoding: after decoding, len() would count
    # characters, which is smaller than the byte size for non-ASCII text.
    if isinstance(xmp_raw, (bytes, bytearray)):
        packet_size = len(xmp_raw)
        xmp_raw = xmp_raw.decode("utf-8", errors="replace")
    else:
        packet_size = len(xmp_raw.encode("utf-8", errors="replace"))

    out = {}
    for name, pat in XMP_PATTERNS.items():
        m = re.search(pat, xmp_raw, re.DOTALL)
        if m:
            out[name] = m.group(1).strip()

    # The subject block is only an intermediate capture; replace it with the
    # list of its <rdf:li> items.
    if "subject_block" in out:
        kws = re.findall(r"<rdf:li[^>]*>([^<]+)</rdf:li>", out.pop("subject_block"))
        if kws:
            out["keywords"] = kws

    face_count = len(re.findall(r'mwg-rs:Type="Face"', xmp_raw))
    if face_count:
        out["face_regions_count"] = face_count

    # Names attached to regions (e.g. faces named in Apple Photos).
    face_names = re.findall(r'mwg-rs:Name="([^"]+)"', xmp_raw)
    if face_names:
        out["face_names"] = face_names

    out["_xmp_bytes"] = packet_size
    return out
# ---------------------------------------------------------------------------
# Hlavní sběr dat pro jednu fotku
# ---------------------------------------------------------------------------
def _json_safe_number(value):
    """Return *value* as a plain int/float for JSON output, or None if it cannot be converted."""
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError, ZeroDivisionError):
        return None


def collect_photo(path: Path, base_path: Path) -> dict:
    """Collect file-system, EXIF, GPS, IPTC, XMP and hash data for one image.

    Args:
        path: The image file to inspect.
        base_path: Root used to compute ``file_path_relative`` (``None`` when
            *path* does not lie under it).

    Returns:
        A dict describing the photo.  Failures of the individual stages are
        stored in ``sha256_file_error``, ``exif_error``, ``pil_error``,
        ``pixel_hash_error`` or ``iptc["_error"]`` instead of being raised.

    Raises:
        OSError: If the initial ``path.stat()`` fails; that call is the only
            step not wrapped in a ``try``.
    """
    record: dict = {}
    stat = path.stat()

    # File system
    record["file_path"] = str(path)
    record["file_path_relative"] = str(path.relative_to(base_path)) if path.is_relative_to(base_path) else None
    record["file_name"] = path.name
    record["file_stem"] = path.stem
    record["file_ext"] = path.suffix.lower()
    record["file_size"] = stat.st_size
    record["mtime"] = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
    record["mtime_ts"] = stat.st_mtime
    record["ctime"] = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc).isoformat()

    # SHA-256 of the file (streamed, so the file is never fully in memory)
    try:
        record["sha256_file"] = file_hash_sha256(path)
    except Exception as e:
        record["sha256_file"] = None
        record["sha256_file_error"] = str(e)

    # ExifRead is the primary parser: every tag plus GPS
    record["exif"] = {}
    try:
        with open(path, "rb") as f:
            raw_tags = exifread.process_file(f, details=True)
        record.update(extract_gps(raw_tags))
        for k, v in raw_tags.items():
            # Drop thumbnail tags, except the JPEGInterchangeFormat ones.
            if "Thumbnail" in k and "JPEGInterchangeFormat" not in k:
                continue
            record["exif"][k] = str(v)
    except Exception as e:
        record["exif_error"] = str(e)

    # Pillow: one open for dimensions, IPTC, XMP, pixel hash, perceptual hashes.
    # NOTE(review): ".heic" is listed in SUPPORTED_EXTENSIONS, but Pillow
    # presumably needs an extra plugin to open it — confirm one is registered,
    # otherwise those files end up with only "pil_error".
    try:
        with Image.open(path) as img:
            record["format"] = img.format
            record["mode"] = img.mode
            record["width"] = img.width
            record["height"] = img.height
            record["megapixels"] = round((img.width * img.height) / 1_000_000, 2)
            record["has_transparency"] = img.mode in ("RGBA", "LA") or "transparency" in img.info

            # DPI components may be rational objects rather than plain
            # numbers (e.g. for TIFF); json.dumps() would reject those, so
            # reduce them to int/float here.
            dpi = img.info.get("dpi")
            if dpi is None:
                record["dpi"] = None
            elif isinstance(dpi, (tuple, list)):
                record["dpi"] = [_json_safe_number(component) for component in dpi]
            else:
                record["dpi"] = _json_safe_number(dpi)

            record["icc_profile"] = "icc_profile" in img.info
            record["embedded_thumbnail"] = "thumbnail" in img.info

            # IPTC
            try:
                record["iptc"] = _parse_iptc(IptcImagePlugin.getiptcinfo(img))
            except Exception as e:
                record["iptc"] = {"_error": str(e)}

            # XMP — PNG keeps the packet in a text chunk under its own key.
            record["xmp"] = _parse_xmp(
                img.info.get("xmp") or img.info.get("XML:com.adobe.xmp")
            )

            # Pixel hash + perceptual hashes, computed after applying the
            # EXIF orientation.
            try:
                img_r = ImageOps.exif_transpose(img)
                if img_r.mode != "RGB":
                    img_r = img_r.convert("RGB")
                pixels = img_r.tobytes()
                record["sha256_pixels"] = hashlib.sha256(pixels).hexdigest()
                ph = imagehash.phash(img_r)
                dh = imagehash.dhash(img_r)
                record["phash"] = str(ph)
                record["dhash"] = str(dh)
                # Integer form for a database column (signed BIGINT): fold
                # the unsigned 64-bit value into the signed range.
                ph_int = int(str(ph), 16)
                record["phash_int"] = ph_int if ph_int < 2**63 else ph_int - 2**64
            except Exception as e:
                record["pixel_hash_error"] = str(e)
                record["sha256_pixels"] = None
                record["phash"] = None
                record["dhash"] = None
                record["phash_int"] = None
    except Exception as e:
        record["pil_error"] = str(e)

    record["collected_at"] = datetime.now(tz=timezone.utc).isoformat()
    return record
# ---------------------------------------------------------------------------
# Procházení adresáře
# ---------------------------------------------------------------------------
def iter_photos(source: Path):
    """Yield the path of every supported image file below *source*, recursively.

    Directories whose name starts with a dot are not descended into.
    """
    for folder, subfolders, filenames in os.walk(source):
        # Prune in place so os.walk skips the dot-directories.
        visible = [name for name in subfolders if not name.startswith(".")]
        subfolders[:] = visible
        yield from (
            Path(folder) / filename
            for filename in filenames
            if Path(filename).suffix.lower() in SUPPORTED_EXTENSIONS
        )
def count_photos(source: Path) -> int:
    """Return how many paths ``iter_photos`` yields for *source*."""
    found = 0
    for _ in iter_photos(source):
        found += 1
    return found
def load_processed_paths(jsonl_path: Path) -> set:
    """Load the set of ``file_path`` values from an existing JSONL file (for resume).

    Returns an empty set when the file does not exist.  Blank lines, lines
    that are not valid JSON, JSON values that are not objects, and records
    without a non-empty string ``file_path`` are skipped — a previous run may
    have been interrupted in the middle of a write, and resume must survive
    whatever that left behind.
    """
    processed = set()
    if not jsonl_path.exists():
        return processed
    # errors="replace": a write cut off inside a multi-byte character must
    # not abort the whole resume with UnicodeDecodeError; the damaged line
    # simply fails to parse below.
    with open(jsonl_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Deliberate best effort: a damaged line just means the file
                # it described gets processed again.
                continue
            if not isinstance(obj, dict):
                continue
            fp = obj.get("file_path")
            if fp and isinstance(fp, str):
                processed.add(fp)
    return processed
# ---------------------------------------------------------------------------
# Progress
# ---------------------------------------------------------------------------
class Progress:
def __init__(self, total: int | None):
self.total = total
self.done = 0
self.errors = 0
self.skipped = 0
self.start = time.monotonic()
def tick(self, ok: bool = True):
if ok:
self.done += 1
else:
self.errors += 1
def skip(self):
self.skipped += 1
def report(self, current_file: str = "") -> str:
elapsed = time.monotonic() - self.start
rate = self.done / elapsed if elapsed > 0 else 0
eta_str = ""
if self.total and rate > 0:
remaining = (self.total - self.done - self.skipped) / rate
h, r = divmod(int(remaining), 3600)
m, s = divmod(r, 60)
eta_str = f" ETA {h:02d}:{m:02d}:{s:02d}"
total_str = f"/{self.total}" if self.total else ""
pct = f" ({100*(self.done+self.skipped)/self.total:.1f}%)" if self.total else ""
name = Path(current_file).name[:40] if current_file else ""
return (
f"\r {self.done+self.skipped}{total_str}{pct}"
f" ok={self.done} err={self.errors} skip={self.skipped}"
f" {rate:.1f} f/s{eta_str} {name:<40}"
)
# ---------------------------------------------------------------------------
# Shutdown handler
# ---------------------------------------------------------------------------
# Set to True by the SIGINT handler; the processing loops in this file check
# it between files and stop when it is set.
_shutdown = False


def _handle_sigint(sig, frame):
    """Signal handler: request a graceful stop and tell the user about it.

    Args:
        sig: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    global _shutdown
    # Raise the flag first: the handler can interrupt the main thread in the
    # middle of its own print(), and a second write to the same stream may
    # then fail.  The stop request must be registered regardless.
    _shutdown = True
    try:
        print("\n\n[!] Přerušeno uživatelem — dočišťuji...")
    except (RuntimeError, OSError, ValueError):
        # The notice is a courtesy only; never let it escape from a signal
        # handler into whatever code happened to be running.
        pass
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: scan the source folder and append one JSON line per photo.

    Parses the arguments, optionally loads the already processed paths
    (``--resume``), lists the image files, processes them sequentially or in
    a thread pool (``--workers``) and prints a summary.  Errors are also
    written to ``<output stem>_errors.log`` next to the output file.  Exits
    with status 1 when the source folder does not exist.
    """
    parser = argparse.ArgumentParser(
        description="10_collect_metadata.py — Sběr metadat ze všech fotek",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--source", type=Path, default=SOURCE,
                        help=f"Zdrojová složka (default: {SOURCE})")
    parser.add_argument("--output", type=Path, default=OUTPUT_JSONL,
                        help=f"Výstupní JSONL soubor (default: {OUTPUT_JSONL})")
    parser.add_argument("--resume", action="store_true",
                        help="Přeskočit soubory, které jsou již v JSONL")
    parser.add_argument("--dry-run", action="store_true",
                        help="Jen spočítat soubory, nic nezpracovat")
    parser.add_argument("--limit", type=int, default=0,
                        help="Zpracovat maximálně N fotek (0 = vše)")
    parser.add_argument("--workers", type=int, default=1,
                        help="Počet paralelních vláken (default: 1)")
    args = parser.parse_args()

    source: Path = args.source
    output: Path = args.output
    error_log: Path = output.parent / (output.stem + "_errors.log")
    # A negative --limit would slice files off the END of the list; treat it
    # like 0 (no limit).
    limit = max(args.limit, 0)

    print("[10_collect_metadata]")
    print(f" Zdroj: {source}")
    print(f" Výstup: {output}")
    print(f" Resume: {args.resume}")
    print(f" Limit: {limit or 'vše'}")
    print(f" Workers: {args.workers}")
    print()

    if not source.exists():
        print(f"[ERROR] Zdrojová složka neexistuje: {source}")
        sys.exit(1)

    # Dry run: only count
    if args.dry_run:
        print("Dry run — procházím a počítám...", end=" ", flush=True)
        n = count_photos(source)
        print(f"{n} fotek nalezeno v {source}")
        return

    # Prepare the output directory
    output.parent.mkdir(parents=True, exist_ok=True)

    # Resume: load the paths that are already in the output file
    processed = set()
    if args.resume and output.exists():
        print(f" Načítám již zpracované záznamy z {output.name}...", end=" ", flush=True)
        processed = load_processed_paths(output)
        print(f"{len(processed)} souborů")

    # List all files up front so the total is known (for the ETA)
    print(" Počítám soubory...", end=" ", flush=True)
    all_files = list(iter_photos(source))
    total = len(all_files)
    print(f"{total} fotek")
    if limit:
        all_files = all_files[:limit]

    signal.signal(signal.SIGINT, _handle_sigint)
    progress = Progress(total=len(all_files))
    flush_every = 50  # write to disk after this many records
    batch: list[str] = []

    def process_one(path: Path) -> tuple[dict | None, bool]:
        """Return (record, skipped) for one file; never raises Exception."""
        if str(path) in processed:
            return None, True
        try:
            record = collect_photo(path, source)
            return record, False
        except Exception as e:
            return {"file_path": str(path), "fatal_error": str(e),
                    "collected_at": datetime.now(tz=timezone.utc).isoformat()}, False

    # The context manager closes both files on every exit path, including
    # an exception in the middle of the run.
    with open(output, "a", encoding="utf-8", buffering=1) as out_f, \
            open(error_log, "a", encoding="utf-8", buffering=1) as err_f:
        err_f.write(f"\n--- Session {datetime.now().isoformat()} ---\n")

        def flush_batch():
            """Write the pending records to the output file."""
            if batch:
                out_f.write("\n".join(batch) + "\n")
                batch.clear()

        def handle_result(path: Path, record, skipped: bool):
            """Account for one finished file; shared by both processing modes."""
            if skipped:
                progress.skip()
            else:
                try:
                    line = json.dumps(record, ensure_ascii=False)
                except (TypeError, ValueError) as e:
                    # One unserialisable value must not abort the whole run:
                    # store a minimal error record for this file instead.
                    record = {"file_path": str(path),
                              "fatal_error": f"JSON serialization failed: {e}",
                              "collected_at": datetime.now(tz=timezone.utc).isoformat()}
                    line = json.dumps(record, ensure_ascii=False)
                has_error = "fatal_error" in record or "pil_error" in record
                progress.tick(ok=not has_error)
                if has_error:
                    err_f.write(f"{path}\t{record.get('fatal_error') or record.get('pil_error')}\n")
                batch.append(line)
                if len(batch) >= flush_every:
                    flush_batch()
            print(progress.report(str(path)), end="", flush=True)

        print(f"\n Zpracovávám...\n")
        try:
            if args.workers > 1:
                with ThreadPoolExecutor(max_workers=args.workers) as pool:
                    futures = {pool.submit(process_one, p): p for p in all_files}
                    for future in as_completed(futures):
                        if _shutdown:
                            # Drop everything that has not started yet; leaving
                            # the ``with`` block then waits only for the tasks
                            # that are already running.
                            pool.shutdown(wait=False, cancel_futures=True)
                            break
                        path = futures[future]
                        try:
                            record, skipped = future.result()
                        except Exception as e:
                            progress.tick(ok=False)
                            err_f.write(f"{path}\t{e}\n")
                            print(progress.report(str(path)), end="", flush=True)
                            continue
                        handle_result(path, record, skipped)
            else:
                for path in all_files:
                    if _shutdown:
                        break
                    record, skipped = process_one(path)
                    handle_result(path, record, skipped)
        finally:
            # Write the remainder even when the loop ended with an exception,
            # so finished work is not lost.
            flush_batch()

    elapsed = time.monotonic() - progress.start
    print(f"\n\n{'='*60}")
    print(f" Dokončeno za {elapsed:.1f}s")
    print(f" Zpracováno: {progress.done}")
    print(f" Přeskočeno: {progress.skipped}")
    print(f" Chyby: {progress.errors}")
    print(f" Výstup: {output}")
    if progress.errors:
        print(f" Chybový log: {error_log}")
    print(f"{'='*60}")


if __name__ == "__main__":
    main()