diff --git a/.env b/.env index 6c71931..c06a3cb 100644 --- a/.env +++ b/.env @@ -3,3 +3,13 @@ DB_PORT=5432 DB_USER=vladimir.buzalka DB_PASSWORD=Vlado7309208104++ DB_NAME=fotky_buzalkovi + +IMMICH_URL=http://192.168.1.76:8888 +IMMICH_API_KEY=UQV5PS1Td50hKOZTItnXEcXVkfQSUxcUH0XHZYxc + +SSH_HOST=192.168.1.76 +SSH_USER=root +SSH_PASSWORD=7309208104 +# Cesta zálohy v DB (Tower1 local) -> NFS mount na tower +ZALOHA_SRC_PREFIX=/mnt/user/ZalohaVsechObrazku +ZALOHA_DST_PREFIX=/mnt/remotes/TOWER1.LAN_ZalohaVsechObrazku diff --git a/mcp_server.py b/mcp_server.py index 4ea7ed4..52bcd6d 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -1,14 +1,19 @@ #!/usr/bin/env python3 """ FotkyBuzalkovi MCP Server -Poskytuje nástroje pro dotazování PostgreSQL databáze fotky_buzalkovi. +Poskytuje nástroje pro dotazování PostgreSQL databáze fotky_buzalkovi +a nahrávání fotek do Immich. """ +import asyncio +import io import json import os import sys +import urllib.request from pathlib import Path +import paramiko import psycopg2 import psycopg2.extras from dotenv import load_dotenv @@ -27,6 +32,19 @@ DB_CONFIG = { "dbname": os.getenv("DB_NAME", "fotky_buzalkovi"), } +IMMICH_URL = os.getenv("IMMICH_URL", "http://192.168.1.76:8888") +IMMICH_API_KEY = os.getenv("IMMICH_API_KEY", "") + +SSH_HOST = os.getenv("SSH_HOST", "192.168.1.76") +SSH_USER = os.getenv("SSH_USER", "root") +SSH_PASSWORD = os.getenv("SSH_PASSWORD", "") +ZALOHA_SRC_PREFIX = os.getenv("ZALOHA_SRC_PREFIX", "/mnt/user/ZalohaVsechObrazku") +ZALOHA_DST_PREFIX = os.getenv("ZALOHA_DST_PREFIX", "/mnt/remotes/TOWER1.LAN_ZalohaVsechObrazku") + + +# --------------------------------------------------------------------------- +# DB helpers +# --------------------------------------------------------------------------- def get_conn(): return psycopg2.connect(**DB_CONFIG) @@ -41,6 +59,133 @@ def run_query(sql: str, params=None, limit: int = 500): return [dict(r) for r in rows], cur.description +def build_pending_query(camera_model=None, category=None, hostname=None, + date_from=None, date_to=None, limit=20): + """Sestaví SQL dotaz pro nenahrané fotky.""" + sql = """ + SELECT + z.id AS zaloha_id, + z.cesta_zalohy, + z.nazev_souboru, + p.camera_make, + p.camera_model, + p.taken_at, + p.category, + p.file_size + FROM photos p + JOIN zaloha_obrazku z ON p.zaloha_id = z.id + LEFT JOIN immich_upload iu ON iu.zaloha_id = z.id + WHERE iu.zaloha_id IS NULL + """ + params = [] + if camera_model: + sql += " AND p.camera_model ILIKE %s" + params.append(f"%{camera_model}%") + if category: + sql += " AND p.category = %s" + params.append(category) + if hostname: + sql += " AND z.cesta_zalohy LIKE %s" + params.append(f"/mnt/user/ZalohaVsechObrazku/{hostname}/%") + if date_from: + sql += " AND p.taken_at >= %s" + params.append(date_from) + if date_to: + sql += " AND p.taken_at <= %s" + params.append(date_to) + sql += " ORDER BY p.taken_at NULLS LAST, z.id" + sql += f" LIMIT {int(limit)}" + return sql, params + + +# --------------------------------------------------------------------------- +# Immich upload (synchronous — volá se přes asyncio.to_thread) +# --------------------------------------------------------------------------- + +def _do_upload(rows: list[dict]) -> dict: + """ + Nahraje soubory z rows do Immich přes SFTP + HTTP multipart. + rows: seznam diktů se zaloha_id, cesta_zalohy, nazev_souboru, taken_at + Vrátí {'created': n, 'duplicate': n, 'error': n, 'details': [...]} + """ + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(SSH_HOST, username=SSH_USER, password=SSH_PASSWORD) + sftp = ssh.open_sftp() + + conn = psycopg2.connect(**DB_CONFIG) + cur = conn.cursor() + + created = dup = err = 0 + details = [] + + for row in rows: + zid = row["zaloha_id"] + fname = row["nazev_souboru"] + cesta = row["cesta_zalohy"] + taken_at = row.get("taken_at") + nfs_path = cesta.replace(ZALOHA_SRC_PREFIX, ZALOHA_DST_PREFIX, 1) + + try: + with sftp.open(nfs_path, "rb") as f: + data = f.read() + + fcreated = taken_at.isoformat() if taken_at else "2000-01-01T00:00:00+00:00" + + boundary = "fotky_buzalkovi_boundary" + body = ( + f"--{boundary}\r\nContent-Disposition: form-data; name=\"deviceAssetId\"\r\n\r\nfb-{zid}\r\n" + f"--{boundary}\r\nContent-Disposition: form-data; name=\"deviceId\"\r\n\r\nfotky-buzalkovi\r\n" + f"--{boundary}\r\nContent-Disposition: form-data; name=\"fileCreatedAt\"\r\n\r\n{fcreated}\r\n" + f"--{boundary}\r\nContent-Disposition: form-data; name=\"fileModifiedAt\"\r\n\r\n{fcreated}\r\n" + f"--{boundary}\r\nContent-Disposition: form-data; name=\"isFavorite\"\r\n\r\nfalse\r\n" + f"--{boundary}\r\nContent-Disposition: form-data; name=\"assetData\"; filename=\"{fname}\"\r\nContent-Type: application/octet-stream\r\n\r\n" + ).encode() + data + f"\r\n--{boundary}--\r\n".encode() + + req = urllib.request.Request( + f"{IMMICH_URL}/api/assets", + data=body, + headers={ + "x-api-key": IMMICH_API_KEY, + "Content-Type": f"multipart/form-data; boundary={boundary}", + }, + method="POST", + ) + with urllib.request.urlopen(req, timeout=300) as r: + resp = json.load(r) + + status = resp.get("status", "error") + aid = resp.get("id") + + cur.execute( + """INSERT INTO immich_upload(zaloha_id, immich_id, status, uploaded_at) + VALUES (%s, %s, %s, now()) + ON CONFLICT (zaloha_id) DO UPDATE + SET immich_id=EXCLUDED.immich_id, status=EXCLUDED.status, uploaded_at=now()""", + (zid, aid, status), + ) + conn.commit() + + if status == "created": + created += 1 + elif status == "duplicate": + dup += 1 + else: + err += 1 + + details.append({"file": fname, "status": status, "immich_id": str(aid)}) + + except Exception as e: + err += 1 + details.append({"file": fname, "status": "error", "error": str(e)}) + + sftp.close() + ssh.close() + conn.close() + + return {"created": created, "duplicate": dup, "error": err, "details": details} + + # --------------------------------------------------------------------------- # Server # --------------------------------------------------------------------------- @@ -97,13 +242,89 @@ async def list_tools() -> list[Tool]: ), inputSchema={"type": "object", "properties": {}}, ), + Tool( + name="immich_find_pending", + description=( + "Najde fotky, které ještě nebyly nahrány do Immich. " + "Lze filtrovat podle kamery, kategorie, hostname zdroje a rozsahu data. " + "Vrátí seznam čekajících fotek s cestami a metadaty." + ), + inputSchema={ + "type": "object", + "properties": { + "camera_model": { + "type": "string", + "description": "Filtr na model fotoaparátu/telefonu (ILIKE, např. 'iPhone 16')", + }, + "category": { + "type": "string", + "description": "Filtr na kategorii fotky (např. 'rodina', 'dovolena')", + }, + "hostname": { + "type": "string", + "description": "Filtr na hostname zdroje (např. 'TW22', 'tower')", + }, + "date_from": { + "type": "string", + "description": "Datum od (ISO formát, např. '2025-01-01')", + }, + "date_to": { + "type": "string", + "description": "Datum do (ISO formát, např. '2025-12-31')", + }, + "limit": { + "type": "integer", + "description": "Max. počet vrácených řádků (default 20, max 500)", + "default": 20, + }, + }, + }, + ), + Tool( + name="immich_upload", + description=( + "Nahraje fotky do Immich a zapíše výsledek do tabulky immich_upload. " + "Přeskočí fotky, které už jsou nahrané (resumable). " + "Lze filtrovat stejně jako immich_find_pending. " + "POZOR: skutečně nahrává soubory — ověř kritéria nejdřív přes immich_find_pending." + ), + inputSchema={ + "type": "object", + "properties": { + "camera_model": { + "type": "string", + "description": "Filtr na model fotoaparátu/telefonu (ILIKE, např. 'iPhone 16')", + }, + "category": { + "type": "string", + "description": "Filtr na kategorii fotky", + }, + "hostname": { + "type": "string", + "description": "Filtr na hostname zdroje (např. 'TW22')", + }, + "date_from": { + "type": "string", + "description": "Datum od (ISO formát)", + }, + "date_to": { + "type": "string", + "description": "Datum do (ISO formát)", + }, + "limit": { + "type": "integer", + "description": "Kolik fotek nahrát v tomto běhu (default 10, max 200)", + "default": 10, + }, + }, + }, + ), ] @server.call_tool() async def call_tool(name: str, arguments: dict): - # Ochrana — jen SELECT def check_readonly(sql: str): normalized = sql.strip().upper() for bad in ("INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER", "CREATE"): @@ -111,6 +332,10 @@ async def call_tool(name: str, arguments: dict): raise ValueError(f"Pouze SELECT dotazy jsou povoleny. Nalezeno: {bad}") try: + # ------------------------------------------------------------------ # + # Původní read-only nástroje # + # ------------------------------------------------------------------ # + if name == "query": sql = arguments["sql"] check_readonly(sql) @@ -166,47 +391,97 @@ async def call_tool(name: str, arguments: dict): cur.execute("SELECT COUNT(*) AS total FROM photos") results["photos_total"] = cur.fetchone()["total"] - cur.execute(""" - SELECT COUNT(*) AS total FROM photos - WHERE taken_at IS NOT NULL - """) + cur.execute("SELECT COUNT(*) AS total FROM photos WHERE taken_at IS NOT NULL") results["photos_with_taken_at"] = cur.fetchone()["total"] - cur.execute(""" - SELECT COUNT(*) AS total FROM photos - WHERE gps_lat IS NOT NULL - """) + cur.execute("SELECT COUNT(*) AS total FROM photos WHERE gps_lat IS NOT NULL") results["photos_with_gps"] = cur.fetchone()["total"] + cur.execute("SELECT COUNT(*) AS total FROM immich_upload") + results["immich_uploaded_total"] = cur.fetchone()["total"] + + cur.execute("SELECT status, COUNT(*) AS cnt FROM immich_upload GROUP BY status ORDER BY cnt DESC") + results["immich_upload_by_status"] = [dict(r) for r in cur.fetchall()] + cur.execute(""" SELECT camera_model, COUNT(*) AS cnt - FROM photos - WHERE camera_model IS NOT NULL - GROUP BY camera_model - ORDER BY cnt DESC - LIMIT 10 + FROM photos WHERE camera_model IS NOT NULL + GROUP BY camera_model ORDER BY cnt DESC LIMIT 10 """) results["top_cameras"] = [dict(r) for r in cur.fetchall()] cur.execute(""" SELECT EXTRACT(YEAR FROM taken_at)::int AS rok, COUNT(*) AS cnt - FROM photos - WHERE taken_at IS NOT NULL - GROUP BY rok - ORDER BY rok + FROM photos WHERE taken_at IS NOT NULL + GROUP BY rok ORDER BY rok """) results["photos_by_year"] = [dict(r) for r in cur.fetchall()] cur.execute(""" SELECT processing_status, COUNT(*) AS cnt - FROM photos - GROUP BY processing_status - ORDER BY cnt DESC + FROM photos GROUP BY processing_status ORDER BY cnt DESC """) results["processing_status"] = [dict(r) for r in cur.fetchall()] return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, default=str, indent=2))] + # ------------------------------------------------------------------ # + # Nové Immich nástroje # + # ------------------------------------------------------------------ # + + elif name == "immich_find_pending": + limit = min(int(arguments.get("limit", 20)), 500) + sql, params = build_pending_query( + camera_model=arguments.get("camera_model"), + category=arguments.get("category"), + hostname=arguments.get("hostname"), + date_from=arguments.get("date_from"), + date_to=arguments.get("date_to"), + limit=limit, + ) + rows, _ = run_query(sql, params=params, limit=limit) + + # Přidej celkový počet čekajících (bez LIMIT) + count_sql = sql.replace( + f" ORDER BY p.taken_at NULLS LAST, z.id\n LIMIT {limit}", "" + ) + count_sql = f"SELECT COUNT(*) AS total FROM ({sql.rsplit('LIMIT', 1)[0]}) sub" + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute(count_sql, params) + total = cur.fetchone()[0] + + result = { + "pending_total": total, + "returned": len(rows), + "rows": rows, + } + return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, default=str, indent=2))] + + elif name == "immich_upload": + limit = min(int(arguments.get("limit", 10)), 200) + sql, params = build_pending_query( + camera_model=arguments.get("camera_model"), + category=arguments.get("category"), + hostname=arguments.get("hostname"), + date_from=arguments.get("date_from"), + date_to=arguments.get("date_to"), + limit=limit, + ) + rows, _ = run_query(sql, params=params, limit=limit) + if not rows: + return [TextContent(type="text", text="Žádné fotky k nahrání pro zadaná kritéria.")] + + # Spusť upload v threadu (blocking I/O) + result = await asyncio.to_thread(_do_upload, rows) + summary = ( + f"Nahráno: {result['created']} nových, " + f"{result['duplicate']} duplikátů, " + f"{result['error']} chyb.\n\n" + + json.dumps(result["details"], ensure_ascii=False, indent=2) + ) + return [TextContent(type="text", text=summary)] + else: return [TextContent(type="text", text=f"Neznámý nástroj: {name}")] @@ -228,5 +503,4 @@ async def main(): if __name__ == "__main__": - import asyncio asyncio.run(main())