#!/usr/bin/env python3
"""
Import JSONL photo metadata into PostgreSQL (fotky_buzalkovi).

Usage:
    python import_to_db.py                           # output/10_metadata.jsonl
    python import_to_db.py output/jiny_soubor.jsonl

What it does:
    1. Creates the target database if it does not exist yet
    2. Creates the photos / tags / photo_tags tables (IF NOT EXISTS)
    3. Imports the records in batches (ON CONFLICT DO NOTHING, so running
       the import again is safe)
"""

import json
import math
import os
import re
import struct
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional

# The driver is imported defensively (same pattern as dotenv below) so that the
# pure parsing helpers stay importable on machines without psycopg2.
# import_jsonl() refuses to run without it.
try:
    import psycopg2
    import psycopg2.extras
    from psycopg2 import sql
    from psycopg2.extras import execute_values
except ImportError:
    psycopg2 = None
    sql = None
    execute_values = None

# ──────────────────────────────────────────────────────────────────────────────
# Configuration from .env (if python-dotenv is installed) or the environment
# ──────────────────────────────────────────────────────────────────────────────
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # without dotenv we read os.environ or fall back to the defaults

DB_CONFIG = {
    "host": os.getenv("DB_HOST", "192.168.1.76"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "user": os.getenv("DB_USER", "vladimir.buzalka"),
    "password": os.getenv("DB_PASSWORD", ""),
    "dbname": os.getenv("DB_NAME", "fotky_buzalkovi"),
}

DEFAULT_JSONL = Path(__file__).parent / "output" / "10_metadata.jsonl"
BATCH_SIZE = 500

# ──────────────────────────────────────────────────────────────────────────────
# Schema
# ──────────────────────────────────────────────────────────────────────────────
# NOTE(review): UNIQUE(name, parent_tag_id) on tags does not prevent duplicate
# root tags, because NULL parent ids compare as distinct — confirm whether a
# partial unique index is wanted.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS photos (
    id                  BIGSERIAL PRIMARY KEY,

    -- identita (3 úrovně)
    sha256_file         CHAR(64) UNIQUE NOT NULL,   -- byte-přesná kopie
    sha256_pixels       CHAR(64),                   -- stejná fotka po změně metadat
    phash               BIGINT,                     -- vizuální podobnost (Hamming)
    dhash               BIGINT,                     -- doplňkový perceptuální hash

    -- soubor
    file_path           VARCHAR(2000) NOT NULL,
    file_path_relative  VARCHAR(2000),
    file_name           VARCHAR(500) NOT NULL,
    file_stem           VARCHAR(500),
    file_ext            VARCHAR(20),
    file_size           BIGINT,                     -- bytes
    mime_type           VARCHAR(50),
    format              VARCHAR(20),                -- JPEG, PNG, HEIC…
    mode                VARCHAR(20),                -- RGB, RGBA…
    width               INT,
    height              INT,
    megapixels          NUMERIC(8,2),
    has_transparency    BOOLEAN DEFAULT FALSE,
    icc_profile         BOOLEAN DEFAULT FALSE,
    embedded_thumbnail  BOOLEAN DEFAULT FALSE,

    -- časy
    taken_at            TIMESTAMPTZ,                -- preferovaně z EXIF (s TZ)
    taken_at_source     VARCHAR(20),                -- 'exif' / 'mtime' / 'unknown'
    mtime               TIMESTAMPTZ,                -- filesystem mtime
    collected_at        TIMESTAMPTZ,                -- kdy jsme skenovali

    -- technika (z EXIF)
    camera_make         VARCHAR(100),
    camera_model        VARCHAR(255),
    lens_model          VARCHAR(255),
    iso                 INT,
    aperture            NUMERIC(5,2),
    exposure_time       VARCHAR(30),                -- "1/500"
    focal_length_mm     NUMERIC(6,2),

    -- GPS (NULL pokud chybí)
    gps_lat             NUMERIC(10,7),
    gps_lon             NUMERIC(10,7),
    gps_altitude        NUMERIC(7,2),

    -- klasifikace
    is_screenshot       BOOLEAN DEFAULT FALSE,
    face_count          INT,                        -- z XMP / AI (zatím NULL)

    -- raw metadata jako JSONB pro dotazy a budoucí rozšíření
    exif_raw            JSONB,
    iptc_raw            JSONB,
    xmp_raw             JSONB,

    -- import / zpracování
    imported_at         TIMESTAMPTZ DEFAULT NOW(),
    processing_status   VARCHAR(50) DEFAULT 'pending'
);

-- Indexy
CREATE INDEX IF NOT EXISTS idx_photos_sha256_pixels ON photos(sha256_pixels);
CREATE INDEX IF NOT EXISTS idx_photos_phash ON photos(phash);
CREATE INDEX IF NOT EXISTS idx_photos_taken_at ON photos(taken_at);
CREATE INDEX IF NOT EXISTS idx_photos_camera_model ON photos(camera_model);
CREATE INDEX IF NOT EXISTS idx_photos_file_name ON photos(file_name);
CREATE INDEX IF NOT EXISTS idx_photos_file_ext ON photos(file_ext);
CREATE INDEX IF NOT EXISTS idx_photos_exif_gin ON photos USING GIN (exif_raw);

-- Tagy (hierarchické: místo > Praha > Karlův most)
CREATE TABLE IF NOT EXISTS tags (
    id              SERIAL PRIMARY KEY,
    name            VARCHAR(100) NOT NULL,
    parent_tag_id   INT REFERENCES tags(id),
    UNIQUE(name, parent_tag_id)
);

-- Vazební tabulka foto ↔ tag
CREATE TABLE IF NOT EXISTS photo_tags (
    photo_id    BIGINT REFERENCES photos(id) ON DELETE CASCADE,
    tag_id      INT REFERENCES tags(id) ON DELETE CASCADE,
    source      VARCHAR(20),    -- 'manual' / 'iptc' / 'xmp' / 'auto'
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (photo_id, tag_id)
);
"""

# ──────────────────────────────────────────────────────────────────────────────
# Parsing helpers
# ──────────────────────────────────────────────────────────────────────────────

# EXIF UTC offset such as "+02:00" / "-05:30".
_EXIF_OFFSET_RE = re.compile(r"([+-])(\d{2}):(\d{2})")


def hex_to_int64(hex_str: str) -> Optional[int]:
    """
    Convert a hex hash string (pHash/dHash) to a signed int64 for BIGINT.

    The value is masked to 64 bits and reinterpreted as two's complement,
    e.g. "ffffffffffffffff" -> -1. Returns None for empty or non-hex input.
    """
    if not hex_str:
        return None
    try:
        unsigned = int(str(hex_str).strip(), 16) & 0xFFFFFFFFFFFFFFFF
    except ValueError:
        return None
    # Reinterpret the unsigned 64-bit pattern as a signed 64-bit integer.
    return struct.unpack("q", struct.pack("Q", unsigned))[0]


def parse_fraction(s) -> Optional[float]:
    """
    Parse a number or a "num/den" fraction given as text.

        "3/4"   -> 0.75
        "1.75"  -> 1.75
        "28/10" -> 2.8

    Returns None for None, unparsable text, a zero denominator, or a
    non-finite result (inf / nan).
    """
    if s is None:
        return None
    text = str(s).strip()
    try:
        if "/" in text:
            num, den = text.split("/", 1)
            divisor = float(den)
            if divisor == 0:
                return None
            value = float(num) / divisor
        else:
            value = float(text)
    except (ValueError, OverflowError):
        return None
    # inf / nan are not meaningful measurements and may be rejected by NUMERIC.
    return value if math.isfinite(value) else None


def parse_exif_datetime(dt_str, offset_str=None) -> Optional[datetime]:
    """
    Parse an EXIF date "2026:05:18 13:54:47" plus an optional "+02:00" offset.

    Returns a timezone-aware datetime, or None when the date cannot be parsed.
    When the offset is missing, malformed or out of range, the value is
    tagged as UTC (preferred over a naive datetime).
    """
    if not dt_str:
        return None
    s = str(dt_str).strip()
    # Format "YYYY:MM:DD HH:MM:SS" — the date part uses ':' as separator.
    date_part = s[:10].replace(":", "-")
    time_part = s[11:19] if len(s) >= 19 else "00:00:00"
    try:
        dt = datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None

    if offset_str:
        m = _EXIF_OFFSET_RE.match(str(offset_str).strip())
        if m:
            sign = 1 if m.group(1) == "+" else -1
            try:
                tz = timezone(timedelta(hours=sign * int(m.group(2)),
                                        minutes=sign * int(m.group(3))))
            except ValueError:
                tz = None  # offset outside ±24 h: treat it as missing
            if tz is not None:
                return dt.replace(tzinfo=tz)

    return dt.replace(tzinfo=timezone.utc)


def parse_gps_coord(coord_str, ref: Optional[str] = None) -> Optional[float]:
    """
    Parse a GPS coordinate into signed decimal degrees (7 decimal places).

    Accepted formats: "[46, 5, 2762/100]", "46.083333", "46/1, 5/1, 276/100".
    A ref of "S" or "W" negates the value. Returns None when the degrees
    component is missing or unparsable.
    """
    if not coord_str:
        return None
    parts = [p.strip() for p in str(coord_str).strip().strip("[]").split(",")]
    degrees = parse_fraction(parts[0])
    if degrees is None:
        return None
    minutes = parse_fraction(parts[1]) if len(parts) > 1 else 0.0
    seconds = parse_fraction(parts[2]) if len(parts) > 2 else 0.0
    val = degrees + (minutes or 0.0) / 60.0 + (seconds or 0.0) / 3600.0
    if ref and str(ref).strip().upper() in ("S", "W"):
        val = -val
    return round(val, 7)


def parse_iso(raw) -> Optional[int]:
    """Parse an ISO value that may look like '800', '[800]' or '[800, 0]'."""
    if raw is None:
        return None
    try:
        first = str(raw).strip().strip("[]").split(",")[0].strip()
        return int(float(first))
    except (ValueError, OverflowError):
        return None


def clean_nullbytes(obj):
    """
    Recursively remove the null byte \\x00 from all strings.

    PostgreSQL rejects \\u0000 in text / JSONB values. Dict keys are cleaned
    as well as values; non-string scalars are returned unchanged.
    """
    if isinstance(obj, str):
        return obj.replace("\x00", "")
    if isinstance(obj, dict):
        return {
            (k.replace("\x00", "") if isinstance(k, str) else k): clean_nullbytes(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [clean_nullbytes(v) for v in obj]
    return obj


def _parse_iso_timestamp(value) -> Optional[datetime]:
    """Parse an ISO-8601 string; None for anything that is not a valid one."""
    if not value or not isinstance(value, str):
        return None
    text = value.strip()
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


def _clean_str(value) -> Optional[str]:
    """Return the stripped text of *value*, or None when it is empty."""
    return str(value or "").strip() or None


def _to_int(value) -> Optional[int]:
    """Convert to int, mapping None / "" to None while keeping a real 0."""
    if value is None or value == "":
        return None
    return int(value)


MIME_MAP = {
    "JPEG": "image/jpeg",
    "JPG": "image/jpeg",
    "PNG": "image/png",
    "GIF": "image/gif",
    "WEBP": "image/webp",
    "HEIF": "image/heif",
    "HEIC": "image/heif",
    "TIFF": "image/tiff",
    "TIF": "image/tiff",
    "BMP": "image/bmp",
}

# ──────────────────────────────────────────────────────────────────────────────
# Field extraction from a single JSONL record
# ──────────────────────────────────────────────────────────────────────────────

COLUMNS = [
    "sha256_file", "sha256_pixels", "phash", "dhash",
    "file_path", "file_path_relative", "file_name", "file_stem", "file_ext",
    "file_size", "mime_type", "format", "mode", "width", "height",
    "megapixels", "has_transparency", "icc_profile", "embedded_thumbnail",
    "taken_at", "taken_at_source", "mtime", "collected_at",
    "camera_make", "camera_model", "lens_model", "iso", "aperture",
    "exposure_time", "focal_length_mm",
    "gps_lat", "gps_lon", "gps_altitude",
    "is_screenshot", "face_count",
    "exif_raw", "iptc_raw", "xmp_raw",
]


def extract_fields(rec: dict) -> tuple:
    """
    Map one decoded JSONL record to a row tuple ordered like COLUMNS.

    Raises:
        ValueError: if the record is not a JSON object or has no
            sha256_file (the column is NOT NULL and is the conflict key, so
            such a row could only fail inside the database).
    """
    if not isinstance(rec, dict):
        raise ValueError("záznam není JSON objekt")

    # Strip NUL bytes from every string up front, including top-level fields.
    rec = clean_nullbytes(rec)
    if not rec.get("sha256_file"):
        raise ValueError("chybí povinné pole sha256_file")

    exif = rec.get("exif") or {}
    iptc = rec.get("iptc") or {}
    xmp = rec.get("xmp") or {}

    # ---- pHash / dHash -------------------------------------------------------
    phash = hex_to_int64(rec.get("phash"))
    dhash = hex_to_int64(rec.get("dhash"))

    # ---- taken_at ------------------------------------------------------------
    dt_orig = exif.get("EXIF DateTimeOriginal") or exif.get("Image DateTime")
    dt_offset = exif.get("EXIF OffsetTimeOriginal") or exif.get("EXIF OffsetTime")
    taken_at = parse_exif_datetime(dt_orig, dt_offset)
    # NOTE(review): the schema comment also lists 'unknown', but no code path
    # sets it; the source stays NULL when neither EXIF nor mtime is usable.
    taken_at_source = "exif" if taken_at else None

    mtime = _parse_iso_timestamp(rec.get("mtime"))
    if not taken_at and mtime:
        taken_at = mtime
        taken_at_source = "mtime"

    # ---- collected_at --------------------------------------------------------
    collected_at = _parse_iso_timestamp(rec.get("collected_at"))

    # ---- camera / optics -----------------------------------------------------
    camera_make = _clean_str(exif.get("Image Make"))
    camera_model = _clean_str(exif.get("Image Model"))
    lens_model = _clean_str(exif.get("EXIF LensModel"))
    iso = parse_iso(exif.get("EXIF ISOSpeedRatings"))

    _ap = parse_fraction(exif.get("EXIF FNumber"))
    aperture = round(_ap, 2) if _ap is not None else None

    exposure_raw = exif.get("EXIF ExposureTime")
    exposure_time = str(exposure_raw).strip() if exposure_raw else None

    # Only the first whitespace-separated token is numeric (e.g. "50 mm").
    _fl_tokens = str(exif.get("EXIF FocalLength") or "").split()
    _fl = parse_fraction(_fl_tokens[0]) if _fl_tokens else None
    focal_length_mm = round(_fl, 2) if _fl is not None else None

    # ---- GPS -----------------------------------------------------------------
    gps_lat = parse_gps_coord(
        exif.get("GPS GPSLatitude"), exif.get("GPS GPSLatitudeRef")
    )
    gps_lon = parse_gps_coord(
        exif.get("GPS GPSLongitude"), exif.get("GPS GPSLongitudeRef")
    )
    _alt = parse_fraction(exif.get("GPS GPSAltitude"))
    # An altitude ref of "1" flips the sign of the altitude.
    if _alt is not None and str(exif.get("GPS GPSAltitudeRef", "0")).strip() == "1":
        _alt = -_alt
    gps_altitude = round(_alt, 2) if _alt is not None else None

    # ---- classification ------------------------------------------------------
    xmp_desc = str(
        xmp.get("description") or xmp.get("dc:description") or ""
    ).lower()
    is_screenshot = "screenshot" in xmp_desc

    face_count = None
    if "face_regions_count" in xmp:
        try:
            face_count = int(xmp["face_regions_count"])
        except (TypeError, ValueError):
            pass  # best effort: leave NULL when the count is not numeric

    # ---- file info -----------------------------------------------------------
    fmt = (rec.get("format") or "").strip()
    mime_type = MIME_MAP.get(fmt.upper(), f"image/{fmt.lower()}" if fmt else None)

    fields = {
        "sha256_file": rec.get("sha256_file"),
        "sha256_pixels": rec.get("sha256_pixels"),
        "phash": phash,
        "dhash": dhash,
        # NOT NULL columns: an explicit JSON null must not reach the database.
        "file_path": rec.get("file_path") or "",
        "file_path_relative": rec.get("file_path_relative"),
        "file_name": rec.get("file_name") or "",
        "file_stem": rec.get("file_stem"),
        "file_ext": (rec.get("file_ext") or "").lower().strip() or None,
        "file_size": _to_int(rec.get("file_size")),
        "mime_type": mime_type,
        "format": fmt or None,
        "mode": rec.get("mode"),
        "width": _to_int(rec.get("width")),
        "height": _to_int(rec.get("height")),
        "megapixels": rec.get("megapixels"),
        "has_transparency": bool(rec.get("has_transparency")),
        "icc_profile": bool(rec.get("icc_profile")),
        "embedded_thumbnail": bool(rec.get("embedded_thumbnail")),
        "taken_at": taken_at,
        "taken_at_source": taken_at_source,
        "mtime": mtime,
        "collected_at": collected_at,
        "camera_make": camera_make,
        "camera_model": camera_model,
        "lens_model": lens_model,
        "iso": iso,
        "aperture": aperture,
        "exposure_time": exposure_time,
        "focal_length_mm": focal_length_mm,
        "gps_lat": gps_lat,
        "gps_lon": gps_lon,
        "gps_altitude": gps_altitude,
        "is_screenshot": is_screenshot,
        "face_count": face_count,
        "exif_raw": json.dumps(exif, ensure_ascii=False) if exif else None,
        "iptc_raw": json.dumps(iptc, ensure_ascii=False) if iptc else None,
        "xmp_raw": json.dumps(xmp, ensure_ascii=False) if xmp else None,
    }
    return tuple(fields[c] for c in COLUMNS)


# ──────────────────────────────────────────────────────────────────────────────
# DB helpers
# ──────────────────────────────────────────────────────────────────────────────

INSERT_SQL = f"""
INSERT INTO photos ({", ".join(COLUMNS)})
VALUES %s
ON CONFLICT (sha256_file) DO NOTHING
RETURNING id
"""


def _require_driver():
    """Raise a clear error when psycopg2 could not be imported."""
    if psycopg2 is None:
        raise RuntimeError("psycopg2 není nainstalován (pip install psycopg2-binary)")


def ensure_database():
    """
    Create the target database if it does not exist yet.

    Best effort: any failure is reported as a warning and the import goes on,
    so that users without access to the 'postgres' database can still import
    into a database that was created manually.
    """
    target_db = DB_CONFIG["dbname"]
    conn = None
    try:
        conn = psycopg2.connect(**{**DB_CONFIG, "dbname": "postgres"})
        conn.autocommit = True  # CREATE DATABASE cannot run in a transaction
        with conn.cursor() as cur:
            cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (target_db,))
            if cur.fetchone():
                print(f" Databáze '{target_db}' existuje.")
            else:
                # The name comes from the environment: quote it as an
                # identifier instead of interpolating it into the statement.
                cur.execute(
                    sql.SQL("CREATE DATABASE {} ENCODING = 'UTF8'").format(
                        sql.Identifier(target_db)
                    )
                )
                print(f" Databáze '{target_db}' vytvořena.")
    except Exception as e:
        print(f" [WARN] Nelze ověřit/vytvořit databázi: {e}")
        print(f" Ujistěte se, že databáze '{target_db}' existuje ručně.")
    finally:
        if conn is not None:
            conn.close()


def create_schema(conn):
    """Create the tables and indexes (idempotent) and commit."""
    with conn.cursor() as cur:
        cur.execute(SCHEMA_SQL)
    conn.commit()
    print(" Schéma OK (tabulky a indexy vytvořeny / již existují).")


# ──────────────────────────────────────────────────────────────────────────────
# Main import
# ──────────────────────────────────────────────────────────────────────────────

def import_jsonl(jsonl_path: Path):
    """
    Import every record of *jsonl_path* into the photos table.

    Lines that cannot be decoded or mapped are counted as errors (the first
    ten are printed) and skipped. All batches run in one transaction that is
    committed once at the end; if a database error aborts the run, the
    connection is closed without committing and the exception propagates.

    Raises:
        RuntimeError: if psycopg2 is not installed.
    """
    _require_driver()

    print(f"\n{'='*60}")
    print(f" FotkyBuzalkovi — import do PostgreSQL")
    print(f" Soubor : {jsonl_path}")
    print(f" DB : {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}")
    print(f"{'='*60}\n")

    print("1) Kontrola databáze...")
    ensure_database()

    print("2) Připojení a vytvoření schématu...")
    conn = psycopg2.connect(**DB_CONFIG)
    total = inserted = errors = 0
    try:
        conn.autocommit = False
        create_schema(conn)

        print("3) Import záznamů...")
        batch: list = []

        with open(jsonl_path, encoding="utf-8") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                    batch.append(extract_fields(rec))
                except Exception as e:
                    # Per-line boundary: a bad record must not stop the import.
                    errors += 1
                    if errors <= 10:
                        print(f"\n [CHYBA] řádek {lineno}: {e}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    inserted += _flush(conn, batch)
                    total += len(batch)
                    batch = []
                    _progress(total, inserted, errors)

        # remainder
        if batch:
            inserted += _flush(conn, batch)
            total += len(batch)

        conn.commit()
    finally:
        # Closing an uncommitted connection discards the open transaction.
        conn.close()

    print(f"\n\n{'='*60}")
    print(f" Hotovo!")
    print(f" Zpracováno : {total:>8}")
    print(f" Vloženo : {inserted:>8}")
    print(f" Duplicity : {total - inserted:>8} (přeskočeno)")
    print(f" Chyby : {errors:>8}")
    print(f"{'='*60}\n")


def _flush(conn, batch: list) -> int:
    """Insert *batch* and return how many rows were really inserted (duplicates are skipped)."""
    with conn.cursor() as cur:
        rows = execute_values(cur, INSERT_SQL, batch, fetch=True)
    return len(rows)


def _progress(total: int, inserted: int, errors: int):
    """Print a one-line progress counter that overwrites itself."""
    print(f" {total:>8} řádků | {inserted:>8} vloženo | {errors} chyb", end="\r")


# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    path = Path(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_JSONL
    if not path.exists():
        print(f"[ERROR] Soubor nenalezen: {path}")
        sys.exit(1)
    import_jsonl(path)