notebookVb

This commit is contained in:
administrator
2026-05-22 20:05:14 +02:00
parent 16b5ca2fa5
commit c0aa4fb684
2 changed files with 531 additions and 0 deletions
+4
View File
@@ -65,3 +65,7 @@ ENV.bak/
*.log
.env
.env.local
# Generovaná data (velké soubory, nepatří do gitu)
output/
photo_exploration.json
+527
View File
@@ -0,0 +1,527 @@
#!/usr/bin/env python3
"""
Import JSONL metadat do PostgreSQL (fotky_buzalkovi).
Použití:
python import_to_db.py # output/10_metadata.jsonl
python import_to_db.py output/jiny_soubor.jsonl
Co dělá:
1. Vytvoří databázi 'fotky_buzalkovi' pokud neexistuje
2. Vytvoří tabulky photos / tags / photo_tags (IF NOT EXISTS)
3. Importuje záznamy po dávkách (ON CONFLICT DO NOTHING → opakované spuštění je bezpečné)
"""
import json
import os
import re
import struct
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
import psycopg2
import psycopg2.extras
from psycopg2.extras import execute_values
# ──────────────────────────────────────────────────────────────────────────────
# Konfigurace z .env (pokud je python-dotenv nainstalován) nebo z prostředí
# ──────────────────────────────────────────────────────────────────────────────
# Optional .env support: if python-dotenv is importable, load_dotenv() runs
# before the settings below are read; without it only the process environment
# and the literal fallbacks are used.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# Keyword arguments for psycopg2.connect(); each value comes from the
# environment variable of the same meaning, with the literal as fallback.
DB_CONFIG = dict(
    host=os.environ.get("DB_HOST", "192.168.1.76"),
    port=int(os.environ.get("DB_PORT", "5432")),
    user=os.environ.get("DB_USER", "vladimir.buzalka"),
    password=os.environ.get("DB_PASSWORD", ""),
    dbname=os.environ.get("DB_NAME", "fotky_buzalkovi"),
)

# Input file used when no path is given on the command line.
DEFAULT_JSONL = Path(__file__).parent.joinpath("output", "10_metadata.jsonl")

# Number of rows accumulated before one bulk INSERT is issued.
BATCH_SIZE = 500
# ──────────────────────────────────────────────────────────────────────────────
# Schema
# ──────────────────────────────────────────────────────────────────────────────
# DDL executed once per run; every statement is idempotent (IF NOT EXISTS).
# The partial unique index uq_tags_root_name complements UNIQUE(name,
# parent_tag_id): PostgreSQL treats NULLs as distinct in a unique constraint,
# so without it two root tags (parent_tag_id IS NULL) could share a name.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS photos (
id BIGSERIAL PRIMARY KEY,
-- identita (3 úrovně)
sha256_file CHAR(64) UNIQUE NOT NULL, -- byte-přesná kopie
sha256_pixels CHAR(64), -- stejná fotka po změně metadat
phash BIGINT, -- vizuální podobnost (Hamming)
dhash BIGINT, -- doplňkový perceptuální hash
-- soubor
file_path VARCHAR(2000) NOT NULL,
file_path_relative VARCHAR(2000),
file_name VARCHAR(500) NOT NULL,
file_stem VARCHAR(500),
file_ext VARCHAR(20),
file_size BIGINT, -- bytes
mime_type VARCHAR(50),
format VARCHAR(20), -- JPEG, PNG, HEIC…
mode VARCHAR(20), -- RGB, RGBA…
width INT,
height INT,
megapixels NUMERIC(8,2),
has_transparency BOOLEAN DEFAULT FALSE,
icc_profile BOOLEAN DEFAULT FALSE,
embedded_thumbnail BOOLEAN DEFAULT FALSE,
-- časy
taken_at TIMESTAMPTZ, -- preferovaně z EXIF (s TZ)
taken_at_source VARCHAR(20), -- 'exif' / 'mtime' / 'unknown'
mtime TIMESTAMPTZ, -- filesystem mtime
collected_at TIMESTAMPTZ, -- kdy jsme skenovali
-- technika (z EXIF)
camera_make VARCHAR(100),
camera_model VARCHAR(255),
lens_model VARCHAR(255),
iso INT,
aperture NUMERIC(5,2),
exposure_time VARCHAR(30), -- "1/500"
focal_length_mm NUMERIC(6,2),
-- GPS (NULL pokud chybí)
gps_lat NUMERIC(10,7),
gps_lon NUMERIC(10,7),
gps_altitude NUMERIC(7,2),
-- klasifikace
is_screenshot BOOLEAN DEFAULT FALSE,
face_count INT, -- z XMP / AI (zatím NULL)
-- raw metadata jako JSONB pro dotazy a budoucí rozšíření
exif_raw JSONB,
iptc_raw JSONB,
xmp_raw JSONB,
-- import / zpracování
imported_at TIMESTAMPTZ DEFAULT NOW(),
processing_status VARCHAR(50) DEFAULT 'pending'
);
-- Indexy
CREATE INDEX IF NOT EXISTS idx_photos_sha256_pixels ON photos(sha256_pixels);
CREATE INDEX IF NOT EXISTS idx_photos_phash ON photos(phash);
CREATE INDEX IF NOT EXISTS idx_photos_taken_at ON photos(taken_at);
CREATE INDEX IF NOT EXISTS idx_photos_camera_model ON photos(camera_model);
CREATE INDEX IF NOT EXISTS idx_photos_file_name ON photos(file_name);
CREATE INDEX IF NOT EXISTS idx_photos_file_ext ON photos(file_ext);
CREATE INDEX IF NOT EXISTS idx_photos_exif_gin ON photos USING GIN (exif_raw);
-- Tagy (hierarchické: místo > Praha > Karlův most)
CREATE TABLE IF NOT EXISTS tags (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
parent_tag_id INT REFERENCES tags(id),
UNIQUE(name, parent_tag_id)
);
-- Root tags: NULL parent_tag_id values never collide under UNIQUE above
CREATE UNIQUE INDEX IF NOT EXISTS uq_tags_root_name ON tags(name) WHERE parent_tag_id IS NULL;
-- Vazební tabulka foto ↔ tag
CREATE TABLE IF NOT EXISTS photo_tags (
photo_id BIGINT REFERENCES photos(id) ON DELETE CASCADE,
tag_id INT REFERENCES tags(id) ON DELETE CASCADE,
source VARCHAR(20), -- 'manual' / 'iptc' / 'xmp' / 'auto'
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (photo_id, tag_id)
);
"""
# ──────────────────────────────────────────────────────────────────────────────
# Pomocné parsovací funkce
# ──────────────────────────────────────────────────────────────────────────────
def hex_to_int64(hex_str: str) -> Optional[int]:
    """Reinterpret a hexadecimal hash string as a signed 64-bit integer.

    The value is parsed base-16, reduced to its low 64 bits and then read as
    two's complement, so it fits a PostgreSQL BIGINT.

    Returns:
        The signed integer, or None when the input is falsy or not valid hex.
    """
    if not hex_str:
        return None
    try:
        parsed = int(str(hex_str).strip(), 16)
    except Exception:
        return None
    low64 = parsed & 0xFFFFFFFFFFFFFFFF
    # Bit 63 set means the two's-complement value is negative.
    if low64 >> 63:
        return low64 - (1 << 64)
    return low64
def parse_fraction(s) -> Optional[float]:
    """Turn a plain or "numerator/denominator" number string into a float.

    Examples: "3/4" -> 0.75, "1.75" -> 1.75, "28/10" -> 2.8.

    Returns:
        The float value, or None for None input, a zero denominator, or
        anything that cannot be parsed.
    """
    if s is None:
        return None
    try:
        text = str(s).strip()
        numerator, slash, denominator = text.partition("/")
        if not slash:
            return float(text)
        divisor = float(denominator)
        if divisor == 0:
            return None
        return float(numerator) / divisor
    except Exception:
        return None
def parse_exif_datetime(dt_str, offset_str=None) -> Optional[datetime]:
    """Parse an EXIF timestamp into a timezone-aware datetime.

    Args:
        dt_str: Timestamp such as "2026:05:18 13:54:47". Only the first 19
            characters are used; a date without a time part gets 00:00:00.
        offset_str: Optional UTC offset such as "+02:00" or "+0200".

    Returns:
        An aware datetime, or None when ``dt_str`` is empty or not a valid
        date/time. Without a usable offset the result is tagged as UTC.
    """
    if not dt_str:
        return None
    s = str(dt_str).strip()
    # EXIF separates the date fields with ':' as well; normalise them to '-'.
    date_part = s[:10].replace(":", "-")
    time_part = s[11:19] if len(s) >= 19 else "00:00:00"
    try:
        dt = datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None

    # Default to UTC so the result is never naive.
    tz = timezone.utc
    if offset_str:
        m = re.match(r"([+-])(\d{2}):?(\d{2})", str(offset_str).strip())
        if m:
            hours, minutes = int(m.group(2)), int(m.group(3))
            # timezone() rejects offsets of 24 h or more; a malformed offset
            # must not cost us an otherwise valid timestamp, so fall back
            # to UTC instead.
            if hours < 24 and minutes < 60:
                sign = 1 if m.group(1) == "+" else -1
                tz = timezone(sign * timedelta(hours=hours, minutes=minutes))
    return dt.replace(tzinfo=tz)
def parse_gps_coord(coord_str, ref: str = None) -> Optional[float]:
    """Convert a degrees/minutes/seconds coordinate string to decimal degrees.

    Accepted shapes include "[46, 5, 2762/100]", "46.083333" and
    "46/1, 5/1, 276/100"; each comma-separated part goes through
    parse_fraction. Missing or unparsable minutes/seconds count as 0.

    Args:
        coord_str: The coordinate text.
        ref: Hemisphere letter; "S" or "W" (any case) negates the result.

    Returns:
        Decimal degrees rounded to 7 places, or None when the input is empty,
        the degrees part cannot be parsed, or any error occurs.
    """
    if not coord_str:
        return None
    try:
        stripped = str(coord_str).strip().strip("[]")
        pieces = [piece.strip() for piece in stripped.split(",")]
        deg = parse_fraction(pieces[0])
        mins = parse_fraction(pieces[1]) if len(pieces) > 1 else 0.0
        secs = parse_fraction(pieces[2]) if len(pieces) > 2 else 0.0
        if deg is None:
            return None
        decimal = deg + (mins or 0.0) / 60.0 + (secs or 0.0) / 3600.0
        negate = bool(ref) and str(ref).upper() in ("S", "W")
        return round(-decimal if negate else decimal, 7)
    except Exception:
        return None
def parse_iso(raw) -> Optional[int]:
    """Extract an integer ISO value from text such as '800', '[800]' or '[800, 0]'.

    Only the first comma-separated element is used; it is parsed as a float
    and truncated to int. Returns None for None input or unparsable text.
    """
    if raw is None:
        return None
    try:
        first, _, _ = str(raw).strip().strip("[]").partition(",")
        return int(float(first.strip()))
    except Exception:
        return None
def clean_nullbytes(obj):
    """Recursively remove NUL characters (U+0000) from every string in ``obj``.

    PostgreSQL rejects U+0000 in text and JSONB values, so the metadata must
    be cleaned before it is serialised. Strings, list items, and both the
    keys and the values of dicts are cleaned; any other object is returned
    unchanged. If two dict keys become equal after cleaning, the later one
    wins.
    """
    if isinstance(obj, str):
        return obj.replace("\x00", "")
    if isinstance(obj, dict):
        # Keys end up in the JSON payload too, so they need the same cleaning.
        return {clean_nullbytes(k): clean_nullbytes(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [clean_nullbytes(v) for v in obj]
    return obj
# Upper-case image format name -> MIME type. Built from (type, aliases)
# pairs so that names sharing one MIME type are listed together.
MIME_MAP = {
    format_name: mime
    for mime, format_names in (
        ("image/jpeg", ("JPEG", "JPG")),
        ("image/png", ("PNG",)),
        ("image/gif", ("GIF",)),
        ("image/webp", ("WEBP",)),
        ("image/heif", ("HEIF", "HEIC")),
        ("image/tiff", ("TIFF", "TIF")),
        ("image/bmp", ("BMP",)),
    )
    for format_name in format_names
}
# ──────────────────────────────────────────────────────────────────────────────
# Extrakce polí z jednoho JSONL záznamu
# ──────────────────────────────────────────────────────────────────────────────
# Column order of the photos INSERT; row tuples must follow exactly this order.
COLUMNS = [
    # identity
    "sha256_file",
    "sha256_pixels",
    "phash",
    "dhash",
    # file
    "file_path",
    "file_path_relative",
    "file_name",
    "file_stem",
    "file_ext",
    "file_size",
    "mime_type",
    "format",
    "mode",
    "width",
    "height",
    "megapixels",
    "has_transparency",
    "icc_profile",
    "embedded_thumbnail",
    # timestamps
    "taken_at",
    "taken_at_source",
    "mtime",
    "collected_at",
    # camera / optics
    "camera_make",
    "camera_model",
    "lens_model",
    "iso",
    "aperture",
    "exposure_time",
    "focal_length_mm",
    # GPS
    "gps_lat",
    "gps_lon",
    "gps_altitude",
    # classification
    "is_screenshot",
    "face_count",
    # raw metadata
    "exif_raw",
    "iptc_raw",
    "xmp_raw",
]
def _fit_numeric(value, digits, limit):
    """Round ``value`` to ``digits`` decimals, or return None if it cannot be stored.

    None, NaN, infinities and any value whose magnitude reaches ``limit``
    (before or after rounding) give None.
    """
    if value is None or not abs(value) < limit:
        return None
    rounded = round(value, digits)
    return rounded if abs(rounded) < limit else None


def _fit_int32(value):
    """Return ``value`` if it fits a signed 32-bit integer, else None."""
    if value is None or not -2**31 <= value < 2**31:
        return None
    return value


def _clip_text(value, max_len):
    """Stringify and strip ``value``, cut it to ``max_len`` characters; empty gives None."""
    text = str(value or "").strip()
    return text[:max_len] or None


def extract_fields(rec: dict) -> tuple:
    """Map one parsed JSONL record onto a row tuple ordered like COLUMNS.

    EXIF-derived values come from untrusted image metadata, so anything that
    would not fit its column in the photos table is dropped (numbers) or
    clipped (strings). A rejected value would otherwise fail the whole bulk
    INSERT. The untouched metadata is still stored in the *_raw JSON columns.

    Args:
        rec: One record with the file-level keys plus optional "exif",
            "iptc" and "xmp" dicts.

    Returns:
        A tuple with one value per entry of COLUMNS, in that order.

    Raises:
        ValueError: if ``sha256_file`` is missing or empty (the column is
            NOT NULL and is the conflict key of the INSERT).
    """
    # Validate first so a bad record is reported per line by the caller.
    sha256_file = rec.get("sha256_file")
    if not sha256_file:
        raise ValueError("chybí povinné pole 'sha256_file'")

    exif = clean_nullbytes(rec.get("exif") or {})
    iptc = clean_nullbytes(rec.get("iptc") or {})
    xmp = clean_nullbytes(rec.get("xmp") or {})

    # ---- pHash / dHash ------------------------------------------------------
    phash = hex_to_int64(rec.get("phash"))
    dhash = hex_to_int64(rec.get("dhash"))

    # ---- taken_at -----------------------------------------------------------
    dt_orig = exif.get("EXIF DateTimeOriginal") or exif.get("Image DateTime")
    dt_offset = exif.get("EXIF OffsetTimeOriginal") or exif.get("EXIF OffsetTime")
    taken_at = parse_exif_datetime(dt_orig, dt_offset)
    taken_at_source = "exif" if taken_at else None

    mtime = None
    if rec.get("mtime"):
        try:
            mtime = datetime.fromisoformat(rec["mtime"])
        except Exception:
            pass
    # Fall back to the filesystem time when EXIF has no usable timestamp.
    if not taken_at and mtime:
        taken_at = mtime
        taken_at_source = "mtime"

    # ---- collected_at -------------------------------------------------------
    collected_at = None
    if rec.get("collected_at"):
        try:
            collected_at = datetime.fromisoformat(rec["collected_at"])
        except Exception:
            pass

    # ---- camera / optics ----------------------------------------------------
    # Length limits mirror the VARCHAR widths of the photos table.
    camera_make = _clip_text(exif.get("Image Make"), 100)
    camera_model = _clip_text(exif.get("Image Model"), 255)
    lens_model = _clip_text(exif.get("EXIF LensModel"), 255)
    iso = _fit_int32(parse_iso(exif.get("EXIF ISOSpeedRatings")))
    # NUMERIC(5,2) holds magnitudes below 1000.
    aperture = _fit_numeric(parse_fraction(exif.get("EXIF FNumber")), 2, 1000)
    exposure_raw = exif.get("EXIF ExposureTime")
    exposure_time = str(exposure_raw).strip()[:30] if exposure_raw else None
    # Only the first whitespace-separated token is the number; a
    # whitespace-only value has no tokens at all.
    _fl_tokens = str(exif.get("EXIF FocalLength") or "").split()
    _fl = parse_fraction(_fl_tokens[0]) if _fl_tokens else None
    # NUMERIC(6,2) holds magnitudes below 10000.
    focal_length_mm = _fit_numeric(_fl, 2, 10000)

    # ---- GPS ----------------------------------------------------------------
    # NUMERIC(10,7) leaves three integer digits.
    gps_lat = _fit_numeric(
        parse_gps_coord(exif.get("GPS GPSLatitude"), exif.get("GPS GPSLatitudeRef")),
        7,
        1000,
    )
    gps_lon = _fit_numeric(
        parse_gps_coord(exif.get("GPS GPSLongitude"), exif.get("GPS GPSLongitudeRef")),
        7,
        1000,
    )
    _alt = parse_fraction(exif.get("GPS GPSAltitude"))
    # Altitude reference "1" flips the sign of the altitude.
    if _alt is not None and str(exif.get("GPS GPSAltitudeRef", "0")) == "1":
        _alt = -_alt
    # NUMERIC(7,2) holds magnitudes below 100000.
    gps_altitude = _fit_numeric(_alt, 2, 100000)

    # ---- classification -----------------------------------------------------
    xmp_desc = str(
        xmp.get("description") or xmp.get("dc:description") or ""
    ).lower()
    is_screenshot = "screenshot" in xmp_desc
    face_count = None
    if "face_regions_count" in xmp:
        try:
            face_count = _fit_int32(int(xmp["face_regions_count"]))
        except Exception:
            pass

    # ---- file info ----------------------------------------------------------
    fmt = (rec.get("format") or "").strip()
    mime_type = MIME_MAP.get(fmt.upper(), f"image/{fmt.lower()}" if fmt else None)

    fields = {
        "sha256_file": sha256_file,
        "sha256_pixels": rec.get("sha256_pixels"),
        "phash": phash,
        "dhash": dhash,
        "file_path": rec.get("file_path", ""),
        "file_path_relative": rec.get("file_path_relative"),
        "file_name": rec.get("file_name", ""),
        "file_stem": rec.get("file_stem"),
        "file_ext": (rec.get("file_ext") or "").lower().strip() or None,
        "file_size": int(rec["file_size"]) if rec.get("file_size") else None,
        "mime_type": mime_type,
        "format": fmt or None,
        "mode": rec.get("mode"),
        "width": int(rec["width"]) if rec.get("width") else None,
        "height": int(rec["height"]) if rec.get("height") else None,
        "megapixels": rec.get("megapixels"),
        "has_transparency": bool(rec.get("has_transparency")),
        "icc_profile": bool(rec.get("icc_profile")),
        "embedded_thumbnail": bool(rec.get("embedded_thumbnail")),
        "taken_at": taken_at,
        "taken_at_source": taken_at_source,
        "mtime": mtime,
        "collected_at": collected_at,
        "camera_make": camera_make,
        "camera_model": camera_model,
        "lens_model": lens_model,
        "iso": iso,
        "aperture": aperture,
        "exposure_time": exposure_time,
        "focal_length_mm": focal_length_mm,
        "gps_lat": gps_lat,
        "gps_lon": gps_lon,
        "gps_altitude": gps_altitude,
        "is_screenshot": is_screenshot,
        "face_count": face_count,
        "exif_raw": json.dumps(exif, ensure_ascii=False) if exif else None,
        "iptc_raw": json.dumps(iptc, ensure_ascii=False) if iptc else None,
        "xmp_raw": json.dumps(xmp, ensure_ascii=False) if xmp else None,
    }
    return tuple(fields[c] for c in COLUMNS)
# ──────────────────────────────────────────────────────────────────────────────
# DB pomocníci
# ──────────────────────────────────────────────────────────────────────────────
# Bulk INSERT template for execute_values(): the single %s is the VALUES
# placeholder. Rows whose sha256_file already exists are skipped, and the
# ids of the rows actually inserted are returned.
INSERT_SQL = (
    "\n"
    "INSERT INTO photos (" + ", ".join(COLUMNS) + ")\n"
    "VALUES %s\n"
    "ON CONFLICT (sha256_file) DO NOTHING\n"
    "RETURNING id\n"
)
def ensure_database():
    """Create the target database if it does not exist yet (best effort).

    Connects to the "postgres" maintenance database with the credentials
    from DB_CONFIG, looks the target name up in pg_database and issues
    CREATE DATABASE when it is missing. Any failure is reported as a warning
    and swallowed, so the caller can still try to connect to a database that
    was created manually.
    """
    # Function-scope import: the sql helper quotes the database name as an
    # identifier instead of pasting it into the statement text.
    from psycopg2 import sql

    target_db = DB_CONFIG["dbname"]
    try:
        conn = psycopg2.connect(**{**DB_CONFIG, "dbname": "postgres"})
        try:
            # CREATE DATABASE cannot run inside a transaction block.
            conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (target_db,))
                if cur.fetchone():
                    print(f" Databáze '{target_db}' existuje.")
                else:
                    cur.execute(
                        sql.SQL("CREATE DATABASE {} ENCODING = 'UTF8'").format(
                            sql.Identifier(target_db)
                        )
                    )
                    print(f" Databáze '{target_db}' vytvořena.")
        finally:
            # Release the admin connection even when a query failed.
            conn.close()
    except Exception as e:
        print(f" [WARN] Nelze ověřit/vytvořit databázi: {e}")
        print(f" Ujistěte se, že databáze '{target_db}' existuje ručně.")
def create_schema(conn):
    """Execute SCHEMA_SQL on ``conn``, commit, and print a confirmation."""
    cursor = conn.cursor()
    try:
        cursor.execute(SCHEMA_SQL)
    finally:
        cursor.close()
    conn.commit()
    print(" Schéma OK (tabulky a indexy vytvořeny / již existují).")
# ──────────────────────────────────────────────────────────────────────────────
# Hlavní import
# ──────────────────────────────────────────────────────────────────────────────
def import_jsonl(jsonl_path: Path):
    """Import every record of a JSONL file into the photos table.

    Steps: make sure the database exists, create the schema, then read the
    file line by line, convert each record with extract_fields() and insert
    the rows in batches of BATCH_SIZE. Lines that fail to parse or convert
    are counted as errors (the first ten are printed) and skipped. All
    inserts are committed once at the end.

    Args:
        jsonl_path: Path of the UTF-8 JSONL file to import.

    Raises:
        Exception: database or file errors propagate to the caller; the
            connection is closed first, which discards the uncommitted rows.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(" FotkyBuzalkovi — import do PostgreSQL")
    print(f" Soubor : {jsonl_path}")
    print(f" DB : {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}")
    print(f"{rule}\n")

    print("1) Kontrola databáze...")
    ensure_database()

    print("2) Připojení a vytvoření schématu...")
    conn = psycopg2.connect(**DB_CONFIG)
    total = inserted = errors = 0
    try:
        conn.autocommit = False
        create_schema(conn)

        print("3) Import záznamů...")
        batch: list = []
        with open(jsonl_path, encoding="utf-8") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                    batch.append(extract_fields(rec))
                except Exception as e:
                    errors += 1
                    # Cap the per-line output; the total is in the summary.
                    if errors <= 10:
                        print(f"\n [CHYBA] řádek {lineno}: {e}")
                    continue
                if len(batch) >= BATCH_SIZE:
                    inserted += _flush(conn, batch)
                    total += len(batch)
                    batch = []
                    _progress(total, inserted, errors)
        # Remainder that did not fill a whole batch.
        if batch:
            inserted += _flush(conn, batch)
            total += len(batch)
        conn.commit()
    finally:
        # Always release the connection, including when an insert failed.
        conn.close()

    print(f"\n\n{rule}")
    print(" Hotovo!")
    print(f" Zpracováno : {total:>8}")
    print(f" Vloženo : {inserted:>8}")
    print(f" Duplicity : {total - inserted:>8} (přeskočeno)")
    print(f" Chyby : {errors:>8}")
    print(f"{rule}\n")
def _flush(conn, batch: list) -> int:
    """Bulk-insert ``batch`` and return how many rows were actually inserted.

    The count is the number of ids returned by the statement, so rows skipped
    as duplicates are not included.
    """
    cursor = conn.cursor()
    try:
        returned_ids = execute_values(cursor, INSERT_SQL, batch, fetch=True)
    finally:
        cursor.close()
    return len(returned_ids)
def _progress(total: int, inserted: int, errors: int):
    """Print a one-line status that overwrites itself (ends with a carriage return)."""
    status = " {:>8} řádků | {:>8} vloženo | {} chyb".format(total, inserted, errors)
    print(status, end="\r")
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # The first command-line argument, when given, replaces the default input.
    cli_args = sys.argv[1:]
    path = Path(cli_args[0]) if cli_args else DEFAULT_JSONL
    if path.exists():
        import_jsonl(path)
    else:
        print(f"[ERROR] Soubor nenalezen: {path}")
        sys.exit(1)