#!/usr/bin/env python3
"""
FotkyBuzalkovi MCP server.

Exposes read-only tools for querying the PostgreSQL database fotky_buzalkovi.
"""

import asyncio
import json
import os
import re
import sys
from contextlib import contextmanager
from pathlib import Path

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

# Load .env from the directory that contains this script.
load_dotenv(Path(__file__).parent / ".env")

DB_CONFIG = {
    "host": os.getenv("DB_HOST", "192.168.1.76"),
    "port": int(os.getenv("DB_PORT", 5432)),
    "user": os.getenv("DB_USER", "vladimir.buzalka"),
    "password": os.getenv("DB_PASSWORD", ""),
    "dbname": os.getenv("DB_NAME", "fotky_buzalkovi"),
}

# Row limits advertised by the "query" tool schema.
DEFAULT_QUERY_LIMIT = 100
MAX_QUERY_LIMIT = 500

# Statement keywords rejected up front by the "query" tool.  This is only a
# fast, friendly pre-filter; the real guarantee is the read-only session
# configured in _readonly_conn().
_FORBIDDEN_KEYWORDS = (
    "INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER", "CREATE",
    "GRANT", "REVOKE", "COPY",
)
_SQL_COMMENT_RE = re.compile(r"--[^\n]*|/\*.*?\*/", re.DOTALL)
# A forbidden keyword counts only at the start of a statement: at the very
# beginning of the text or right after a ';'.  The trailing \b keeps
# identifiers such as "updated_at" or "created" from matching.
_FORBIDDEN_RE = re.compile(
    r"(?:^|;)\s*(" + "|".join(_FORBIDDEN_KEYWORDS) + r")\b",
    re.IGNORECASE,
)


def get_conn():
    """Open a new psycopg2 connection using DB_CONFIG."""
    return psycopg2.connect(**DB_CONFIG)


@contextmanager
def _readonly_conn():
    """Yield a read-only connection and always close it afterwards.

    psycopg2's ``with conn:`` only commits or rolls back the transaction; it
    does not close the connection, so the close has to happen explicitly.
    """
    conn = get_conn()
    try:
        # Have the server itself reject any write, whatever the SQL text is.
        conn.set_session(readonly=True)
        with conn:
            yield conn
    finally:
        conn.close()


def run_query(sql: str, params=None, limit: int = 500):
    """Run a SELECT statement in a read-only session.

    Args:
        sql: SQL text to execute.
        params: Optional parameters passed to ``cursor.execute``.
        limit: Maximum number of rows fetched from the cursor.

    Returns:
        A tuple ``(rows, description)`` where ``rows`` is a list of dicts
        (one per row) and ``description`` is the cursor's column description.

    Raises:
        psycopg2.Error: On connection or query failure, including any
            attempt to write inside the read-only session.
    """
    # NOTE(review): a default (client-side) psycopg2 cursor transfers the
    # whole result set on execute(); `limit` only caps what is returned.
    with _readonly_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql, params)
            rows = cur.fetchmany(limit)
            # Read the description while the cursor is still open.
            description = cur.description
    return [dict(r) for r in rows], description


# ---------------------------------------------------------------------------
# Server
# ---------------------------------------------------------------------------

server = Server("fotky-buzalkovi")


@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tool definitions advertised to MCP clients."""
    return [
        Tool(
            name="query",
            description=(
                "Spustí libovolný SELECT dotaz na databázi fotky_buzalkovi. "
                "Vrátí max. 500 řádků. Používej pro průzkum dat."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SELECT dotaz (jen čtení, INSERT/UPDATE/DELETE nejsou povoleny)",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max. počet vrácených řádků (default 100, max 500)",
                        "default": 100,
                    },
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="tables",
            description="Vrátí seznam všech tabulek v databázi s počty řádků.",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="describe_table",
            description="Vrátí strukturu tabulky — sloupce, typy, nullable, default.",
            inputSchema={
                "type": "object",
                "properties": {
                    "table": {"type": "string", "description": "Název tabulky"},
                },
                "required": ["table"],
            },
        ),
        Tool(
            name="stats",
            description=(
                "Základní statistiky projektu: počty fotek, stav importu, "
                "přehled kamer, roky pořízení, chybějící data."
            ),
            inputSchema={"type": "object", "properties": {}},
        ),
    ]


def _text(message: str) -> list[TextContent]:
    """Wrap a plain message as a single-item MCP text response."""
    return [TextContent(type="text", text=message)]


def _json_text(payload) -> list[TextContent]:
    """Serialize *payload* as pretty-printed JSON inside a text response."""
    # default=str stringifies values json cannot encode natively.
    return _text(json.dumps(payload, ensure_ascii=False, default=str, indent=2))


def _check_readonly(sql: str) -> None:
    """Reject SQL text that contains an obviously mutating statement.

    Comments are stripped first so a keyword cannot hide behind one, and
    every ';'-separated statement is inspected, not just the first.

    Raises:
        ValueError: If a statement starts with a forbidden keyword.
    """
    without_comments = _SQL_COMMENT_RE.sub(" ", sql)
    match = _FORBIDDEN_RE.search(without_comments)
    if match:
        bad = match.group(1).upper()
        raise ValueError(f"Pouze SELECT dotazy jsou povoleny. Nalezeno: {bad}")


def _clamp_limit(raw, default: int = DEFAULT_QUERY_LIMIT,
                 maximum: int = MAX_QUERY_LIMIT) -> int:
    """Coerce a client-supplied row limit into the range 1..maximum.

    A missing or ``None`` value falls back to *default*.  The lower bound
    keeps zero and negative values from reaching ``fetchmany()``.
    """
    limit = default if raw is None else int(raw)
    return max(1, min(limit, maximum))


def _tool_query(arguments: dict) -> list[TextContent]:
    """Handle the "query" tool: run the caller's SELECT and return rows."""
    sql = arguments["sql"]
    _check_readonly(sql)
    limit = _clamp_limit(arguments.get("limit"))
    rows, _ = run_query(sql, limit=limit)
    return _json_text(rows)


def _tool_tables(arguments: dict) -> list[TextContent]:
    """Handle the "tables" tool: list public tables with row estimates."""
    # pg_class is matched on schema as well as name; a name-only join would
    # also pick up same-named relations from other schemas.
    sql = """
        SELECT t.table_name,
               c.reltuples::bigint AS est_rows
        FROM information_schema.tables t
        JOIN pg_namespace n ON n.nspname = t.table_schema
        JOIN pg_class c ON c.relname = t.table_name
                       AND c.relnamespace = n.oid
        WHERE t.table_schema = 'public'
          AND t.table_type = 'BASE TABLE'
        ORDER BY t.table_name
    """
    rows, _ = run_query(sql, limit=100)
    return _json_text(rows)


def _tool_describe_table(arguments: dict) -> list[TextContent]:
    """Handle the "describe_table" tool: list columns of one public table."""
    table = arguments["table"]
    sql = """
        SELECT column_name,
               data_type,
               character_maximum_length,
               is_nullable,
               column_default
        FROM information_schema.columns
        WHERE table_schema = 'public'
          AND table_name = %s
        ORDER BY ordinal_position
    """
    rows, _ = run_query(sql, params=(table,), limit=200)
    if not rows:
        return _text(f"Tabulka '{table}' nenalezena.")
    return _json_text(rows)


# (result key, SQL) pairs whose single row carries a "total" column.
_STATS_COUNTS = (
    ("zaloha_obrazku_total", "SELECT COUNT(*) AS total FROM zaloha_obrazku"),
    ("zdrojove_soubory_total", "SELECT COUNT(*) AS total FROM zdrojove_soubory"),
    ("photos_total", "SELECT COUNT(*) AS total FROM photos"),
    (
        "photos_with_taken_at",
        "SELECT COUNT(*) AS total FROM photos WHERE taken_at IS NOT NULL",
    ),
    (
        "photos_with_gps",
        "SELECT COUNT(*) AS total FROM photos WHERE gps_lat IS NOT NULL",
    ),
)

# (result key, SQL) pairs whose full row set is returned as a list of dicts.
_STATS_BREAKDOWNS = (
    (
        "top_cameras",
        """
        SELECT camera_model, COUNT(*) AS cnt
        FROM photos
        WHERE camera_model IS NOT NULL
        GROUP BY camera_model
        ORDER BY cnt DESC
        LIMIT 10
        """,
    ),
    (
        "photos_by_year",
        """
        SELECT EXTRACT(YEAR FROM taken_at)::int AS rok, COUNT(*) AS cnt
        FROM photos
        WHERE taken_at IS NOT NULL
        GROUP BY rok
        ORDER BY rok
        """,
    ),
    (
        "processing_status",
        """
        SELECT processing_status, COUNT(*) AS cnt
        FROM photos
        GROUP BY processing_status
        ORDER BY cnt DESC
        """,
    ),
)


def _tool_stats(arguments: dict) -> list[TextContent]:
    """Handle the "stats" tool: collect all statistics over one connection."""
    results = {}
    with _readonly_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            for key, sql in _STATS_COUNTS:
                cur.execute(sql)
                results[key] = cur.fetchone()["total"]
            for key, sql in _STATS_BREAKDOWNS:
                cur.execute(sql)
                results[key] = [dict(r) for r in cur.fetchall()]
    return _json_text(results)


# Tool name -> synchronous handler taking the arguments dict.
_TOOL_HANDLERS = {
    "query": _tool_query,
    "tables": _tool_tables,
    "describe_table": _tool_describe_table,
    "stats": _tool_stats,
}


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Dispatch an MCP tool call and return its result as text content.

    Args:
        name: Tool name as advertised by list_tools().
        arguments: Tool arguments supplied by the client.

    Returns:
        A list with one TextContent item: the JSON result, or a message
        describing an unknown tool or a failure.
    """
    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        return _text(f"Neznámý nástroj: {name}")
    try:
        # psycopg2 calls block; run the handler in a worker thread so the
        # event loop serving the stdio transport stays responsive.
        return await asyncio.to_thread(handler, arguments or {})
    except Exception as e:
        # Top-level boundary: report the failure to the client as text
        # rather than letting it escape the handler.
        return _text(f"Chyba: {e}")


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

async def main():
    """Run the MCP server over stdio until the streams close."""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )


if __name__ == "__main__":
    asyncio.run(main())