Files
fotkyBuzalkovi/30 SběrDat/collect_and_import.py
T
2026-05-27 12:55:06 +02:00

790 lines
28 KiB
Python

#!/usr/bin/env python3
"""
collect_and_import.py — Sběr metadat ze zálohy a přímý import do PostgreSQL
Čte záznamy z tabulky zaloha_obrazku, pro každý soubor extrahuje metadata
(EXIF, GPS, IPTC, XMP, hashe, rozměry) a ukládá přímo do tabulky photos.
Nevytváří žádný mezilehlý soubor.
Resume: přeskočí záznamy, kde photos.zaloha_id už existuje.
Použití:
python collect_and_import.py
python collect_and_import.py --workers 4
python collect_and_import.py --limit 500 # jen prvních N (pro test)
python collect_and_import.py --dry-run # jen spočítá, nic nezpracuje
python collect_and_import.py --batch-size 200
Předpoklad pro překlad cest:
V DB jsou cesty vždy uloženy jako nativní Tower1 Linux cesta:
/mnt/user/ZalohaVsechObrazku/...
Skript může běžet na dvou platformách:
Windows (libovolný hostname):
Tower1 je dostupný jako \\Tower1\\ZalohaVsechObrazku\\...
→ /mnt/user/ZalohaVsechObrazku → \\Tower1\\ZalohaVsechObrazku, / → \\
Tower1 (Linux, hostname = Tower1):
Cesta z DB je již nativní, žádný překlad není potřeba.
Skript nikdy neběží na tower (prvním Unraid serveru).
"""
import argparse
import hashlib
import json
import logging
import os
import re
import signal
import struct
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
if sys.stdout.encoding and sys.stdout.encoding.lower() != "utf-8":
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
import exifread
logging.getLogger("exifread").setLevel(logging.CRITICAL)
import imagehash
import psycopg2
import psycopg2.extras
from psycopg2.extras import execute_values
from PIL import Image, ImageOps, IptcImagePlugin
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
# ---------------------------------------------------------------------------
# Konfigurace
# ---------------------------------------------------------------------------
# Keyword arguments handed to psycopg2.connect(); every value can be
# overridden through the matching DB_* environment variable.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "192.168.1.76"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "user": os.getenv("DB_USER", "vladimir.buzalka"),
    "password": os.getenv("DB_PASSWORD", ""),
    "dbname": os.getenv("DB_NAME", "fotky_buzalkovi"),
}
# Canonical prefix of the paths stored in the DB (always the native Tower1 Linux path)
DB_PATH_PREFIX = "/mnt/user/ZalohaVsechObrazku"
# Matching Windows UNC path of the Tower1 share
WINDOWS_UNC_PREFIX = r"\\Tower1\ZalohaVsechObrazku"
# Lower-cased file suffixes accepted by process_one(); it checks the suffix
# before the file is opened.
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", ".tiff", ".tif", ".webp", ".bmp"}
# Defaults of the --workers and --batch-size command-line options.
DEFAULT_WORKERS = 2
DEFAULT_BATCH_SIZE = 100
# Readable names for the two-element IPTC keys; _parse_iptc() falls back to
# "IPTC_<a>_<b>" for keys that are not listed here.
IPTC_TAG_NAMES = {
    (2, 5): "ObjectName", (2, 25): "Keywords",
    (2, 55): "DateCreated", (2, 60): "TimeCreated",
    (2, 80): "Byline", (2, 90): "City",
    (2, 95): "ProvinceState", (2, 101): "CountryName",
    (2, 105): "Headline", (2, 110): "Credit",
    (2, 116): "Copyright", (2, 120): "Caption",
}
# Upper-cased image format name -> MIME type; extract_fields() falls back to
# "image/<format>" for formats that are not listed here.
MIME_MAP = {
    "JPEG": "image/jpeg", "JPG": "image/jpeg",
    "PNG": "image/png", "WEBP": "image/webp",
    "HEIF": "image/heif", "HEIC": "image/heif",
    "TIFF": "image/tiff", "TIF": "image/tiff",
    "BMP": "image/bmp", "GIF": "image/gif",
}
# ---------------------------------------------------------------------------
# Překlad cesty: DB (nativní Tower1 Linux) → lokální přístupová cesta
# ---------------------------------------------------------------------------
def db_path_to_local(db_path: str) -> Path:
    """Translate a path stored in the DB into one reachable from this machine.

    On Windows a path below DB_PATH_PREFIX is rewritten onto the Tower1 UNC
    share (prefix swapped, forward slashes turned into backslashes).  In every
    other case -- a non-Windows host, or a path with an unknown prefix -- the
    path is returned unchanged and any problem surfaces when the file is opened.
    """
    on_share = os.name == "nt" and db_path.startswith(DB_PATH_PREFIX)
    if not on_share:
        return Path(db_path)
    # The remainder still starts with "/", which becomes the first backslash.
    tail = db_path[len(DB_PATH_PREFIX):]
    return Path(WINDOWS_UNC_PREFIX + tail.replace("/", "\\"))
# ---------------------------------------------------------------------------
# GPS
# ---------------------------------------------------------------------------
def _rational_to_float(r) -> float:
    """Convert a ratio-like value (or anything float() accepts) to a float.

    Objects exposing ``numerator`` and ``denominator`` are divided out; a zero
    denominator yields 0.0 instead of raising.
    """
    is_ratio = hasattr(r, "numerator") and hasattr(r, "denominator")
    if not is_ratio:
        return float(r)
    if r.denominator == 0:
        return 0.0
    return r.numerator / r.denominator
def _dms_to_decimal(vals) -> float:
    """Turn a (degrees, minutes, seconds) sequence into decimal degrees."""
    degrees, minutes, seconds = (_rational_to_float(vals[i]) for i in range(3))
    return degrees + minutes / 60.0 + seconds / 3600.0
def extract_gps(raw_tags: dict) -> dict:
    """Derive decimal GPS values from a dict of EXIF tags.

    Returns a dict that may contain ``gps_lat`` / ``gps_lon`` (7 decimals,
    negative for S / W references) and ``gps_alt`` (2 decimals, negative when
    the altitude reference is 1).  Any exception is reported under
    ``gps_error`` instead of being raised.
    """
    gps: dict = {}
    try:
        lat_tag, lat_ref, lon_tag, lon_ref = (
            raw_tags.get(tag_name)
            for tag_name in (
                "GPS GPSLatitude",
                "GPS GPSLatitudeRef",
                "GPS GPSLongitude",
                "GPS GPSLongitudeRef",
            )
        )
        if lat_tag and lon_tag:
            latitude = _dms_to_decimal(lat_tag.values)
            longitude = _dms_to_decimal(lon_tag.values)
            # A missing reference leaves the coordinate positive.
            lat_side = str(lat_ref).strip().upper() if lat_ref else ""
            if lat_side.startswith("S"):
                latitude = -latitude
            lon_side = str(lon_ref).strip().upper() if lon_ref else ""
            if lon_side.startswith("W"):
                longitude = -longitude
            gps["gps_lat"] = round(latitude, 7)
            gps["gps_lon"] = round(longitude, 7)

        # Altitude is read independently of latitude / longitude.
        altitude_tag = raw_tags.get("GPS GPSAltitude")
        altitude_ref = raw_tags.get("GPS GPSAltitudeRef")
        if altitude_tag and altitude_tag.values:
            altitude = _rational_to_float(altitude_tag.values[0])
            negate = altitude_ref and altitude_ref.values and altitude_ref.values[0] == 1
            gps["gps_alt"] = round(-altitude if negate else altitude, 2)
    except Exception as exc:
        gps["gps_error"] = str(exc)
    return gps
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_serializable(obj):
    """Return a JSON-serializable equivalent of *obj*.

    * ``None``, ``bool``, ``int``, ``float`` and ``str`` are returned as-is.
    * Other ratio-like objects (anything with ``numerator`` and
      ``denominator``) become a float, or their ``str()`` if that fails.
    * dicts are rebuilt with string keys, lists and tuples become lists,
      both recursively.
    * bytes are cut to 200 bytes and decoded as UTF-8 with replacement.
    * Anything else is kept if ``json.dumps`` accepts it, otherwise replaced
      by ``str(obj)``.
    """
    # int and bool also expose numerator/denominator, so without this guard
    # the ratio branch below would silently turn 5 into 5.0 and True into 1.0.
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    if hasattr(obj, "numerator") and hasattr(obj, "denominator"):
        try:
            return float(obj)
        except Exception:
            return str(obj)
    if isinstance(obj, dict):
        return {str(k): _make_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_make_serializable(x) for x in obj]
    if isinstance(obj, bytes):
        return obj[:200].decode("utf-8", errors="replace")
    try:
        json.dumps(obj)
        return obj
    except (TypeError, ValueError):
        return str(obj)
def file_hash_sha256(path: Path, chunk: int = 65536) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read *chunk* bytes at a time."""
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for piece in iter(lambda: stream.read(chunk), b""):
            digest.update(piece)
    return digest.hexdigest()
def clean_nullbytes(obj):
    """Strip NUL characters from every string reachable through dicts and lists.

    Dict values and list items are processed recursively; dict keys and any
    other type (tuples included) are returned untouched.
    """
    if isinstance(obj, dict):
        return {key: clean_nullbytes(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [clean_nullbytes(item) for item in obj]
    return obj.replace("\x00", "") if isinstance(obj, str) else obj
def hex_to_int64(hex_str) -> Optional[int]:
    """Reinterpret a hex string as a signed 64-bit integer.

    The value is reduced to its low 64 bits and read as two's complement.
    Returns None for falsy or unparsable input.
    """
    if not hex_str:
        return None
    try:
        low_bits = int(str(hex_str).strip(), 16) & 0xFFFFFFFFFFFFFFFF
    except Exception:
        return None
    # Values with the top bit set represent negative numbers.
    if low_bits >= 1 << 63:
        return low_bits - (1 << 64)
    return low_bits
def parse_fraction(s) -> Optional[float]:
    """Parse ``"num/den"`` or a plain number into a float.

    Returns None for None, for a zero denominator and for anything that
    cannot be parsed.
    """
    if s is None:
        return None
    try:
        text = str(s).strip()
        top, slash, bottom = text.partition("/")
        if not slash:
            return float(text)
        divisor = float(bottom)
        if divisor == 0:
            return None
        return float(top) / divisor
    except Exception:
        return None
def parse_exif_datetime(dt_str, offset_str=None) -> Optional[datetime]:
    """Parse an EXIF ``YYYY:MM:DD HH:MM:SS`` timestamp into an aware datetime.

    A value shorter than 19 characters is treated as a date at midnight.  When
    *offset_str* starts with ``+HH:MM`` / ``-HH:MM`` that offset becomes the
    time zone; otherwise the result is tagged as UTC.  Returns None for empty
    or unparsable input.
    """
    if not dt_str:
        return None
    try:
        raw = str(dt_str).strip()
        day = raw[:10].replace(":", "-")
        clock = "00:00:00" if len(raw) < 19 else raw[11:19]
        naive = datetime.strptime(day + " " + clock, "%Y-%m-%d %H:%M:%S")

        zone = timezone.utc
        found = re.match(r"([+-])(\d{2}):(\d{2})", str(offset_str).strip()) if offset_str else None
        if found:
            sign_char, hours, minutes = found.groups()
            direction = -1 if sign_char == "-" else 1
            zone = timezone(timedelta(hours=direction * int(hours),
                                      minutes=direction * int(minutes)))
        return naive.replace(tzinfo=zone)
    except Exception:
        return None
def parse_iso(raw) -> Optional[int]:
    """Parse an ISO speed value, taking the first item of a bracketed list.

    Accepts e.g. ``"100"``, ``"100.7"`` or ``"[100, 200]"``; returns None for
    None or unparsable input.
    """
    if raw is None:
        return None
    try:
        leading, _, _ = str(raw).strip().strip("[]").partition(",")
        return int(float(leading.strip()))
    except Exception:
        return None
# ---------------------------------------------------------------------------
# IPTC + XMP
# ---------------------------------------------------------------------------
def _parse_iptc(raw_iptc) -> dict:
    """Convert a raw IPTC mapping into ``{readable name: text}``.

    Keys are looked up in IPTC_TAG_NAMES (falling back to ``IPTC_<a>_<b>``).
    bytes values, and bytes items inside list values, are decoded as UTF-8
    with replacement; everything else is passed through.
    """
    if not raw_iptc:
        return {}

    def as_text(item):
        return item.decode("utf-8", errors="replace") if isinstance(item, bytes) else item

    parsed = {}
    for key, value in raw_iptc.items():
        label = IPTC_TAG_NAMES.get(key, f"IPTC_{key[0]}_{key[1]}")
        if isinstance(value, list):
            parsed[label] = [as_text(item) for item in value]
        else:
            parsed[label] = as_text(value)
    return parsed
def _xmp_first_li(tag: str) -> str:
    """Build a regex capturing the first <rdf:li> text inside one <tag> element.

    The scan is bounded by the element's own closing tag, so an element with
    no <rdf:li> of its own does not pick up the first <rdf:li> of a later
    element.  Self-closing tags are not treated as an opening tag.
    """
    return (
        rf"<{tag}\b[^>]*(?<!/)>"      # opening tag, but not <tag ... />
        rf"(?:(?!</{tag}>).)*?"       # any text, never past </tag>
        r"<rdf:li[^>]*>([^<]+)</rdf:li>"
    )


# Regular expressions applied to the decoded XMP packet (with re.DOTALL);
# group 1 of each pattern is the extracted value.
XMP_PATTERNS = {
    "creator_tool": r'xmp:CreatorTool="([^"]+)"',
    "create_date": r'xmp:CreateDate="([^"]+)"',
    "modify_date": r'xmp:ModifyDate="([^"]+)"',
    "rating": r'xmp:Rating="([^"]+)"',
    "title": _xmp_first_li("dc:title"),
    "description": _xmp_first_li("dc:description"),
    "creator": _xmp_first_li("dc:creator"),
    "subject_block": r'<dc:subject[^>]*>(.*?)</dc:subject>',
}


def _parse_xmp(xmp_raw) -> dict:
    """Extract a handful of fields from a raw XMP packet using regexes.

    Args:
        xmp_raw: the packet as bytes or str; falsy input yields ``{}``.

    Returns:
        A dict with whichever XMP_PATTERNS keys matched (stripped text),
        ``keywords`` (list built from the dc:subject items, replacing the
        intermediate ``subject_block``), ``face_regions_count`` and
        ``face_names`` when face regions are present, and ``_xmp_bytes``,
        the size of the packet in bytes.
    """
    if not xmp_raw:
        return {}
    if isinstance(xmp_raw, (bytes, bytearray)):
        # Measure before decoding: after decoding, len() counts characters,
        # which differs from the byte size for any non-ASCII packet.
        packet_size = len(xmp_raw)
        xmp_raw = xmp_raw.decode("utf-8", errors="replace")
    else:
        packet_size = len(xmp_raw.encode("utf-8", errors="replace"))

    out = {}
    for name, pat in XMP_PATTERNS.items():
        m = re.search(pat, xmp_raw, re.DOTALL)
        if m:
            out[name] = m.group(1).strip()

    if "subject_block" in out:
        kws = re.findall(r"<rdf:li[^>]*>([^<]+)</rdf:li>", out.pop("subject_block"))
        if kws:
            out["keywords"] = kws

    face_count = len(re.findall(r'mwg-rs:Type="Face"', xmp_raw))
    if face_count:
        out["face_regions_count"] = face_count
        face_names = re.findall(r'mwg-rs:Name="([^"]+)"', xmp_raw)
        if face_names:
            out["face_names"] = face_names

    out["_xmp_bytes"] = packet_size
    return out
# ---------------------------------------------------------------------------
# Sběr metadat jedné fotky
# ---------------------------------------------------------------------------
def collect_photo(path: Path) -> dict:
    """Collect everything known about one photo into a flat dict.

    File-system facts come first, then the SHA-256 of the file, the EXIF/GPS
    tags read by exifread, and finally the Pillow-derived data (dimensions,
    IPTC, XMP, pixel hash, perceptual hashes).

    Failures of the hashing, EXIF and Pillow stages are caught and recorded
    under ``sha256_file_error``, ``exif_error``, ``pil_error`` and
    ``pixel_hash_error``.  The initial ``path.stat()`` is not guarded, so a
    missing or inaccessible file still raises from here.
    """
    record: dict = {}
    stat = path.stat()
    record["file_path"] = str(path)
    record["file_name"] = path.name
    record["file_stem"] = path.stem
    record["file_ext"] = path.suffix.lower()
    record["file_size"] = stat.st_size
    record["mtime"] = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
    record["mtime_ts"] = stat.st_mtime
    record["ctime"] = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc).isoformat()
    try:
        record["sha256_file"] = file_hash_sha256(path)
    except Exception as e:
        record["sha256_file"] = None
        record["sha256_file_error"] = str(e)
    # EXIF + GPS
    record["exif"] = {}
    try:
        with open(path, "rb") as f:
            raw_tags = exifread.process_file(f, details=True)
        record.update(extract_gps(raw_tags))
        for k, v in raw_tags.items():
            # Thumbnail tags are dropped, except the JPEGInterchangeFormat ones.
            if "Thumbnail" in k and "JPEGInterchangeFormat" not in k:
                continue
            record["exif"][k] = str(v)
    except Exception as e:
        record["exif_error"] = str(e)
    # Pillow: dimensions, hashes, IPTC, XMP
    try:
        with Image.open(path) as img:
            record["format"] = img.format
            record["mode"] = img.mode
            record["width"] = img.width
            record["height"] = img.height
            record["megapixels"] = round((img.width * img.height) / 1_000_000, 2)
            record["has_transparency"] = img.mode in ("RGBA", "LA") or "transparency" in img.info
            dpi = img.info.get("dpi")
            record["dpi"] = list(dpi) if isinstance(dpi, tuple) else dpi
            record["icc_profile"] = "icc_profile" in img.info
            record["embedded_thumbnail"] = "thumbnail" in img.info
            try:
                record["iptc"] = _parse_iptc(IptcImagePlugin.getiptcinfo(img))
            except Exception as e:
                record["iptc"] = {"_error": str(e)}
            record["xmp"] = _parse_xmp(img.info.get("xmp"))
            try:
                # Hash the pixels after applying the EXIF orientation and
                # converting to RGB.
                img_r = ImageOps.exif_transpose(img)
                if img_r.mode != "RGB":
                    img_r = img_r.convert("RGB")
                pixels = img_r.tobytes()
                record["sha256_pixels"] = hashlib.sha256(pixels).hexdigest()
                ph = imagehash.phash(img_r)
                dh = imagehash.dhash(img_r)
                record["phash"] = str(ph)
                record["dhash"] = str(dh)
                # Signed 64-bit forms of the same hashes.
                record["phash_int"] = hex_to_int64(str(ph))
                record["dhash_int"] = hex_to_int64(str(dh))
            except Exception as e:
                record["pixel_hash_error"] = str(e)
                record["sha256_pixels"] = None
                record["phash"] = record["dhash"] = None
                record["phash_int"] = record["dhash_int"] = None
    except Exception as e:
        record["pil_error"] = str(e)
    record["collected_at"] = datetime.now(tz=timezone.utc).isoformat()
    return record
# ---------------------------------------------------------------------------
# Mapování záznamu → DB sloupce
# ---------------------------------------------------------------------------
# Column order of the INSERT; extract_fields() returns its tuple in exactly
# this order.
COLUMNS = [
    "zaloha_id",
    "sha256_file", "sha256_pixels", "phash", "dhash",
    "file_path", "file_name", "file_stem", "file_ext",
    "file_size", "mime_type", "format", "mode", "width", "height", "megapixels",
    "has_transparency", "icc_profile", "embedded_thumbnail",
    "taken_at", "taken_at_source", "mtime", "collected_at",
    "camera_make", "camera_model", "lens_model",
    "iso", "aperture", "exposure_time", "focal_length_mm",
    "gps_lat", "gps_lon", "gps_altitude",
    "is_screenshot", "face_count",
    "exif_raw", "iptc_raw", "xmp_raw",
]
# Multi-row INSERT template for psycopg2's execute_values (%s is replaced by
# the VALUES list).
# NOTE(review): with ON CONFLICT (sha256_file) DO NOTHING a file whose content
# hash is already present produces no photos row, so its zaloha_id stays
# unmatched by the resume query in iter_unprocessed() and is picked up again
# on every run -- confirm this is intended.
INSERT_SQL = f"""
INSERT INTO photos ({", ".join(COLUMNS)})
VALUES %s
ON CONFLICT (sha256_file) DO NOTHING
"""
def extract_fields(zaloha_id: int, rec: dict) -> Optional[tuple]:
    """Map one collected record onto the photos columns.

    Args:
        zaloha_id: id of the source row in zaloha_obrazku.
        rec: dict produced by collect_photo().

    Returns:
        A tuple ordered like COLUMNS, or None when ``sha256_file`` is missing
        (the file could not be read).
    """
    if not rec.get("sha256_file"):
        return None

    def parse_stamp(key):
        # ISO timestamp stored in rec -> datetime, None if absent or malformed.
        raw = rec.get(key)
        if not raw:
            return None
        try:
            return datetime.fromisoformat(raw)
        except Exception:
            return None

    def exif_text(tag):
        # Stripped EXIF string, None when missing or blank.
        return str(exif.get(tag, "") or "").strip() or None

    def two_decimals(number):
        return None if number is None else round(number, 2)

    def int_or_none(key):
        return int(rec[key]) if rec.get(key) else None

    exif = clean_nullbytes(rec.get("exif") or {})
    iptc = clean_nullbytes(rec.get("iptc") or {})
    xmp = clean_nullbytes(rec.get("xmp") or {})

    # taken_at: EXIF first, file mtime as the fallback
    shot_raw = exif.get("EXIF DateTimeOriginal") or exif.get("Image DateTime")
    shot_offset = exif.get("EXIF OffsetTimeOriginal") or exif.get("EXIF OffsetTime")
    taken_at = parse_exif_datetime(shot_raw, shot_offset)
    taken_at_source = "exif" if taken_at else None
    mtime = parse_stamp("mtime")
    if not taken_at and mtime:
        taken_at, taken_at_source = mtime, "mtime"
    collected_at = parse_stamp("collected_at")

    camera_make = exif_text("Image Make")
    camera_model = exif_text("Image Model")
    lens_model = exif_text("EXIF LensModel")
    iso = parse_iso(exif.get("EXIF ISOSpeedRatings"))
    aperture = two_decimals(parse_fraction(exif.get("EXIF FNumber")))
    exposure_raw = exif.get("EXIF ExposureTime")
    exposure_time = str(exposure_raw).strip() if exposure_raw else None
    focal_raw = exif.get("EXIF FocalLength")
    focal = parse_fraction(str(focal_raw).split()[0]) if focal_raw else None
    focal_length_mm = two_decimals(focal)

    # A screenshot is recognised only by its XMP description text.
    is_screenshot = "screenshot" in str(xmp.get("description") or "").lower()
    try:
        face_count = int(xmp["face_regions_count"])
    except Exception:
        face_count = None

    fmt = (rec.get("format") or "").strip()
    mime_type = MIME_MAP.get(fmt.upper(), f"image/{fmt.lower()}" if fmt else None)

    values = {
        "zaloha_id": zaloha_id,
        "sha256_file": rec["sha256_file"],
        "sha256_pixels": rec.get("sha256_pixels"),
        # The DB columns hold the signed 64-bit form of the perceptual hashes.
        "phash": rec.get("phash_int"),
        "dhash": rec.get("dhash_int"),
        "file_path": rec.get("file_path", ""),
        "file_name": rec.get("file_name", ""),
        "file_stem": rec.get("file_stem"),
        "file_ext": (rec.get("file_ext") or "").lower().strip() or None,
        "file_size": int_or_none("file_size"),
        "mime_type": mime_type,
        "format": fmt or None,
        "mode": rec.get("mode"),
        "width": int_or_none("width"),
        "height": int_or_none("height"),
        "megapixels": rec.get("megapixels"),
        "has_transparency": bool(rec.get("has_transparency")),
        "icc_profile": bool(rec.get("icc_profile")),
        "embedded_thumbnail": bool(rec.get("embedded_thumbnail")),
        "taken_at": taken_at,
        "taken_at_source": taken_at_source,
        "mtime": mtime,
        "collected_at": collected_at,
        "camera_make": camera_make,
        "camera_model": camera_model,
        "lens_model": lens_model,
        "iso": iso,
        "aperture": aperture,
        "exposure_time": exposure_time,
        "focal_length_mm": focal_length_mm,
        # GPS values were already computed by collect_photo()
        "gps_lat": rec.get("gps_lat"),
        "gps_lon": rec.get("gps_lon"),
        "gps_altitude": rec.get("gps_alt"),
        "is_screenshot": is_screenshot,
        "face_count": face_count,
        "exif_raw": json.dumps(_make_serializable(exif), ensure_ascii=False) if exif else None,
        "iptc_raw": json.dumps(iptc, ensure_ascii=False) if iptc else None,
        "xmp_raw": json.dumps(xmp, ensure_ascii=False) if xmp else None,
    }
    return tuple(map(values.__getitem__, COLUMNS))
# ---------------------------------------------------------------------------
# Progress
# ---------------------------------------------------------------------------
class Progress:
    """Counters and a one-line status formatter for the import run."""

    def __init__(self, total: int):
        self.total = total
        self.done = self.errors = self.skipped = 0
        self.start = time.monotonic()

    def tick(self, ok: bool = True):
        """Count one processed item as a success (*ok*) or as an error."""
        if not ok:
            self.errors += 1
            return
        self.done += 1

    def skip(self):
        """Count one skipped item."""
        self.skipped += 1

    def report(self, current_file: str = "") -> str:
        """Return the status line: counts, rate, ETA and the current file name.

        The line starts with a carriage return so it can overwrite the
        previous one; the file name is cut and padded to 40 characters.
        """
        elapsed = time.monotonic() - self.start
        processed = self.done + self.errors + self.skipped
        rate = processed / elapsed if elapsed > 0 else 0

        eta_str = ""
        if self.total and rate > 0:
            seconds_left = int((self.total - processed) / rate)
            minutes_left, secs = divmod(seconds_left, 60)
            hours, mins = divmod(minutes_left, 60)
            eta_str = f" ETA {hours:02d}:{mins:02d}:{secs:02d}"

        pct = ""
        if self.total:
            pct = f" ({100 * processed / self.total:.1f}%)"

        name = ""
        if current_file:
            name = Path(current_file).name[:40]

        pieces = (
            f"\r {processed}/{self.total}{pct}",
            f" ok={self.done} err={self.errors} skip={self.skipped}",
            f" {rate:.1f} f/s{eta_str} {name:<40}",
        )
        return "".join(pieces)
# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
# Set to True by the SIGINT handler; main() checks it between records to stop
# taking on new work.
_shutdown = False
def _handle_sigint(sig, frame):
    """SIGINT handler: announce the interruption and raise the shutdown flag."""
    global _shutdown
    print("\n\n[!] Preruseno — dokoncuji probihaici davku...")
    _shutdown = True
# ---------------------------------------------------------------------------
# DB — načtení nezpracovaných záznamů
# ---------------------------------------------------------------------------
def count_unprocessed(conn) -> int:
    """Return how many zaloha_obrazku rows have no matching photos row yet."""
    query = """
SELECT COUNT(*) FROM zaloha_obrazku z
LEFT JOIN photos p ON p.zaloha_id = z.id
WHERE p.id IS NULL
"""
    with conn.cursor() as cur:
        cur.execute(query)
        row = cur.fetchone()
        return row[0]
def iter_unprocessed(conn, limit: int = 0):
    """Stream the not-yet-imported zaloha_obrazku rows, ordered by id.

    Yields ``(id, cesta_zalohy, nazev_souboru)`` tuples.  A non-zero *limit*
    is appended to the query as a LIMIT clause.
    """
    base_query = """
SELECT z.id, z.cesta_zalohy, z.nazev_souboru
FROM zaloha_obrazku z
LEFT JOIN photos p ON p.zaloha_id = z.id
WHERE p.id IS NULL
ORDER BY z.id
"""
    query = base_query + f" LIMIT {limit}" if limit else base_query
    # A named cursor streams from the server instead of loading every row into RAM.
    with conn.cursor("unprocessed_cursor") as cur:
        cur.itersize = 2000
        cur.execute(query)
        yield from cur  # (id, cesta_zalohy, nazev_souboru)
# ---------------------------------------------------------------------------
# DB — flush dávky
# ---------------------------------------------------------------------------
def flush_batch(conn, batch: list) -> int:
    """Insert one batch of rows into photos and commit.

    Args:
        conn: open psycopg2 connection used for writing.
        batch: row tuples ordered like COLUMNS.

    Returns:
        The number of rows actually inserted; rows dropped by the
        ``ON CONFLICT ... DO NOTHING`` clause of INSERT_SQL are not counted.

    Raises:
        Whatever the insert or commit raises; the transaction is rolled back
        first so the connection stays usable.
    """
    if not batch:
        return 0
    try:
        with conn.cursor() as cur:
            # One page = one statement, so rowcount covers the whole batch
            # rather than only the last page execute_values would send.
            execute_values(cur, INSERT_SQL, batch, page_size=len(batch))
            inserted = cur.rowcount
        conn.commit()
    except Exception:
        # Without the rollback the connection is left in an aborted
        # transaction and every later statement on it fails.
        conn.rollback()
        raise
    return inserted
# ---------------------------------------------------------------------------
# Zpracování jednoho záznamu
# ---------------------------------------------------------------------------
def process_one(zaloha_id: int, linux_path: str) -> tuple[Optional[tuple], str]:
    """Collect one file and turn it into a row for the photos table.

    Args:
        zaloha_id: id of the source row in zaloha_obrazku.
        linux_path: path of the file as stored in the DB.

    Returns:
        ``(db_row, error_msg)``:

        * ``(row, "")``   -- the row is ready for INSERT,
        * ``(None, "")``  -- skipped because the extension is not supported,
        * ``(None, msg)`` -- the file could not be read or mapped.
    """
    ext = Path(linux_path).suffix.lower()
    if ext not in SUPPORTED_EXTENSIONS:
        # An unsupported suffix is a skip, not a failure: main() files a None
        # row with an empty message under the "Preskoceno ... (nepodporovana
        # pripona)" counter, while a non-empty message is counted and logged
        # as an error.
        return None, ""
    local_path = db_path_to_local(linux_path)
    try:
        rec = collect_photo(local_path)
    except Exception as e:
        return None, str(e)
    if not rec.get("sha256_file"):
        err = rec.get("sha256_file_error") or rec.get("pil_error") or "sha256 chybi"
        return None, err
    try:
        row = extract_fields(zaloha_id, rec)
    except Exception as e:
        return None, f"extract_fields: {e}"
    return row, ""
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: import metadata for every unprocessed backup row.

    Refuses to run on the host named "tower", parses the options, counts the
    pending rows and (unless --dry-run) processes them sequentially or on a
    thread pool, inserting the results in batches.  Ctrl+C stops the intake of
    new records; work that is already running is finished and stored.
    Connections and the error log are closed on every exit path.
    """
    import socket
    from concurrent.futures import FIRST_COMPLETED, wait

    hostname = socket.gethostname().lower()
    if hostname == "tower":
        print("[ERROR] Tento skript nesmí běžet na 'tower'. Spusť ho na Tower1 nebo Windows.")
        sys.exit(1)

    parser = argparse.ArgumentParser(
        description="collect_and_import.py — Sběr metadat ze zálohy a import do DB"
    )
    parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS,
                        help=f"Pocet paralelnich vlaken (default: {DEFAULT_WORKERS})")
    parser.add_argument("--batch-size", type=int, default=DEFAULT_BATCH_SIZE,
                        help=f"Velikost davky pro INSERT (default: {DEFAULT_BATCH_SIZE})")
    parser.add_argument("--limit", type=int, default=0,
                        help="Zpracovat maximalne N fotek (0 = vse)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Jen spocita nezpracovane zaznamy, nic neimportuje")
    args = parser.parse_args()

    print("[collect_and_import]")
    print(f" DB: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}")
    print(f" Workers: {args.workers}")
    print(f" Batch size: {args.batch_size}")
    print(f" Limit: {args.limit or 'vse'}")
    print()

    conn = psycopg2.connect(**DB_CONFIG)
    conn_read = None
    err_f = None
    try:
        conn.autocommit = False
        print(" Pocitam nezpracovane zaznamy...", end=" ", flush=True)
        total = count_unprocessed(conn)
        effective_total = min(total, args.limit) if args.limit else total
        print(f"{total} (zpracovano bude: {effective_total})")
        if args.dry_run:
            return
        if total == 0:
            print(" Vse jiz zpracovano.")
            return

        signal.signal(signal.SIGINT, _handle_sigint)
        progress = Progress(total=effective_total)
        error_log_path = Path(__file__).parent / "output" / "collect_errors.log"
        error_log_path.parent.mkdir(parents=True, exist_ok=True)
        err_f = open(error_log_path, "a", encoding="utf-8", buffering=1)
        err_f.write(f"\n--- Session {datetime.now().isoformat()} ---\n")
        batch: list[tuple] = []

        def handle_result(zaloha_id: int, linux_path: str, row: Optional[tuple], err: str):
            # No row + message = error (logged); no row + no message = skipped.
            if row is None:
                if err:
                    progress.tick(ok=False)
                    err_f.write(f"{zaloha_id}\t{linux_path}\t{err}\n")
                else:
                    progress.skip()
            else:
                progress.tick(ok=True)
                batch.append(row)
                if len(batch) >= args.batch_size:
                    flush_batch(conn, batch)
                    batch.clear()
            print(progress.report(linux_path), end="", flush=True)

        print(f"\n Zpracovavam...\n")
        # Reading (streaming generator) and writing use separate connections:
        # they cannot share one transaction.
        conn_read = psycopg2.connect(**DB_CONFIG)
        conn_read.autocommit = False  # a named cursor needs a transaction

        if args.workers > 1:
            # Upper bound on submitted-but-unhandled tasks.  Submitting the
            # whole table up front would keep every future (and its result
            # row) in memory until the very end.
            max_in_flight = args.workers * 4
            in_flight: dict = {}

            def drain(finished):
                for future in finished:
                    zid, cesta = in_flight.pop(future)
                    try:
                        row, err = future.result()
                    except Exception as e:
                        row, err = None, str(e)
                    handle_result(zid, cesta, row, err)

            with ThreadPoolExecutor(max_workers=args.workers) as pool:
                for zid, cesta, _ in iter_unprocessed(conn_read, args.limit):
                    if _shutdown:
                        break
                    in_flight[pool.submit(process_one, zid, cesta)] = (zid, cesta)
                    if len(in_flight) >= max_in_flight:
                        finished, _pending = wait(list(in_flight), return_when=FIRST_COMPLETED)
                        drain(finished)
                if _shutdown:
                    # Drop work that has not started; tasks already running
                    # are allowed to finish and their rows are still stored.
                    for future in list(in_flight):
                        if future.cancel():
                            del in_flight[future]
                drain(as_completed(list(in_flight)))
        else:
            for zid, cesta, _ in iter_unprocessed(conn_read, args.limit):
                if _shutdown:
                    break
                row, err = process_one(zid, cesta)
                handle_result(zid, cesta, row, err)

        # Flush whatever is left in the last, partial batch
        if batch:
            flush_batch(conn, batch)

        elapsed = time.monotonic() - progress.start
        print(f"\n\n{'='*60}")
        print(f" Dokonceno za {elapsed:.1f}s")
        print(f" Ulozeno: {progress.done}")
        print(f" Preskoceno: {progress.skipped} (nepodporovana pripona)")
        print(f" Chyby: {progress.errors}")
        if progress.errors:
            print(f" Chybovy log: {error_log_path}")
        print(f"{'='*60}")
    finally:
        # Release everything even when the run is cut short by an exception.
        if err_f is not None:
            err_f.close()
        if conn_read is not None:
            conn_read.close()
        conn.close()
# Start the import only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()