"""PostgreSQL MCP server for the fotky_buzalkovi database.

Speaks newline-delimited JSON-RPC 2.0 over stdin/stdout and exposes four
tools: ``pg_query``, ``pg_execute``, ``pg_list_tables`` and
``pg_describe_table``.

Run: python server.py
"""

import contextlib
import json
import os
import sys
from typing import Any

import psycopg2
import psycopg2.extras

# Force UTF-8 on the stdio streams (Windows may default to cp1252).
# stdin is included so that non-ASCII text in incoming requests is decoded
# the same way outgoing responses are encoded.
for _stream in (sys.stdin, sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8")


# --- MCP protocol helpers (stdio transport) ---

def send(msg: dict) -> None:
    """Write *msg* to stdout as a single JSON line and flush immediately."""
    line = json.dumps(msg, ensure_ascii=False, default=str)
    sys.stdout.write(line + "\n")
    sys.stdout.flush()


def recv() -> dict:
    """Read the next JSON message from stdin.

    Whitespace-only lines are skipped.

    Raises:
        EOFError: stdin has been closed.
        json.JSONDecodeError: the line is not valid JSON (a ``ValueError``).
    """
    while True:
        line = sys.stdin.readline()
        if not line:
            raise EOFError
        if line.strip():
            return json.loads(line)


def _rpc_error(rid: Any, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error response for request id *rid*."""
    return {
        "jsonrpc": "2.0",
        "id": rid,
        "error": {"code": code, "message": message},
    }


# --- DB connection ---

# NOTE(review): the password fallback below is a real credential committed to
# source. It is kept only so existing deployments keep starting; rotate it and
# require DB_PASSWORD from the environment instead.
DB = dict(
    host=os.getenv("DB_HOST", "192.168.1.76"),
    port=int(os.getenv("DB_PORT", 5432)),
    user=os.getenv("DB_USER", "vladimir.buzalka"),
    password=os.getenv("DB_PASSWORD", "Vlado7309208104++"),
    dbname=os.getenv("DB_NAME", "fotky_buzalkovi"),
)


def get_conn():
    """Open a new psycopg2 connection using the settings in ``DB``."""
    return psycopg2.connect(**DB)


@contextlib.contextmanager
def _cursor(*, readonly: bool = False, dict_rows: bool = True):
    """Yield a cursor on a fresh connection that is closed afterwards.

    The transaction is committed when the body finishes normally and rolled
    back when it raises. psycopg2's own ``with connection:`` only ends the
    transaction and leaves the connection open, hence the explicit close.

    Args:
        readonly: ask the server for a read-only session before any
            statement runs.
        dict_rows: use ``RealDictCursor`` so rows come back as mappings.
    """
    factory = psycopg2.extras.RealDictCursor if dict_rows else None
    with contextlib.closing(get_conn()) as conn:
        if readonly:
            conn.set_session(readonly=True)
        with conn, conn.cursor(cursor_factory=factory) as cur:
            yield cur


# --- Tool implementations ---

_QUERY_KEYWORDS = ("SELECT", "EXPLAIN", "WITH", "TABLE", "SHOW")


def tool_query(sql: str, params: list | None = None, limit: int = 200) -> str:
    """Run a read-only statement and return the rows as a JSON array.

    Only statements whose first word is SELECT, EXPLAIN, WITH, TABLE or SHOW
    are accepted. The statement runs in a read-only session so that a
    data-modifying CTE or ``EXPLAIN ANALYZE`` of a write cannot slip through
    the keyword check. At most *limit* rows are returned: SELECT statements
    get a SQL ``LIMIT``, everything else is capped when fetching.

    Args:
        sql: the statement; surrounding whitespace and trailing ``;`` are
            dropped.
        params: positional ``%s`` parameters, or ``None``.
        limit: maximum number of rows to return.

    Returns:
        JSON text with one object per row, or an explanatory message when
        the statement is not one of the accepted kinds.

    Raises:
        ValueError: *sql* is empty or *limit* is not a non-negative integer.
        psycopg2.Error: the database rejected the statement.
    """
    sql = sql.strip().rstrip(";")
    words = sql.split()
    if not words:
        raise ValueError("prázdný SQL dotaz")

    # Safety: only SELECT/EXPLAIN/WITH (plus TABLE and SHOW).
    first = words[0].upper()
    if first not in _QUERY_KEYWORDS:
        return "Chyba: tool_query je jen pro SELECT. Pro zápis použij tool_execute."

    # The limit is interpolated into SQL text, so it must be a real integer.
    try:
        limit = int(limit)
    except (TypeError, ValueError):
        raise ValueError("limit musí být nezáporné celé číslo") from None
    if limit < 0:
        raise ValueError("limit musí být nezáporné celé číslo")

    if first == "SELECT":
        # The newline keeps a trailing "--" comment in the user's SQL from
        # swallowing the closing parenthesis and the LIMIT clause.
        sql = f"SELECT * FROM ({sql}\n) _q LIMIT {limit}"

    with _cursor(readonly=True) as cur:
        # Pass None rather than an empty list: with any argument sequence
        # psycopg2 treats "%" in the SQL as a placeholder marker.
        cur.execute(sql, params or None)
        rows = cur.fetchmany(limit) if limit else []
    return json.dumps([dict(r) for r in rows], ensure_ascii=False, default=str)


def tool_execute(sql: str, params: list | None = None) -> str:
    """Run INSERT/UPDATE/DELETE/CREATE and report the affected row count.

    The statement is committed on success and rolled back on error.

    Args:
        sql: the statement to execute.
        params: positional ``%s`` parameters, or ``None``.

    Returns:
        A message containing the cursor's ``rowcount``.

    Raises:
        psycopg2.Error: the database rejected the statement.
    """
    with _cursor(dict_rows=False) as cur:
        cur.execute(sql, params or None)
        rowcount = cur.rowcount
    return f"OK, ovlivněno řádků: {rowcount}"


def tool_list_tables() -> str:
    """Return the tables of the ``public`` schema with their sizes as JSON."""
    sql = """
        SELECT table_name,
               pg_size_pretty(pg_total_relation_size(quote_ident(table_name))) AS size
        FROM information_schema.tables
        WHERE table_schema = 'public'
        ORDER BY table_name
    """
    with _cursor() as cur:
        cur.execute(sql)
        rows = cur.fetchall()
    return json.dumps([dict(r) for r in rows], ensure_ascii=False)


def tool_describe_table(table: str) -> str:
    """Return columns (name, type, nullable, default) and indexes of *table*.

    Only the ``public`` schema is consulted. The result is a JSON object with
    the keys ``columns`` and ``indexes``; both are empty lists when no such
    table exists.
    """
    cols_sql = """
        SELECT column_name, data_type, is_nullable, column_default
        FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = %s
        ORDER BY ordinal_position
    """
    idx_sql = """
        SELECT indexname, indexdef
        FROM pg_indexes
        WHERE schemaname = 'public' AND tablename = %s
    """
    with _cursor() as cur:
        cur.execute(cols_sql, [table])
        cols = cur.fetchall()
        cur.execute(idx_sql, [table])
        idxs = cur.fetchall()
    return json.dumps(
        {
            "columns": [dict(r) for r in cols],
            "indexes": [dict(r) for r in idxs],
        },
        ensure_ascii=False,
    )


# --- Tool registry ---

TOOLS = [
    {
        "name": "pg_query",
        "description": (
            "Spustí SELECT dotaz na fotky_buzalkovi a vrátí výsledky jako JSON. "
            "Max 200 řádků (nastavitelné parametrem limit)."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "sql": {"type": "string", "description": "SELECT SQL dotaz"},
                "params": {
                    "type": "array",
                    "description": "Poziční parametry (%s)",
                    "default": [],
                },
                "limit": {
                    "type": "integer",
                    "description": "Max počet řádků (default 200)",
                    "default": 200,
                },
            },
            "required": ["sql"],
        },
    },
    {
        "name": "pg_execute",
        "description": (
            "Spustí INSERT/UPDATE/DELETE/DDL na fotky_buzalkovi. "
            "Vrátí počet ovlivněných řádků."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "SQL příkaz (INSERT/UPDATE/DELETE/CREATE/ALTER)",
                },
                "params": {
                    "type": "array",
                    "description": "Poziční parametry (%s)",
                    "default": [],
                },
            },
            "required": ["sql"],
        },
    },
    {
        "name": "pg_list_tables",
        "description": (
            "Vrátí seznam všech tabulek v databázi fotky_buzalkovi s jejich velikostí."
        ),
        "inputSchema": {"type": "object", "properties": {}},
    },
    {
        "name": "pg_describe_table",
        "description": "Vrátí strukturu (sloupce, typy, indexy) dané tabulky.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "table": {"type": "string", "description": "Název tabulky"},
            },
            "required": ["table"],
        },
    },
]


# --- MCP message loop ---

def handle(req: dict) -> dict | None:
    """Dispatch one JSON-RPC message and build its response.

    Args:
        req: the decoded request or notification object.

    Returns:
        The response object, or ``None`` when nothing must be sent back
        (``notifications/*`` methods and unknown messages without an ``id``).
        Failures inside a tool are reported as a normal result with
        ``isError`` set, not as a JSON-RPC error.
    """
    method = req.get("method", "")
    rid = req.get("id")

    if method == "initialize":
        return {
            "jsonrpc": "2.0",
            "id": rid,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {}},
                "serverInfo": {"name": "postgres-fotky", "version": "1.0.0"},
            },
        }

    if method == "notifications/initialized":
        return None  # no response needed

    if method == "tools/list":
        return {
            "jsonrpc": "2.0",
            "id": rid,
            "result": {"tools": TOOLS},
        }

    if method == "tools/call":
        call = req.get("params")
        name = call.get("name") if isinstance(call, dict) else None
        if not isinstance(name, str):
            # A malformed call must not take the whole server down.
            return _rpc_error(rid, -32602, "Invalid params: missing tool name")
        args = call.get("arguments") or {}
        try:
            if name == "pg_query":
                result = tool_query(args["sql"], args.get("params"), args.get("limit", 200))
            elif name == "pg_execute":
                result = tool_execute(args["sql"], args.get("params"))
            elif name == "pg_list_tables":
                result = tool_list_tables()
            elif name == "pg_describe_table":
                result = tool_describe_table(args["table"])
            else:
                result = f"Neznámý tool: {name}"
            return {
                "jsonrpc": "2.0",
                "id": rid,
                "result": {"content": [{"type": "text", "text": result}]},
            }
        except Exception as e:
            # Boundary handler: every tool failure becomes an error result so
            # the message loop keeps running.
            return {
                "jsonrpc": "2.0",
                "id": rid,
                "result": {
                    "content": [{"type": "text", "text": f"Chyba: {e}"}],
                    "isError": True,
                },
            }

    # JSON-RPC notifications (no "id") never get a response, not even an error.
    if "id" not in req or method.startswith("notifications/"):
        return None

    # Unknown method
    return _rpc_error(rid, -32601, f"Method not found: {method}")


def main() -> None:
    """Check the DB connection once, then serve requests until stdin closes."""
    # Verify the connection at startup; a failure is only a warning because
    # each tool call opens its own connection later.
    try:
        with contextlib.closing(get_conn()):
            pass
        sys.stderr.write("postgres-fotky MCP: připojeno k DB OK\n")
    except Exception as e:
        sys.stderr.write(f"postgres-fotky MCP: VAROVÁNÍ — nelze se připojit k DB: {e}\n")

    while True:
        try:
            req = recv()
        except EOFError:
            break
        except ValueError as e:
            # json.JSONDecodeError is a ValueError; report it and keep serving.
            send(_rpc_error(None, -32700, f"Parse error: {e}"))
            continue
        if not isinstance(req, dict):
            send(_rpc_error(None, -32600, "Invalid Request"))
            continue
        resp = handle(req)
        if resp is not None:
            send(resp)


if __name__ == "__main__":
    main()