notebookVb

This commit is contained in:
administrator
2026-05-24 07:59:25 +02:00
parent 662c890257
commit 7e05384c1f
10 changed files with 87 additions and 1261 deletions
+591
View File
@@ -0,0 +1,591 @@
#!/usr/bin/env python3
"""
10_collect_metadata.py — Sběr metadat ze všech fotek
Pro každý soubor obrázku extrahuje:
• Informace ze souborového systému (cesta, velikost, mtime, ctime)
• SHA256 hash souboru + pixel hash (EXIF-orientation-aware)
• Perceptuální hashe (pHash, dHash) pro detekci duplikátů
• Všechny EXIF tagy přes ExifRead (primární parser)
• GPS souřadnice přepočtené na decimal degrees
• IPTC metadata (keywords, popis, autor)
• XMP metadata (incl. Apple obličeje, screenshoty)
Výstup: JSONL soubor — jeden řádek = jeden objekt = jedna fotka
Chyby: samostatný .log soubor
Použití:
python 10_collect_metadata.py
python 10_collect_metadata.py --source //tower/photosnahrani
python 10_collect_metadata.py --resume # přeskočí již zpracované soubory
python 10_collect_metadata.py --limit 100 # jen prvních 100 (pro test)
python 10_collect_metadata.py --dry-run # jen spočítá soubory, nic nezpracuje
python 10_collect_metadata.py --workers 4 # paralelní zpracování
"""
import argparse
import hashlib
import json
import os
import re
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
if sys.stdout.encoding and sys.stdout.encoding.lower() != "utf-8":
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
import logging
import exifread
logging.getLogger("exifread").setLevel(logging.CRITICAL)
import imagehash
from PIL import Image, ImageOps, IptcImagePlugin
# ---------------------------------------------------------------------------
# Konfigurace
# ---------------------------------------------------------------------------
# File suffixes treated as photos; iter_photos compares them against the
# lower-cased suffix of every file name, so matching is case-insensitive.
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", ".tiff", ".tif", ".webp", ".bmp"}
# Default value of the --source command-line option.
SOURCE = Path("//tower/photosnahrani")
# Output directory, located next to this script.
OUTPUT_DIR = Path(__file__).parent / "output"
# Default value of the --output command-line option (one JSON object per line).
OUTPUT_JSONL = OUTPUT_DIR / "10_metadata.jsonl"
# NOTE(review): main() derives the error-log path from the --output file name
# ("<stem>_errors.log") and does not appear to read this constant — confirm
# whether it is still needed.
ERROR_LOG = OUTPUT_DIR / "10_errors.log"
# Default value of the --resume command-line option.
RESUME = True
# Default value of the --workers command-line option.
WORKERS = 2
# Readable names for IPTC (record, dataset) keys; _parse_iptc falls back to
# "IPTC_<record>_<dataset>" for keys that are not listed here.
IPTC_TAG_NAMES = {
    (2, 5): "ObjectName",
    (2, 10): "Urgency",
    (2, 15): "Category",
    (2, 20): "SupplementalCategories",
    (2, 25): "Keywords",
    (2, 40): "SpecialInstructions",
    (2, 55): "DateCreated",
    (2, 60): "TimeCreated",
    (2, 80): "Byline",
    (2, 85): "BylineTitle",
    (2, 90): "City",
    (2, 92): "SubLocation",
    (2, 95): "ProvinceState",
    (2, 100): "CountryCode",
    (2, 101): "CountryName",
    (2, 103): "OriginalTransmissionReference",
    (2, 105): "Headline",
    (2, 110): "Credit",
    (2, 115): "Source",
    (2, 116): "Copyright",
    (2, 118): "Contact",
    (2, 120): "Caption",
    (2, 122): "WriterEditor",
}
# ---------------------------------------------------------------------------
# GPS
# ---------------------------------------------------------------------------
def _rational_to_float(r) -> float:
    """Convert a rational-like value to ``float``.

    Objects exposing ``numerator`` and ``denominator`` are divided out, with a
    zero denominator mapped to ``0.0`` instead of raising.  Anything else is
    handed to ``float()``.
    """
    try:
        top, bottom = r.numerator, r.denominator
    except AttributeError:
        # Not a rational: let float() handle plain numbers and numeric strings.
        return float(r)
    if bottom == 0:
        return 0.0
    return top / bottom
def _dms_to_decimal(vals) -> float:
    """Turn a (degrees, minutes, seconds) sequence into decimal degrees.

    Only the first three items of ``vals`` are read; each one is converted
    with ``_rational_to_float``.
    """
    degrees, minutes, seconds = (_rational_to_float(vals[i]) for i in range(3))
    return degrees + minutes / 60.0 + seconds / 3600.0
def extract_gps(raw_tags: dict) -> dict:
    """Convert the GPS tags of an ExifRead tag dict to decimal values.

    Returns a dict that may contain ``gps_lat`` / ``gps_lon`` (rounded to 7
    decimals, negative for S / W references) and ``gps_alt`` (rounded to 2
    decimals, negative when the altitude reference equals 1).  Any exception
    is reported under ``gps_error``; values computed before it are kept.
    """
    gps: dict = {}
    try:
        latitude = raw_tags.get("GPS GPSLatitude")
        latitude_ref = raw_tags.get("GPS GPSLatitudeRef")
        longitude = raw_tags.get("GPS GPSLongitude")
        longitude_ref = raw_tags.get("GPS GPSLongitudeRef")
        if latitude and longitude:
            lat = _dms_to_decimal(latitude.values)
            lon = _dms_to_decimal(longitude.values)
            southern = latitude_ref and str(latitude_ref).strip().upper().startswith("S")
            if southern:
                lat = -lat
            western = longitude_ref and str(longitude_ref).strip().upper().startswith("W")
            if western:
                lon = -lon
            gps["gps_lat"] = round(lat, 7)
            gps["gps_lon"] = round(lon, 7)

        altitude = raw_tags.get("GPS GPSAltitude")
        altitude_ref = raw_tags.get("GPS GPSAltitudeRef")
        if altitude and altitude.values:
            alt = _rational_to_float(altitude.values[0])
            # A reference value of 1 means "below sea level".
            below_sea = altitude_ref and altitude_ref.values and altitude_ref.values[0] == 1
            if below_sea:
                alt = -alt
            gps["gps_alt"] = round(alt, 2)
    except Exception as exc:
        gps["gps_error"] = str(exc)
    return gps
# ---------------------------------------------------------------------------
# JSON serializace
# ---------------------------------------------------------------------------
def _make_serializable(obj):
    """Recursively convert ``obj`` into something ``json.dumps`` accepts.

    * ``None``, ``bool``, ``int``, ``float`` and ``str`` are returned unchanged.
      This check must come first: ``int`` and ``bool`` expose ``numerator`` /
      ``denominator`` too, so without it every integer would be turned into a
      float (64-bit hashes would lose precision, booleans would become 1.0).
    * Other rational-like objects (anything with ``numerator`` and
      ``denominator``) become ``float``, or ``str`` if that conversion fails.
    * dicts get string keys; lists and tuples become lists.
    * ``bytes`` are cut to the first 200 bytes and decoded as UTF-8 with
      replacement characters.
    * Anything else is kept if ``json.dumps`` accepts it, otherwise ``str()``.
    """
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    if hasattr(obj, "numerator") and hasattr(obj, "denominator"):
        try:
            return float(obj)
        except Exception:
            return str(obj)
    if isinstance(obj, dict):
        return {str(k): _make_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_make_serializable(x) for x in obj]
    if isinstance(obj, bytes):
        return obj[:200].decode("utf-8", errors="replace")
    try:
        json.dumps(obj)
        return obj
    except (TypeError, ValueError):
        return str(obj)
# ---------------------------------------------------------------------------
# Hashe
# ---------------------------------------------------------------------------
def file_hash_sha256(path: Path, chunk: int = 65536) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read ``chunk`` bytes at a time, so it never has to fit in
    memory as a whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(chunk)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
# ---------------------------------------------------------------------------
# IPTC
# ---------------------------------------------------------------------------
def _parse_iptc(raw_iptc) -> dict:
    """Translate a raw IPTC mapping into ``{readable name: text value}``.

    Keys are looked up in ``IPTC_TAG_NAMES``; unknown keys are named
    ``IPTC_<key[0]>_<key[1]>``.  ``bytes`` values, and ``bytes`` items inside
    list values, are decoded as UTF-8 with replacement characters.  A falsy
    input yields an empty dict.
    """
    if not raw_iptc:
        return {}

    def as_text(item):
        if isinstance(item, bytes):
            return item.decode("utf-8", errors="replace")
        return item

    parsed = {}
    for key, value in raw_iptc.items():
        label = IPTC_TAG_NAMES.get(key, f"IPTC_{key[0]}_{key[1]}")
        if isinstance(value, list):
            parsed[label] = [as_text(entry) for entry in value]
        else:
            parsed[label] = as_text(value)
    return parsed
# ---------------------------------------------------------------------------
# XMP
# ---------------------------------------------------------------------------
# Regular expressions for the XMP fields extracted by _parse_xmp.  Each one
# captures a single value; "subject_block" captures the whole dc:subject
# element, which _parse_xmp then splits into individual keywords.
XMP_PATTERNS = {
    "creator_tool": r'xmp:CreatorTool="([^"]+)"',
    "create_date": r'xmp:CreateDate="([^"]+)"',
    "modify_date": r'xmp:ModifyDate="([^"]+)"',
    "rating": r'xmp:Rating="([^"]+)"',
    "label": r'xmp:Label="([^"]+)"',
    "title": r'<dc:title[^>]*>.*?<rdf:li[^>]*>([^<]+)</rdf:li>',
    "description": r'<dc:description[^>]*>.*?<rdf:li[^>]*>([^<]+)</rdf:li>',
    "creator": r'<dc:creator[^>]*>.*?<rdf:li[^>]*>([^<]+)</rdf:li>',
    "subject_block": r'<dc:subject[^>]*>(.*?)</dc:subject>',
}


def _parse_xmp(xmp_raw) -> dict:
    """Extract a handful of fields from a raw XMP packet using regexes.

    Args:
        xmp_raw: The XMP packet as ``bytes`` or ``str``; falsy input yields
            an empty dict.

    Returns:
        A dict with whichever of these keys were found: the names in
        ``XMP_PATTERNS`` (except ``subject_block``), ``keywords`` (list of
        ``rdf:li`` values inside ``dc:subject``), ``face_regions_count``
        (number of ``mwg-rs:Type="Face"`` occurrences) and ``face_names``
        (every ``mwg-rs:Name`` attribute value).  ``_xmp_bytes`` is always
        present and holds the size of the packet in bytes.
    """
    if not xmp_raw:
        return {}
    if isinstance(xmp_raw, bytes):
        # Measure before decoding: the decoded string has a different length
        # whenever the packet contains non-ASCII characters.
        packet_size = len(xmp_raw)
        xmp_raw = xmp_raw.decode("utf-8", errors="replace")
    else:
        packet_size = len(xmp_raw.encode("utf-8", errors="replace"))
    out = {}
    for name, pat in XMP_PATTERNS.items():
        m = re.search(pat, xmp_raw, re.DOTALL)
        if m:
            out[name] = m.group(1).strip()
    if "subject_block" in out:
        kws = re.findall(r"<rdf:li[^>]*>([^<]+)</rdf:li>", out.pop("subject_block"))
        if kws:
            out["keywords"] = kws
    face_count = len(re.findall(r'mwg-rs:Type="Face"', xmp_raw))
    if face_count:
        out["face_regions_count"] = face_count
    # Region names (faces that were given a name in the photo library).
    face_names = re.findall(r'mwg-rs:Name="([^"]+)"', xmp_raw)
    if face_names:
        out["face_names"] = face_names
    out["_xmp_bytes"] = packet_size
    return out
# ---------------------------------------------------------------------------
# Hlavní sběr dat pro jednu fotku
# ---------------------------------------------------------------------------
def collect_photo(path: Path, base_path: Path) -> dict:
    """Gather all metadata for one image file into a single flat record.

    The record is filled in this order: filesystem facts, SHA-256 of the file
    contents, ExifRead tags (plus decimal GPS values), then everything read
    through Pillow — format and dimensions, IPTC, XMP, and the pixel and
    perceptual hashes computed on the EXIF-orientation-corrected image.

    Failures in the hashing, ExifRead and Pillow stages are stored under
    ``*_error`` keys instead of being raised.  ``path.stat()`` runs outside
    any ``try``, so a missing or unreadable file still raises ``OSError``.

    Args:
        path: The image file to inspect.
        base_path: Root of the scan; used for ``file_path_relative``, which
            is ``None`` when ``path`` does not lie under it.
    """
    st = path.stat()

    # Filesystem facts
    relative = str(path.relative_to(base_path)) if path.is_relative_to(base_path) else None
    record: dict = {
        "file_path": str(path),
        "file_path_relative": relative,
        "file_name": path.name,
        "file_stem": path.stem,
        "file_ext": path.suffix.lower(),
        "file_size": st.st_size,
        "mtime": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).isoformat(),
        "mtime_ts": st.st_mtime,
        "ctime": datetime.fromtimestamp(st.st_ctime, tz=timezone.utc).isoformat(),
    }

    # SHA-256 of the file contents (streamed, so the file is never fully in RAM)
    try:
        record["sha256_file"] = file_hash_sha256(path)
    except Exception as exc:
        record.update(sha256_file=None, sha256_file_error=str(exc))

    # ExifRead is the primary EXIF parser: all tags plus GPS
    exif_out = record["exif"] = {}
    try:
        with open(path, "rb") as handle:
            tags = exifread.process_file(handle, details=True)
        record.update(extract_gps(tags))
        for tag_name, tag_value in tags.items():
            # Drop thumbnail tags except the JPEGInterchangeFormat ones.
            if "Thumbnail" not in tag_name or "JPEGInterchangeFormat" in tag_name:
                exif_out[tag_name] = str(tag_value)
    except Exception as exc:
        record["exif_error"] = str(exc)

    # Pillow: a single open() serves dimensions, IPTC, XMP and all pixel hashes
    try:
        with Image.open(path) as img:
            for attr in ("format", "mode", "width", "height"):
                record[attr] = getattr(img, attr)
            record["megapixels"] = round((img.width * img.height) / 1_000_000, 2)
            record["has_transparency"] = img.mode in ("RGBA", "LA") or "transparency" in img.info
            dpi = img.info.get("dpi")
            if isinstance(dpi, tuple):
                dpi = list(dpi)
            record["dpi"] = dpi
            record["icc_profile"] = "icc_profile" in img.info
            record["embedded_thumbnail"] = "thumbnail" in img.info

            # IPTC
            try:
                record["iptc"] = _parse_iptc(IptcImagePlugin.getiptcinfo(img))
            except Exception as exc:
                record["iptc"] = {"_error": str(exc)}

            # XMP
            record["xmp"] = _parse_xmp(img.info.get("xmp"))

            # Pixel hash + perceptual hashes on the orientation-corrected image
            try:
                upright = ImageOps.exif_transpose(img)
                rgb = upright if upright.mode == "RGB" else upright.convert("RGB")
                record["sha256_pixels"] = hashlib.sha256(rgb.tobytes()).hexdigest()
                ph = imagehash.phash(rgb)
                dh = imagehash.dhash(rgb)
                record["phash"] = str(ph)
                record["dhash"] = str(dh)
                # Signed 64-bit form of the pHash, suitable for a BIGINT column.
                unsigned = int(str(ph), 16)
                record["phash_int"] = unsigned - 2**64 if unsigned >= 2**63 else unsigned
            except Exception as exc:
                record.update(
                    pixel_hash_error=str(exc),
                    sha256_pixels=None,
                    phash=None,
                    dhash=None,
                    phash_int=None,
                )
    except Exception as exc:
        record["pil_error"] = str(exc)

    record["collected_at"] = datetime.now(tz=timezone.utc).isoformat()
    return record
# ---------------------------------------------------------------------------
# Procházení adresáře
# ---------------------------------------------------------------------------
def iter_photos(source: Path):
    """Yield the path of every supported image file below ``source``.

    Directories whose name starts with a dot are not descended into.  A file
    counts as an image when its lower-cased suffix is in
    ``SUPPORTED_EXTENSIONS``.
    """
    for root, dirs, files in os.walk(source):
        # Prune hidden directories in place so os.walk skips them.
        dirs[:] = [name for name in dirs if not name.startswith(".")]
        folder = Path(root)
        yield from (
            folder / fname
            for fname in files
            if Path(fname).suffix.lower() in SUPPORTED_EXTENSIONS
        )
def count_photos(source: Path) -> int:
    """Return how many files ``iter_photos`` yields for ``source``."""
    total = 0
    for _ in iter_photos(source):
        total += 1
    return total
def load_processed_paths(jsonl_path: Path) -> set:
    """Return the set of ``file_path`` values already present in a JSONL file.

    Used for resuming: a missing file gives an empty set.  Blank lines, lines
    that are not valid JSON (for example a line cut off by an interrupted
    run), JSON values that are not objects, and objects without a non-empty
    string ``file_path`` are all skipped rather than aborting the resume.
    """
    processed = set()
    if not jsonl_path.exists():
        return processed
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A valid JSON line may still be a list, string or number.
            if not isinstance(obj, dict):
                continue
            fp = obj.get("file_path")
            if fp and isinstance(fp, str):
                processed.add(fp)
    return processed
# ---------------------------------------------------------------------------
# Progress
# ---------------------------------------------------------------------------
class Progress:
def __init__(self, total: int | None):
self.total = total
self.done = 0
self.errors = 0
self.skipped = 0
self.start = time.monotonic()
def tick(self, ok: bool = True):
if ok:
self.done += 1
else:
self.errors += 1
def skip(self):
self.skipped += 1
def report(self, current_file: str = "") -> str:
elapsed = time.monotonic() - self.start
rate = self.done / elapsed if elapsed > 0 else 0
eta_str = ""
if self.total and rate > 0:
remaining = (self.total - self.done - self.skipped) / rate
h, r = divmod(int(remaining), 3600)
m, s = divmod(r, 60)
eta_str = f" ETA {h:02d}:{m:02d}:{s:02d}"
total_str = f"/{self.total}" if self.total else ""
pct = f" ({100*(self.done+self.skipped)/self.total:.1f}%)" if self.total else ""
name = Path(current_file).name[:40] if current_file else ""
return (
f"\r {self.done+self.skipped}{total_str}{pct}"
f" ok={self.done} err={self.errors} skip={self.skipped}"
f" {rate:.1f} f/s{eta_str} {name:<40}"
)
# ---------------------------------------------------------------------------
# Shutdown handler
# ---------------------------------------------------------------------------
# Flipped to True by the SIGINT handler; the processing loops in main()
# check it before handling the next file.
_shutdown = False


def _handle_sigint(sig, frame):
    """Signal handler: announce the interruption and request a clean stop.

    Both arguments are the standard signal-handler parameters and are unused.
    """
    global _shutdown
    print("\n\n[!] Přerušeno uživatelem — dočišťuji...")
    _shutdown = True
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: scan the source tree and write one JSON line per photo.

    With ``--dry-run`` only the number of matching files is printed.
    Otherwise every file that is not already present in the output (when
    resume is enabled) is passed to ``collect_photo``.  Records are appended
    to the JSONL output in batches, and files that produced a ``fatal_error``
    or ``pil_error`` are listed in ``<output stem>_errors.log`` next to it.

    Ctrl+C sets the module-level ``_shutdown`` flag, which ends the loop
    before the next file.  Both files are closed and the pending batch is
    written even if the loop ends with an exception.
    """
    parser = argparse.ArgumentParser(
        description="10_collect_metadata.py — Sběr metadat ze všech fotek",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--source", type=Path, default=SOURCE,
                        help=f"Zdrojová složka (default: {SOURCE})")
    parser.add_argument("--output", type=Path, default=OUTPUT_JSONL,
                        help=f"Výstupní JSONL soubor (default: {OUTPUT_JSONL})")
    # BooleanOptionalAction keeps --resume working and adds --no-resume; with
    # a plain store_true and a True default the option could never be disabled.
    parser.add_argument("--resume", action=argparse.BooleanOptionalAction, default=RESUME,
                        help=f"Přeskočit soubory, které jsou již v JSONL (default: {RESUME})")
    parser.add_argument("--dry-run", action="store_true",
                        help="Jen spočítat soubory, nic nezpracovat")
    parser.add_argument("--limit", type=int, default=0,
                        help="Zpracovat maximálně N fotek (0 = vše)")
    parser.add_argument("--workers", type=int, default=WORKERS,
                        help=f"Počet paralelních vláken (default: {WORKERS})")
    args = parser.parse_args()

    source: Path = args.source
    output: Path = args.output
    error_log: Path = output.parent / (output.stem + "_errors.log")

    print("[10_collect_metadata]")
    print(f" Zdroj: {source}")
    print(f" Výstup: {output}")
    print(f" Resume: {args.resume}")
    print(f" Limit: {args.limit or 'vše'}")
    print(f" Workers: {args.workers}")
    print()

    if not source.exists():
        print(f"[ERROR] Zdrojová složka neexistuje: {source}")
        sys.exit(1)

    # Dry run: only count the files
    if args.dry_run:
        print("Dry run — procházím a počítám...", end=" ", flush=True)
        n = count_photos(source)
        print(f"{n} fotek nalezeno v {source}")
        return

    # Make sure the output directory exists
    output.parent.mkdir(parents=True, exist_ok=True)

    # Resume: load the paths that are already in the output
    processed = set()
    if args.resume and output.exists():
        print(f" Načítám již zpracované záznamy z {output.name}...", end=" ", flush=True)
        processed = load_processed_paths(output)
        print(f"{len(processed)} souborů")

    # The full list is needed up front for the total used by percentage and ETA
    print(" Počítám soubory...", end=" ", flush=True)
    all_files = list(iter_photos(source))
    total = len(all_files)
    print(f"{total} fotek")
    if args.limit:
        all_files = all_files[: args.limit]

    signal.signal(signal.SIGINT, _handle_sigint)
    progress = Progress(total=min(total, args.limit) if args.limit else total)
    flush_every = 50  # write to the output after this many buffered records
    batch: list[str] = []

    def process_one(path: Path) -> tuple[dict | None, bool]:
        """Return ``(record, skipped)`` for one file; never raises."""
        if str(path) in processed:
            return None, True
        try:
            record = collect_photo(path, source)
            return record, False
        except Exception as e:
            return {"file_path": str(path), "fatal_error": str(e),
                    "collected_at": datetime.now(tz=timezone.utc).isoformat()}, False

    with open(output, "a", encoding="utf-8", buffering=1) as out_f:
        with open(error_log, "a", encoding="utf-8", buffering=1) as err_f:
            err_f.write(f"\n--- Session {datetime.now().isoformat()} ---\n")

            def flush_batch() -> None:
                """Append all buffered records to the output file."""
                if batch:
                    out_f.write("\n".join(batch) + "\n")
                    batch.clear()

            def handle_result(path: Path, record: dict | None, skipped: bool) -> None:
                """Update counters, log errors, buffer the record, print progress."""
                if skipped:
                    progress.skip()
                else:
                    has_error = "fatal_error" in record or "pil_error" in record
                    progress.tick(ok=not has_error)
                    if has_error:
                        err_f.write(f"{path}\t{record.get('fatal_error') or record.get('pil_error')}\n")
                    batch.append(json.dumps(_make_serializable(record), ensure_ascii=False))
                    if len(batch) >= flush_every:
                        flush_batch()
                print(progress.report(str(path)), end="", flush=True)

            print("\n Zpracovávám...\n")
            try:
                if args.workers > 1:
                    with ThreadPoolExecutor(max_workers=args.workers) as pool:
                        futures = {pool.submit(process_one, p): p for p in all_files}
                        for future in as_completed(futures):
                            if _shutdown:
                                pool.shutdown(wait=False, cancel_futures=True)
                                break
                            path = futures[future]
                            try:
                                record, skipped = future.result()
                            except Exception as e:
                                progress.tick(ok=False)
                                err_f.write(f"{path}\t{e}\n")
                                print(progress.report(str(path)), end="", flush=True)
                                continue
                            handle_result(path, record, skipped)
                else:
                    for path in all_files:
                        if _shutdown:
                            break
                        record, skipped = process_one(path)
                        handle_result(path, record, skipped)
            finally:
                # Write whatever is still buffered, including after an error.
                flush_batch()

    elapsed = time.monotonic() - progress.start
    print(f"\n\n{'='*60}")
    print(f" Dokončeno za {elapsed:.1f}s")
    print(f" Zpracováno: {progress.done}")
    print(f" Přeskočeno: {progress.skipped}")
    print(f" Chyby: {progress.errors}")
    print(f" Výstup: {output}")
    if progress.errors:
        print(f" Chybový log: {error_log}")
    print(f"{'='*60}")


if __name__ == "__main__":
    main()
+89
View File
@@ -0,0 +1,89 @@
"""Recreate the ``fotky_buzalkovi`` PostgreSQL database and build its schema.

WARNING: destructive — an existing ``fotky_buzalkovi`` database is dropped.

Connection settings come from the environment variables DB_HOST, DB_PORT,
DB_USER and DB_PASSWORD.  Host, port and user fall back to the previous
hard-coded values; the password has no default and must be supplied through
DB_PASSWORD so that it is not stored in the source code.
"""
import os

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT


def _connect(database):
    """Open a connection to ``database`` using the environment settings."""
    return psycopg2.connect(
        host=os.getenv("DB_HOST", "192.168.1.76"),
        port=int(os.getenv("DB_PORT", "5432")),
        user=os.getenv("DB_USER", "vladimir.buzalka"),
        password=os.getenv("DB_PASSWORD", ""),
        database=database,
    )


# Step 1: connect to the maintenance database and (re)create the target one.
conn = _connect("postgres")
try:
    # CREATE / DROP DATABASE cannot run inside a transaction block.
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    try:
        cursor.execute("DROP DATABASE IF EXISTS fotky_buzalkovi;")
        print("[OK] Stara databaze smazana")
    except psycopg2.Error as exc:
        # Best effort, as before, but no longer silent: if the drop failed
        # the CREATE below will report the real problem.
        print(f"[WARN] Starou databazi se nepodarilo smazat: {exc}")
    cursor.execute("CREATE DATABASE fotky_buzalkovi;")
    print("[OK] Databaze fotky_buzalkovi vytvorena")
finally:
    conn.close()

# Step 2: connect to the new database and create tables and indexes.
conn = _connect("fotky_buzalkovi")
try:
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE cameras (
            id SERIAL PRIMARY KEY,
            model VARCHAR(255) UNIQUE,
            created_at TIMESTAMP DEFAULT NOW()
        );
    """)

    cursor.execute("""
        CREATE TABLE photos (
            id BIGSERIAL PRIMARY KEY,
            file_name VARCHAR(255) NOT NULL,
            file_path VARCHAR(1000) NOT NULL,
            file_hash VARCHAR(64) UNIQUE,
            camera_id INT,
            taken_at TIMESTAMP,
            width INT,
            height INT,
            file_size BIGINT,
            exif_data JSONB,
            processing_status VARCHAR(50) DEFAULT 'pending',
            created_at TIMESTAMP DEFAULT NOW(),
            FOREIGN KEY (camera_id) REFERENCES cameras(id)
        );
    """)

    cursor.execute("""
        CREATE TABLE photo_tags (
            id BIGSERIAL PRIMARY KEY,
            photo_id BIGINT NOT NULL,
            tag VARCHAR(100),
            FOREIGN KEY (photo_id) REFERENCES photos(id) ON DELETE CASCADE
        );
    """)

    # PostgreSQL does not accept MySQL-style "INDEX name (col)" clauses inside
    # CREATE TABLE, so the indexes are created with separate statements.
    # file_hash needs none: its UNIQUE constraint already creates an index.
    cursor.execute("CREATE INDEX idx_taken_at ON photos (taken_at);")
    cursor.execute("CREATE INDEX idx_camera ON photos (camera_id);")
    cursor.execute("CREATE INDEX idx_tag ON photo_tags (tag);")

    # GIN index for queries into the EXIF JSONB document
    cursor.execute("CREATE INDEX idx_exif_camera ON photos USING GIN (exif_data);")

    conn.commit()
    print("[OK] Schéma vytvoreno:")
    print(" - cameras")
    print(" - photos")
    print(" - photo_tags")
    print(" - indexy pro EXIF a vyhledavani")
finally:
    conn.close()
+398
View File
@@ -0,0 +1,398 @@
"""
Explorační skript: projde všechny fotky v demo_fotky/ a vytáhne maximum dat.
Výstup do konzole + JSON soubor pro detailní analýzu.
"""
import hashlib
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
# Windows console: force UTF-8 output.  The encoding can be None (for example
# when the stream has been replaced by an object without one), so check that
# it is set before calling .lower() on it.
if sys.stdout.encoding and sys.stdout.encoding.lower() != "utf-8":
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
import exifread
import imagehash
from PIL import Image, ImageOps, IptcImagePlugin
from PIL.ExifTags import TAGS, GPSTAGS
PHOTOS_DIR = Path(__file__).parent / "demo_fotky"
OUTPUT_JSON = Path(__file__).parent / "photo_exploration.json"
def file_hash_sha256(path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA-256 of the whole file (detects exact copies).

    The file is streamed ``chunk_size`` bytes at a time.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
def pixel_hash_sha256(path: Path) -> str | None:
"""Hash dekódovaných pixelů - identita fotky nezávisle na metadatech.
Aplikuje EXIF orientation pro konzistenci."""
try:
with Image.open(path) as img:
img = ImageOps.exif_transpose(img)
if img.mode != "RGB":
img = img.convert("RGB")
return hashlib.sha256(img.tobytes()).hexdigest()
except Exception as e:
return None
def perceptual_hashes(path: Path) -> dict:
    """Compute perceptual hashes used to find visually similar photos.

    Returns the keys ``phash``, ``dhash``, ``ahash`` and ``whash`` as hex
    strings, computed on the EXIF-orientation-corrected image.  If anything
    fails, the hashes computed so far are kept and the message is stored
    under ``_error``.
    """
    # (result key, imagehash function name) in the order they are computed
    algorithms = (
        ("phash", "phash"),
        ("dhash", "dhash"),
        ("ahash", "average_hash"),
        ("whash", "whash"),
    )
    hashes = {}
    try:
        with Image.open(path) as source:
            upright = ImageOps.exif_transpose(source)
            for key, func_name in algorithms:
                hashes[key] = str(getattr(imagehash, func_name)(upright))
    except Exception as exc:
        hashes["_error"] = str(exc)
    return hashes
def iptc_info(path: Path) -> dict:
    """Read IPTC metadata (keywords, title, description, author, ...).

    Known (record, dataset) keys are mapped to readable names; unknown ones
    are named ``IPTC<key>``.  ``bytes`` values, and ``bytes`` items of list
    values, are decoded as UTF-8 with replacement characters.  Returns an
    empty dict when the image carries no IPTC data; any exception is reported
    under ``_error``.
    """
    # Readable names for the numeric IPTC tags
    readable = {
        (2, 5): "ObjectName",  # Title
        (2, 10): "Urgency",
        (2, 15): "Category",
        (2, 20): "SupplementalCategories",
        (2, 25): "Keywords",
        (2, 40): "SpecialInstructions",
        (2, 55): "DateCreated",
        (2, 60): "TimeCreated",
        (2, 80): "Byline",  # Creator/Author
        (2, 85): "BylineTitle",
        (2, 90): "City",
        (2, 92): "SubLocation",
        (2, 95): "ProvinceState",
        (2, 100): "CountryCode",
        (2, 101): "CountryName",
        (2, 103): "OriginalTransmissionReference",
        (2, 105): "Headline",
        (2, 110): "Credit",
        (2, 115): "Source",
        (2, 116): "Copyright",
        (2, 118): "Contact",
        (2, 120): "Caption",  # Description
        (2, 122): "WriterEditor",
    }

    def as_text(item):
        if isinstance(item, bytes):
            return item.decode("utf-8", errors="replace")
        return item

    fields = {}
    try:
        with Image.open(path) as img:
            raw = IptcImagePlugin.getiptcinfo(img)
            if not raw:
                return {}
            for key, value in raw.items():
                label = readable.get(key, f"IPTC{key}")
                if isinstance(value, list):
                    fields[label] = [as_text(entry) for entry in value]
                else:
                    fields[label] = as_text(value)
    except Exception as exc:
        fields["_error"] = str(exc)
    return fields
def xmp_info(path: Path) -> dict:
    """Read XMP metadata with a deliberately simple regex-based parser.

    Extracts the most common scalar fields, the ``dc:subject`` keywords (as a
    list under ``subject_keywords``), the number of ``mwg-rs:Type="Face"``
    regions, and the length of the decoded packet.  Returns an empty dict when
    the image has no XMP; any exception is reported under ``_error``.
    """
    # (result key, regex) pairs, applied in this order
    field_patterns = (
        ("creator_tool", r'xmp:CreatorTool="([^"]+)"'),
        ("create_date", r'xmp:CreateDate="([^"]+)"'),
        ("modify_date", r'xmp:ModifyDate="([^"]+)"'),
        ("rating", r'xmp:Rating="([^"]+)"'),
        ("label", r'xmp:Label="([^"]+)"'),
        ("title", r'<dc:title[^>]*>.*?<rdf:li[^>]*>([^<]+)</rdf:li>'),
        ("description", r'<dc:description[^>]*>.*?<rdf:li[^>]*>([^<]+)</rdf:li>'),
        ("creator", r'<dc:creator[^>]*>.*?<rdf:li[^>]*>([^<]+)</rdf:li>'),
        ("subject_keywords", r'<dc:subject[^>]*>(.*?)</dc:subject>'),
    )
    found = {}
    try:
        with Image.open(path) as img:
            packet = img.info.get("xmp")
            if not packet:
                return {}
            if isinstance(packet, bytes):
                packet = packet.decode("utf-8", errors="replace")

            for key, pattern in field_patterns:
                if (match := re.search(pattern, packet, re.DOTALL)):
                    found[key] = match.group(1).strip()

            # Split the dc:subject block into its individual rdf:li keywords
            if "subject_keywords" in found:
                found["subject_keywords"] = re.findall(
                    r'<rdf:li[^>]*>([^<]+)</rdf:li>', found["subject_keywords"]
                )

            # Face regions (recognised faces with a position)
            faces = len(re.findall(r'mwg-rs:Type="Face"', packet))
            if faces:
                found["face_regions_count"] = faces

            # Size of the decoded XMP, just to give an idea
            found["_xmp_length_bytes"] = len(packet)
    except Exception as exc:
        found["_error"] = str(exc)
    return found
def filesystem_info(path: Path) -> dict:
    """Return name, path, size (bytes and MB), timestamps and suffix of a file.

    ``mtime`` and ``ctime`` are naive ISO strings produced by
    ``datetime.fromtimestamp`` without a time zone argument.
    """
    st = path.stat()
    size = st.st_size
    modified = datetime.fromtimestamp(st.st_mtime)
    changed = datetime.fromtimestamp(st.st_ctime)
    info = {}
    info["file_name"] = path.name
    info["file_path"] = str(path)
    info["file_size_bytes"] = size
    info["file_size_mb"] = round(size / 1024 / 1024, 2)
    info["mtime"] = modified.isoformat()
    info["ctime"] = changed.isoformat()
    info["extension"] = path.suffix.lower()
    return info
def pillow_info(path: Path) -> dict:
    """Collect basic image facts through Pillow.

    Covers format, mode, dimensions, megapixels, transparency, DPI, presence
    of an ICC profile / EXIF / embedded thumbnail, and the first 500
    characters of the XMP packet if there is one.  Any exception is reported
    under ``error``; values gathered before it are kept.
    """
    facts = {}
    try:
        with Image.open(path) as img:
            width, height = img.width, img.height
            facts["format"] = img.format
            facts["mode"] = img.mode
            facts["width"] = width
            facts["height"] = height
            facts["megapixels"] = round((width * height) / 1_000_000, 2)
            facts["has_transparency"] = img.mode in ("RGBA", "LA") or "transparency" in img.info
            facts["dpi"] = img.info.get("dpi")
            facts["icc_profile_present"] = "icc_profile" in img.info
            facts["exif_present"] = bool(img.getexif())

            # XMP (common in JPEGs written by Adobe tools)
            if "xmp" in img.info:
                snippet = img.info["xmp"]
                if isinstance(snippet, bytes):
                    snippet = snippet[:500].decode("utf-8", errors="ignore")
                facts["xmp_snippet"] = str(snippet)[:500]

            # Is there an embedded thumbnail?
            facts["has_embedded_thumbnail"] = "thumbnail" in img.info
    except Exception as exc:
        facts["error"] = str(exc)
    return facts
def pillow_exif(path: Path) -> dict:
    """Return the image's EXIF tags as read by Pillow, keyed by readable name.

    Tag ids are translated with ``PIL.ExifTags.TAGS`` (unknown ids become
    ``Tag<id>``) and values are made JSON-friendly with ``_serializable``.
    ``GPSInfo`` is returned as a nested dict keyed by ``GPSTAGS`` names.

    Returns an empty dict when the image has no EXIF; any exception is
    reported under ``_error`` and the tags read before it are kept.
    """
    out = {}
    try:
        with Image.open(path) as img:
            exif = img.getexif()
            if not exif:
                return {}
            for tag_id, value in exif.items():
                tag = TAGS.get(tag_id, f"Tag{tag_id}")
                if tag == "GPSInfo":
                    # Current Pillow stores only the integer offset of the GPS
                    # IFD under this tag, so calling .items() on it raised and
                    # aborted the whole dump.  Resolve the IFD explicitly, and
                    # still accept a dict in case the value already is one.
                    gps_ifd = value if isinstance(value, dict) else exif.get_ifd(tag_id)
                    out[tag] = {
                        GPSTAGS.get(gps_tag_id, f"GPSTag{gps_tag_id}"): _serializable(gps_value)
                        for gps_tag_id, gps_value in gps_ifd.items()
                    }
                else:
                    out[tag] = _serializable(value)
    except Exception as e:
        out["_error"] = str(e)
    return out
def exifread_tags(path: Path) -> dict:
    """Return all tags found by ExifRead as ``{tag name: str(value)}``.

    Thumbnail tags are dropped unless their name contains
    ``JPEGInterchangeFormat``.  Any exception is reported under ``_error``.
    """
    collected = {}
    try:
        with open(path, "rb") as handle:
            parsed = exifread.process_file(handle, details=True)
        for name, value in parsed.items():
            # Skip the binary thumbnail payload
            if "Thumbnail" not in name or "JPEGInterchangeFormat" in name:
                collected[name] = str(value)
    except Exception as exc:
        collected["_error"] = str(exc)
    return collected
def _serializable(v):
    """Convert values Pillow returns (IFDRational, bytes, ...) to JSON-friendly ones.

    * ``None``, ``bool``, ``int``, ``float`` and ``str`` are returned unchanged.
      This check must come before the rational test: ``int`` and ``bool`` also
      expose ``numerator`` / ``denominator`` and would otherwise be turned
      into floats.
    * ``bytes`` are cut to 200 bytes and decoded as UTF-8 with replacement.
    * tuples and lists become lists; dicts get string keys.
    * Other rational-like objects become ``float`` (``str`` if that fails).
    * Anything else is kept if ``json.dumps`` accepts it, otherwise ``str()``.
    """
    if v is None or isinstance(v, (bool, int, float, str)):
        return v
    if isinstance(v, bytes):
        return v[:200].decode("utf-8", errors="replace")
    if isinstance(v, (tuple, list)):
        return [_serializable(x) for x in v]
    if isinstance(v, dict):
        return {str(k): _serializable(val) for k, val in v.items()}
    if hasattr(v, "numerator") and hasattr(v, "denominator"):
        try:
            return float(v)
        except Exception:
            return str(v)
    try:
        json.dumps(v)
        return v
    except (TypeError, ValueError):
        return str(v)
def explore_photo(path: Path) -> dict:
    """Run every extractor on one photo and return the combined result.

    The result has the sections ``filesystem``, ``hashes``, ``pillow``,
    ``exif_pillow``, ``exif_exifread``, ``iptc`` and ``xmp``.
    """
    report = {"filesystem": filesystem_info(path)}

    hashes = {
        "sha256_file": file_hash_sha256(path),
        "sha256_pixels": pixel_hash_sha256(path),
    }
    hashes.update(perceptual_hashes(path))
    report["hashes"] = hashes

    report["pillow"] = pillow_info(path)
    report["exif_pillow"] = pillow_exif(path)
    report["exif_exifread"] = exifread_tags(path)
    report["iptc"] = iptc_info(path)
    report["xmp"] = xmp_info(path)
    return report
def hamming_distance(h1: str, h2: str) -> int:
    """Return the number of differing bits between two hex-encoded hashes."""
    differing_bits = int(h1, 16) ^ int(h2, 16)
    return format(differing_bits, "b").count("1")
def print_summary(photos: list[dict]) -> None:
    """Print a console overview of the records produced by ``explore_photo``.

    The output has three parts: the number of distinct EXIF tag names seen by
    each parser, a per-photo block with the most interesting values, and a
    matrix of pHash Hamming distances between all pairs of photos.
    """
    rule = "=" * 70

    def visible_count(mapping) -> int:
        # Keys starting with "_" are internal (errors, sizes) and not counted.
        return sum(1 for key in mapping if not key.startswith("_"))

    print(f"\n{rule}")
    print(f"PŘEHLED: {len(photos)} fotek")
    print(f"{rule}\n")

    # Which EXIF tags occur across all photos?
    pillow_tag_names = set()
    exifread_tag_names = set()
    for photo in photos:
        pillow_tag_names.update(photo["exif_pillow"])
        exifread_tag_names.update(photo["exif_exifread"])
    print(f"Unikátní EXIF tagy (Pillow): {len(pillow_tag_names)}")
    print(f"Unikátní EXIF tagy (ExifRead): {len(exifread_tag_names)}")
    print()

    for index, photo in enumerate(photos, 1):
        fs_info = photo["filesystem"]
        pil = photo["pillow"]
        hashes = photo["hashes"]
        er = photo["exif_exifread"]
        print(f"[{index}] {fs_info['file_name']}")
        print(f" Velikost: {fs_info['file_size_mb']} MB ({pil.get('width')}x{pil.get('height')}, {pil.get('megapixels')} Mpx)")
        print(f" Formát: {pil.get('format')} / mode={pil.get('mode')}")
        print(f" sha256_file: {hashes['sha256_file'][:16]}...")
        print(f" sha256_pixels: {(hashes.get('sha256_pixels') or 'N/A')[:16]}...")
        print(f" phash: {hashes.get('phash')} (perceptual)")
        print(f" EXIF tagů: ExifRead={len(er)}, Pillow={len(photo['exif_pillow'])}")
        print(f" IPTC polí: {visible_count(photo['iptc'])}")
        print(f" XMP polí: {visible_count(photo['xmp'])}")

        # ExifRead is the more reliable source here (Pillow has a GPS bug)
        highlights = (
            ("Kamera", f"{er.get('Image Make', '')} {er.get('Image Model', '')}".strip()),
            ("Objektiv", er.get("EXIF LensModel")),
            ("Datum", er.get("EXIF DateTimeOriginal") or er.get("Image DateTime")),
            ("TZ offset", er.get("EXIF OffsetTimeOriginal") or er.get("EXIF OffsetTime")),
            ("Clona", er.get("EXIF FNumber")),
            ("ISO", er.get("EXIF ISOSpeedRatings")),
            ("Expozice", er.get("EXIF ExposureTime")),
            ("Ohnisko mm", er.get("EXIF FocalLength")),
            ("Flash", er.get("EXIF Flash")),
            ("GPS lat", er.get("GPS GPSLatitude")),
            ("GPS lon", er.get("GPS GPSLongitude")),
            ("Software", er.get("Image Software")),
        )
        for label, value in highlights:
            if value and str(value).strip():
                print(f" {label:12s}: {value}")

        # IPTC / XMP: print everything that is there
        for key, value in (photo["iptc"] or {}).items():
            if not key.startswith("_"):
                print(f" IPTC.{key:8s}: {value}")
        for key, value in (photo["xmp"] or {}).items():
            if not key.startswith("_"):
                print(f" XMP.{key:9s}: {value}")
        print()

    # Matrix of perceptual similarity (Hamming distance between pHashes)
    print(f"{rule}")
    print("PERCEPTUÁLNÍ PODOBNOST (phash Hamming distance)")
    print("Hodnota 0-10 = vizuálně velmi podobné, >20 = odlišné")
    print(f"{rule}")
    print(" " + "".join(f" [{col + 1}]" for col in range(len(photos))))
    for i, left in enumerate(photos):
        cells = [f" [{i + 1}] "]
        for j, right in enumerate(photos):
            if i == j:
                cells.append(" -")
                continue
            first = left["hashes"].get("phash")
            second = right["hashes"].get("phash")
            if not (first and second):
                cells.append(" N/A ")
                continue
            distance = hamming_distance(first, second)
            marker = "*" if distance <= 10 else " "
            cells.append(f" {distance:3d}{marker}")
        print("".join(cells))
    print("\n * = vizuálně podobné fotky (možná duplikát po editaci)")
    print()
def main():
    """Explore every photo in ``PHOTOS_DIR``, print a summary and save the JSON.

    Only regular files with a known image suffix are processed, in sorted
    order.  A photo whose exploration raises is reported as FAIL and left out
    of the results.  The collected data is written to ``OUTPUT_JSON``.
    """
    if not PHOTOS_DIR.exists():
        print(f"[ERROR] Složka neexistuje: {PHOTOS_DIR}")
        return

    image_suffixes = {".jpg", ".jpeg", ".png", ".heic", ".tiff", ".tif", ".webp"}
    files = sorted(
        entry
        for entry in PHOTOS_DIR.iterdir()
        if entry.is_file() and entry.suffix.lower() in image_suffixes
    )
    if not files:
        print(f"[WARN] Žádné fotky v {PHOTOS_DIR}")
        return
    print(f"Nalezeno {len(files)} fotek v {PHOTOS_DIR}\n")

    photos = []
    for photo_path in files:
        print(f" zpracovávám: {photo_path.name} ...", end=" ", flush=True)
        try:
            photos.append(explore_photo(photo_path))
        except Exception as exc:
            print(f"FAIL: {exc}")
        else:
            print("OK")

    print_summary(photos)

    # Save everything as JSON for detailed analysis
    with open(OUTPUT_JSON, "w", encoding="utf-8") as out:
        json.dump(photos, out, indent=2, ensure_ascii=False, default=str)
    print(f"\n[OK] Detailní data uložena: {OUTPUT_JSON}")


if __name__ == "__main__":
    main()
+527
View File
@@ -0,0 +1,527 @@
#!/usr/bin/env python3
"""
Import JSONL metadat do PostgreSQL (fotky_buzalkovi).
Použití:
python import_to_db.py # output/10_metadata.jsonl
python import_to_db.py output/jiny_soubor.jsonl
Co dělá:
1. Vytvoří databázi 'fotky_buzalkovi' pokud neexistuje
2. Vytvoří tabulky photos / tags / photo_tags (IF NOT EXISTS)
3. Importuje záznamy po dávkách (ON CONFLICT DO NOTHING → opakované spuštění je bezpečné)
"""
import json
import os
import re
import struct
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
import psycopg2
import psycopg2.extras
from psycopg2.extras import execute_values
# ──────────────────────────────────────────────────────────────────────────────
# Konfigurace z .env (pokud je python-dotenv nainstalován) nebo z prostředí
# ──────────────────────────────────────────────────────────────────────────────
# python-dotenv is optional: when it is installed, values from a .env file are
# loaded into the environment before the settings below are read.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # without dotenv we read os.environ or fall back to the defaults

# Connection parameters passed to psycopg2.connect(); each one can be
# overridden through the matching DB_* environment variable.
DB_CONFIG = dict(
    host=os.getenv("DB_HOST", "192.168.1.76"),
    port=int(os.getenv("DB_PORT", "5432")),
    user=os.getenv("DB_USER", "vladimir.buzalka"),
    password=os.getenv("DB_PASSWORD", ""),
    dbname=os.getenv("DB_NAME", "fotky_buzalkovi"),
)

# JSONL file imported when no path is given on the command line.
DEFAULT_JSONL = Path(__file__).parent.joinpath("output", "10_metadata.jsonl")

# Number of rows sent to the database per INSERT batch.
BATCH_SIZE = 500
# ──────────────────────────────────────────────────────────────────────────────
# Schema
# ──────────────────────────────────────────────────────────────────────────────
# DDL executed by create_schema(): creates the photos, tags and photo_tags
# tables plus their indexes. Every statement uses IF NOT EXISTS, so running it
# against an already initialised database is a no-op (it does not migrate
# existing tables).
# NOTE(review): UNIQUE(name, parent_tag_id) on tags — PostgreSQL treats NULLs
# as distinct by default, so root tags (parent_tag_id IS NULL) with the same
# name are presumably not prevented; confirm whether that is intended.
# NOTE(review): idx_photos_phash is a plain index on a BIGINT; it looks useful
# for exact-match lookups only, not for Hamming-distance searches — confirm.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS photos (
id BIGSERIAL PRIMARY KEY,
-- identita (3 úrovně)
sha256_file CHAR(64) UNIQUE NOT NULL, -- byte-přesná kopie
sha256_pixels CHAR(64), -- stejná fotka po změně metadat
phash BIGINT, -- vizuální podobnost (Hamming)
dhash BIGINT, -- doplňkový perceptuální hash
-- soubor
file_path VARCHAR(2000) NOT NULL,
file_path_relative VARCHAR(2000),
file_name VARCHAR(500) NOT NULL,
file_stem VARCHAR(500),
file_ext VARCHAR(20),
file_size BIGINT, -- bytes
mime_type VARCHAR(50),
format VARCHAR(20), -- JPEG, PNG, HEIC…
mode VARCHAR(20), -- RGB, RGBA…
width INT,
height INT,
megapixels NUMERIC(8,2),
has_transparency BOOLEAN DEFAULT FALSE,
icc_profile BOOLEAN DEFAULT FALSE,
embedded_thumbnail BOOLEAN DEFAULT FALSE,
-- časy
taken_at TIMESTAMPTZ, -- preferovaně z EXIF (s TZ)
taken_at_source VARCHAR(20), -- 'exif' / 'mtime' / 'unknown'
mtime TIMESTAMPTZ, -- filesystem mtime
collected_at TIMESTAMPTZ, -- kdy jsme skenovali
-- technika (z EXIF)
camera_make VARCHAR(100),
camera_model VARCHAR(255),
lens_model VARCHAR(255),
iso INT,
aperture NUMERIC(5,2),
exposure_time VARCHAR(30), -- "1/500"
focal_length_mm NUMERIC(6,2),
-- GPS (NULL pokud chybí)
gps_lat NUMERIC(10,7),
gps_lon NUMERIC(10,7),
gps_altitude NUMERIC(7,2),
-- klasifikace
is_screenshot BOOLEAN DEFAULT FALSE,
face_count INT, -- z XMP / AI (zatím NULL)
-- raw metadata jako JSONB pro dotazy a budoucí rozšíření
exif_raw JSONB,
iptc_raw JSONB,
xmp_raw JSONB,
-- import / zpracování
imported_at TIMESTAMPTZ DEFAULT NOW(),
processing_status VARCHAR(50) DEFAULT 'pending'
);
-- Indexy
CREATE INDEX IF NOT EXISTS idx_photos_sha256_pixels ON photos(sha256_pixels);
CREATE INDEX IF NOT EXISTS idx_photos_phash ON photos(phash);
CREATE INDEX IF NOT EXISTS idx_photos_taken_at ON photos(taken_at);
CREATE INDEX IF NOT EXISTS idx_photos_camera_model ON photos(camera_model);
CREATE INDEX IF NOT EXISTS idx_photos_file_name ON photos(file_name);
CREATE INDEX IF NOT EXISTS idx_photos_file_ext ON photos(file_ext);
CREATE INDEX IF NOT EXISTS idx_photos_exif_gin ON photos USING GIN (exif_raw);
-- Tagy (hierarchické: místo > Praha > Karlův most)
CREATE TABLE IF NOT EXISTS tags (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
parent_tag_id INT REFERENCES tags(id),
UNIQUE(name, parent_tag_id)
);
-- Vazební tabulka foto ↔ tag
CREATE TABLE IF NOT EXISTS photo_tags (
photo_id BIGINT REFERENCES photos(id) ON DELETE CASCADE,
tag_id INT REFERENCES tags(id) ON DELETE CASCADE,
source VARCHAR(20), -- 'manual' / 'iptc' / 'xmp' / 'auto'
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (photo_id, tag_id)
);
"""
# ──────────────────────────────────────────────────────────────────────────────
# Pomocné parsovací funkce
# ──────────────────────────────────────────────────────────────────────────────
def hex_to_int64(hex_str: str) -> Optional[int]:
    """
    Convert a hexadecimal hash string (pHash/dHash) to a signed 64-bit integer.

    The value is reduced to its low 64 bits and then reinterpreted as two's
    complement, so it fits a signed BIGINT column
    (e.g. "ffffffffffffffff" -> -1).

    Returns None for empty input or for text that is not valid hexadecimal.
    """
    if not hex_str:
        return None
    try:
        low_bits = int(str(hex_str).strip(), 16) % (1 << 64)
    except Exception:
        return None
    # Values with the top bit set map onto the negative half of the range.
    if low_bits >= (1 << 63):
        return low_bits - (1 << 64)
    return low_bits
def parse_fraction(s) -> Optional[float]:
    """
    Parse a number that may be written as a fraction string.

    Examples:
        "3/4"   -> 0.75
        "1.75"  -> 1.75
        "28/10" -> 2.8

    Returns None for None, for a zero denominator and for anything that
    cannot be parsed as a number.
    """
    if s is None:
        return None
    try:
        text = str(s).strip()
        numerator, slash, denominator = text.partition("/")
        if not slash:
            # Plain number without a fraction bar.
            return float(text)
        divisor = float(denominator)
        if divisor == 0:
            return None
        return float(numerator) / divisor
    except Exception:
        return None
def parse_exif_datetime(dt_str, offset_str=None) -> Optional[datetime]:
    """
    Parse an EXIF timestamp such as "2026:05:18 13:54:47".

    An optional UTC offset in the form "+02:00" / "-05:30" is applied as the
    time zone. When the offset is missing or does not match that form, the
    timestamp is tagged as UTC so the result is always timezone-aware.

    Returns None for empty input or when the value cannot be parsed.
    """
    if not dt_str:
        return None
    try:
        raw = str(dt_str).strip()
        # EXIF separates the date fields with ':' — normalise them to '-'.
        date_text = raw[:10].replace(":", "-")
        clock_text = "00:00:00" if len(raw) < 19 else raw[11:19]
        naive = datetime.strptime(date_text + " " + clock_text, "%Y-%m-%d %H:%M:%S")

        zone = timezone.utc
        if offset_str:
            match = re.match(r"([+-])(\d{2}):(\d{2})", str(offset_str).strip())
            if match:
                sign_char, hours_text, minutes_text = match.groups()
                direction = -1 if sign_char != "+" else 1
                zone = timezone(timedelta(hours=direction * int(hours_text),
                                          minutes=direction * int(minutes_text)))
        return naive.replace(tzinfo=zone)
    except Exception:
        return None
def parse_gps_coord(coord_str, ref: str = None) -> Optional[float]:
    """
    Convert a GPS coordinate string to decimal degrees.

    Accepted shapes: "[46, 5, 2762/100]", "46.083333", "46/1, 5/1, 276/100"
    (degrees, then optional minutes and seconds, each parsed by
    parse_fraction). A reference of "S" or "W" makes the result negative.

    Returns the value rounded to 7 decimal places, or None when the input is
    empty or the degrees component cannot be parsed.
    """
    if not coord_str:
        return None
    try:
        pieces = [piece.strip() for piece in str(coord_str).strip().strip("[]").split(",")]
        degrees = parse_fraction(pieces[0])
        if degrees is None:
            return None
        minutes = parse_fraction(pieces[1]) if len(pieces) > 1 else 0.0
        seconds = parse_fraction(pieces[2]) if len(pieces) > 2 else 0.0
        # Unparsable minutes/seconds count as zero instead of failing.
        decimal = degrees + (minutes or 0.0) / 60.0 + (seconds or 0.0) / 3600.0
        negative_hemisphere = bool(ref) and str(ref).upper() in ("S", "W")
        return round(-decimal if negative_hemisphere else decimal, 7)
    except Exception:
        return None
def parse_iso(raw) -> Optional[int]:
    """
    Parse an ISO speed value given as '800', '[800]', '[800, 0]' and similar.

    Only the first comma-separated element is used. Returns None for None or
    for a value that is not numeric.
    """
    if raw is None:
        return None
    try:
        first_item = str(raw).strip().strip("[]").partition(",")[0]
        return int(float(first_item.strip()))
    except Exception:
        return None
def clean_nullbytes(obj):
    """
    Recursively remove the null character \\x00 from every string in *obj*.

    PostgreSQL rejects \\u0000 in text / JSONB values, and that applies to
    JSON object keys as well as values, so dictionary keys are cleaned too.
    If two keys become equal after cleaning, the later one wins.

    Strings, dicts and lists are processed (a new object is returned);
    any other value is returned unchanged.
    """
    if isinstance(obj, str):
        return obj.replace("\x00", "")
    if isinstance(obj, dict):
        return {
            (key.replace("\x00", "") if isinstance(key, str) else key): clean_nullbytes(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [clean_nullbytes(item) for item in obj]
    return obj
# Maps an upper-cased image format name (the record's "format" field) to its
# MIME type. extract_fields() falls back to "image/<format>" for formats that
# are not listed here.
MIME_MAP = {
"JPEG": "image/jpeg", "JPG": "image/jpeg",
"PNG": "image/png",
"GIF": "image/gif",
"WEBP": "image/webp",
"HEIF": "image/heif", "HEIC": "image/heif",
"TIFF": "image/tiff", "TIF": "image/tiff",
"BMP": "image/bmp",
}
# ──────────────────────────────────────────────────────────────────────────────
# Extrakce polí z jednoho JSONL záznamu
# ──────────────────────────────────────────────────────────────────────────────
# Columns of the photos table filled by the import, in INSERT order.
# extract_fields() returns its tuple in exactly this order and INSERT_SQL
# builds its column list from it, so the three must stay in sync.
COLUMNS = [
"sha256_file", "sha256_pixels", "phash", "dhash",
"file_path", "file_path_relative", "file_name", "file_stem", "file_ext",
"file_size", "mime_type", "format", "mode", "width", "height", "megapixels",
"has_transparency", "icc_profile", "embedded_thumbnail",
"taken_at", "taken_at_source", "mtime", "collected_at",
"camera_make", "camera_model", "lens_model",
"iso", "aperture", "exposure_time", "focal_length_mm",
"gps_lat", "gps_lon", "gps_altitude",
"is_screenshot", "face_count",
"exif_raw", "iptc_raw", "xmp_raw",
]
def _parse_iso_timestamp(value) -> Optional[datetime]:
    """Parse an ISO-8601 string with datetime.fromisoformat; None if empty or invalid."""
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except (TypeError, ValueError):
        return None


def _text_or_none(value) -> Optional[str]:
    """Return str(value) without surrounding whitespace, or None when nothing is left."""
    return str(value or "").strip() or None


def extract_fields(rec: dict) -> tuple:
    """
    Flatten one JSONL record into a row for the photos table.

    Args:
        rec: one decoded JSONL object. The top-level file fields are read
            directly; the nested "exif", "iptc" and "xmp" dicts are cleaned
            of null bytes and stored as JSON text as well as mined for the
            typed columns (timestamps, camera, GPS, ...).

    Returns:
        A tuple of values ordered exactly like COLUMNS.

    Raises:
        ValueError: when the record has no sha256_file. That column is
            NOT NULL, so letting such a row through would make the whole
            batch INSERT fail instead of just this record.
        Other exceptions (e.g. from int() on a malformed width/height/file_size)
        propagate; the caller counts them as per-line errors.
    """
    sha256_file = rec.get("sha256_file")
    if not sha256_file:
        raise ValueError(
            f"chybí povinné pole sha256_file (file_path={rec.get('file_path')!r})"
        )

    exif = clean_nullbytes(rec.get("exif") or {})
    iptc = clean_nullbytes(rec.get("iptc") or {})
    xmp = clean_nullbytes(rec.get("xmp") or {})

    # ---- pHash / dHash -------------------------------------------------------
    phash = hex_to_int64(rec.get("phash"))
    dhash = hex_to_int64(rec.get("dhash"))

    # ---- taken_at ------------------------------------------------------------
    # Prefer the EXIF capture time; fall back to the filesystem mtime.
    dt_orig = exif.get("EXIF DateTimeOriginal") or exif.get("Image DateTime")
    dt_offset = exif.get("EXIF OffsetTimeOriginal") or exif.get("EXIF OffsetTime")
    taken_at = parse_exif_datetime(dt_orig, dt_offset)
    taken_at_source = "exif" if taken_at else None

    mtime = _parse_iso_timestamp(rec.get("mtime"))
    if not taken_at and mtime:
        taken_at = mtime
        taken_at_source = "mtime"

    # ---- collected_at --------------------------------------------------------
    collected_at = _parse_iso_timestamp(rec.get("collected_at"))

    # ---- camera / optics -----------------------------------------------------
    camera_make = _text_or_none(exif.get("Image Make"))
    camera_model = _text_or_none(exif.get("Image Model"))
    lens_model = _text_or_none(exif.get("EXIF LensModel"))

    iso = parse_iso(exif.get("EXIF ISOSpeedRatings"))

    _ap = parse_fraction(exif.get("EXIF FNumber"))
    aperture = round(_ap, 2) if _ap is not None else None

    exposure_raw = exif.get("EXIF ExposureTime")
    exposure_time = str(exposure_raw).strip() if exposure_raw else None

    # Only the first whitespace-separated token of FocalLength is parsed;
    # a whitespace-only value yields None instead of raising IndexError.
    _fl_tokens = str(exif.get("EXIF FocalLength") or "").split()
    _fl = parse_fraction(_fl_tokens[0]) if _fl_tokens else None
    focal_length_mm = round(_fl, 2) if _fl is not None else None

    # ---- GPS -----------------------------------------------------------------
    gps_lat = parse_gps_coord(
        exif.get("GPS GPSLatitude"),
        exif.get("GPS GPSLatitudeRef"),
    )
    gps_lon = parse_gps_coord(
        exif.get("GPS GPSLongitude"),
        exif.get("GPS GPSLongitudeRef"),
    )
    _alt = parse_fraction(exif.get("GPS GPSAltitude"))
    # An altitude reference of "1" flips the sign of the altitude.
    if _alt is not None and str(exif.get("GPS GPSAltitudeRef", "0")) == "1":
        _alt = -_alt
    gps_altitude = round(_alt, 2) if _alt is not None else None

    # ---- classification ------------------------------------------------------
    xmp_desc = str(
        xmp.get("description") or xmp.get("dc:description") or ""
    ).lower()
    is_screenshot = "screenshot" in xmp_desc

    face_count = None
    if "face_regions_count" in xmp:
        try:
            face_count = int(xmp["face_regions_count"])
        except (TypeError, ValueError):
            face_count = None

    # ---- file info -----------------------------------------------------------
    fmt = (rec.get("format") or "").strip()
    mime_type = MIME_MAP.get(fmt.upper(), f"image/{fmt.lower()}" if fmt else None)

    fields = {
        "sha256_file": sha256_file,
        "sha256_pixels": rec.get("sha256_pixels"),
        "phash": phash,
        "dhash": dhash,
        "file_path": rec.get("file_path", ""),
        "file_path_relative": rec.get("file_path_relative"),
        "file_name": rec.get("file_name", ""),
        "file_stem": rec.get("file_stem"),
        "file_ext": (rec.get("file_ext") or "").lower().strip() or None,
        "file_size": int(rec["file_size"]) if rec.get("file_size") else None,
        "mime_type": mime_type,
        "format": fmt or None,
        "mode": rec.get("mode"),
        "width": int(rec["width"]) if rec.get("width") else None,
        "height": int(rec["height"]) if rec.get("height") else None,
        "megapixels": rec.get("megapixels"),
        "has_transparency": bool(rec.get("has_transparency")),
        "icc_profile": bool(rec.get("icc_profile")),
        "embedded_thumbnail": bool(rec.get("embedded_thumbnail")),
        "taken_at": taken_at,
        "taken_at_source": taken_at_source,
        "mtime": mtime,
        "collected_at": collected_at,
        "camera_make": camera_make,
        "camera_model": camera_model,
        "lens_model": lens_model,
        "iso": iso,
        "aperture": aperture,
        "exposure_time": exposure_time,
        "focal_length_mm": focal_length_mm,
        "gps_lat": gps_lat,
        "gps_lon": gps_lon,
        "gps_altitude": gps_altitude,
        "is_screenshot": is_screenshot,
        "face_count": face_count,
        # Raw metadata is serialised to JSON text; empty dicts become NULL.
        "exif_raw": json.dumps(exif, ensure_ascii=False) if exif else None,
        "iptc_raw": json.dumps(iptc, ensure_ascii=False) if iptc else None,
        "xmp_raw": json.dumps(xmp, ensure_ascii=False) if xmp else None,
    }
    return tuple(fields[c] for c in COLUMNS)
# ──────────────────────────────────────────────────────────────────────────────
# DB pomocníci
# ──────────────────────────────────────────────────────────────────────────────
# Batch INSERT template for psycopg2's execute_values(): the single %s is
# replaced by the list of row tuples. Rows whose sha256_file already exists
# are skipped, and RETURNING id yields one row per photo actually inserted,
# which is how _flush() counts the inserts.
INSERT_SQL = f"""
INSERT INTO photos ({", ".join(COLUMNS)})
VALUES %s
ON CONFLICT (sha256_file) DO NOTHING
RETURNING id
"""
def ensure_database():
    """
    Create the target database if it does not exist yet.

    Connects to the "postgres" maintenance database with the credentials from
    DB_CONFIG and issues CREATE DATABASE when DB_CONFIG["dbname"] is missing.
    Autocommit is enabled because CREATE DATABASE cannot run inside a
    transaction block.

    Best effort: any failure is reported as a warning on stdout and never
    raised, so the import can still proceed against a database that was
    created manually.
    """
    from psycopg2 import sql  # local import: only needed for safe identifier quoting

    target_db = DB_CONFIG["dbname"]
    try:
        admin_cfg = {**DB_CONFIG, "dbname": "postgres"}
        conn = psycopg2.connect(**admin_cfg)
        try:
            conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (target_db,))
                if cur.fetchone():
                    print(f" Databáze '{target_db}' existuje.")
                else:
                    # Identifiers cannot be bound as parameters; sql.Identifier
                    # quotes the name safely instead of interpolating it.
                    cur.execute(
                        sql.SQL("CREATE DATABASE {} ENCODING = 'UTF8'").format(
                            sql.Identifier(target_db)
                        )
                    )
                    print(f" Databáze '{target_db}' vytvořena.")
        finally:
            # Close the admin connection even when a query above fails.
            conn.close()
    except Exception as e:
        print(f" [WARN] Nelze ověřit/vytvořit databázi: {e}")
        print(f" Ujistěte se, že databáze '{target_db}' existuje ručně.")
def create_schema(conn):
    """
    Run SCHEMA_SQL on *conn* and commit.

    Every statement in the script uses IF NOT EXISTS, so calling this on an
    already initialised database changes nothing.
    """
    cursor = conn.cursor()
    with cursor:
        cursor.execute(SCHEMA_SQL)
    conn.commit()
    print(" Schéma OK (tabulky a indexy vytvořeny / již existují).")
# ──────────────────────────────────────────────────────────────────────────────
# Hlavní import
# ──────────────────────────────────────────────────────────────────────────────
def import_jsonl(jsonl_path: Path):
    """
    Import every record of a JSONL metadata file into the photos table.

    Steps: make sure the database exists, connect, create the schema, then
    read the file line by line and insert the rows in batches of BATCH_SIZE.
    Lines that cannot be decoded or converted are counted as errors (the
    first 10 are printed) and skipped. Rows whose sha256_file is already in
    the table are skipped by the database and reported as duplicates.

    The data rows are committed once, after the last batch, so nothing is
    persisted unless the whole file was processed. The connection is always
    closed, also when reading or inserting fails; closing without a commit
    discards the pending rows.

    Args:
        jsonl_path: path to the JSONL file (one JSON object per line, UTF-8).

    Raises:
        Whatever psycopg2 or open() raise for connection, SQL or file errors.
    """
    print(f"\n{'='*60}")
    print(" FotkyBuzalkovi — import do PostgreSQL")
    print(f" Soubor : {jsonl_path}")
    print(f" DB : {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}")
    print(f"{'='*60}\n")

    print("1) Kontrola databáze...")
    ensure_database()

    print("2) Připojení a vytvoření schématu...")
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        conn.autocommit = False
        create_schema(conn)

        print("3) Import záznamů...")
        total = inserted = errors = 0
        batch: list = []
        with open(jsonl_path, encoding="utf-8") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                    batch.append(extract_fields(rec))
                except Exception as e:
                    # A bad line must not stop the import; report only the first few.
                    errors += 1
                    if errors <= 10:
                        print(f"\n [CHYBA] řádek {lineno}: {e}")
                    continue
                if len(batch) >= BATCH_SIZE:
                    inserted += _flush(conn, batch)
                    total += len(batch)
                    batch = []
                    _progress(total, inserted, errors)

        # Remainder smaller than one full batch.
        if batch:
            inserted += _flush(conn, batch)
            total += len(batch)
        conn.commit()
    finally:
        # Previously the connection leaked when a flush or the file read raised.
        conn.close()

    print(f"\n\n{'='*60}")
    print(" Hotovo!")
    print(f" Zpracováno : {total:>8}")
    print(f" Vloženo : {inserted:>8}")
    print(f" Duplicity : {total - inserted:>8} (přeskočeno)")
    print(f" Chyby : {errors:>8}")
    print(f"{'='*60}\n")
def _flush(conn, batch: list) -> int:
    """
    Insert one batch of row tuples and return how many were really inserted.

    INSERT_SQL returns an id only for rows that were inserted, so rows
    skipped as duplicates are not counted. Does not commit.
    """
    with conn.cursor() as cursor:
        returned_ids = execute_values(cursor, INSERT_SQL, batch, fetch=True)
    return len(returned_ids)
def _progress(total: int, inserted: int, errors: int):
print(f" {total:>8} řádků | {inserted:>8} vloženo | {errors} chyb", end="\r")
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # The optional first CLI argument is the JSONL file; otherwise use the default.
    path = DEFAULT_JSONL if len(sys.argv) <= 1 else Path(sys.argv[1])
    if path.exists():
        import_jsonl(path)
    else:
        print(f"[ERROR] Soubor nenalezen: {path}")
        sys.exit(1)