227 lines
7.5 KiB
Python
227 lines
7.5 KiB
Python
"""
|
|
PostgreSQL MCP Server pro fotky_buzalkovi
|
|
Spustit: python server.py
|
|
"""
|
|
import json
|
|
import sys
|
|
import os
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from typing import Any
|
|
|
|
# Vynutit UTF-8 na stdout/stderr (Windows může defaultovat na cp1252)
|
|
if hasattr(sys.stdout, "reconfigure"):
|
|
sys.stdout.reconfigure(encoding="utf-8")
|
|
if hasattr(sys.stderr, "reconfigure"):
|
|
sys.stderr.reconfigure(encoding="utf-8")
|
|
|
|
# --- MCP protocol helpers (stdio transport) ---
|
|
|
|
def send(msg: dict):
|
|
line = json.dumps(msg, ensure_ascii=False, default=str)
|
|
sys.stdout.write(line + "\n")
|
|
sys.stdout.flush()
|
|
|
|
def recv() -> dict:
|
|
line = sys.stdin.readline()
|
|
if not line:
|
|
raise EOFError
|
|
return json.loads(line)
|
|
|
|
# --- DB connection ---
|
|
|
|
DB = dict(
|
|
host=os.getenv("DB_HOST", "192.168.1.76"),
|
|
port=int(os.getenv("DB_PORT", 5432)),
|
|
user=os.getenv("DB_USER", "vladimir.buzalka"),
|
|
password=os.getenv("DB_PASSWORD", "Vlado7309208104++"),
|
|
dbname=os.getenv("DB_NAME", "fotky_buzalkovi"),
|
|
)
|
|
|
|
def get_conn():
|
|
return psycopg2.connect(**DB)
|
|
|
|
# --- Tool implementations ---
|
|
|
|
def tool_query(sql: str, params: list | None = None, limit: int = 200) -> str:
|
|
"""SELECT dotaz, vrátí JSON."""
|
|
sql = sql.strip().rstrip(";")
|
|
# Bezpečnost: pouze SELECT/EXPLAIN/WITH
|
|
first = sql.split()[0].upper()
|
|
if first not in ("SELECT", "EXPLAIN", "WITH", "TABLE", "SHOW"):
|
|
return "Chyba: tool_query je jen pro SELECT. Pro zápis použij tool_execute."
|
|
sql = f"SELECT * FROM ({sql}) _q LIMIT {limit}" if first == "SELECT" else sql
|
|
with get_conn() as conn:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(sql, params or [])
|
|
rows = cur.fetchall()
|
|
return json.dumps([dict(r) for r in rows], ensure_ascii=False, default=str)
|
|
|
|
def tool_execute(sql: str, params: list | None = None) -> str:
|
|
"""INSERT/UPDATE/DELETE/CREATE — vrátí počet ovlivněných řádků."""
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, params or [])
|
|
rowcount = cur.rowcount
|
|
conn.commit()
|
|
return f"OK, ovlivněno řádků: {rowcount}"
|
|
|
|
def tool_list_tables() -> str:
|
|
"""Vrátí seznam tabulek v public schema."""
|
|
sql = """
|
|
SELECT table_name,
|
|
pg_size_pretty(pg_total_relation_size(quote_ident(table_name))) AS size
|
|
FROM information_schema.tables
|
|
WHERE table_schema = 'public'
|
|
ORDER BY table_name
|
|
"""
|
|
with get_conn() as conn:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(sql)
|
|
rows = cur.fetchall()
|
|
return json.dumps([dict(r) for r in rows], ensure_ascii=False)
|
|
|
|
def tool_describe_table(table: str) -> str:
|
|
"""Vrátí sloupce, typy, nullable, default + indexy dané tabulky."""
|
|
cols_sql = """
|
|
SELECT column_name, data_type, is_nullable, column_default
|
|
FROM information_schema.columns
|
|
WHERE table_schema = 'public' AND table_name = %s
|
|
ORDER BY ordinal_position
|
|
"""
|
|
idx_sql = """
|
|
SELECT indexname, indexdef
|
|
FROM pg_indexes
|
|
WHERE schemaname = 'public' AND tablename = %s
|
|
"""
|
|
with get_conn() as conn:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(cols_sql, [table])
|
|
cols = cur.fetchall()
|
|
cur.execute(idx_sql, [table])
|
|
idxs = cur.fetchall()
|
|
return json.dumps({
|
|
"columns": [dict(r) for r in cols],
|
|
"indexes": [dict(r) for r in idxs],
|
|
}, ensure_ascii=False)
|
|
|
|
# --- Tool registry ---
|
|
|
|
TOOLS = [
|
|
{
|
|
"name": "pg_query",
|
|
"description": "Spustí SELECT dotaz na fotky_buzalkovi a vrátí výsledky jako JSON. Max 200 řádků (nastavitelné parametrem limit).",
|
|
"inputSchema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"sql": {"type": "string", "description": "SELECT SQL dotaz"},
|
|
"params": {"type": "array", "description": "Poziční parametry (%s)", "default": []},
|
|
"limit": {"type": "integer","description": "Max počet řádků (default 200)", "default": 200},
|
|
},
|
|
"required": ["sql"],
|
|
},
|
|
},
|
|
{
|
|
"name": "pg_execute",
|
|
"description": "Spustí INSERT/UPDATE/DELETE/DDL na fotky_buzalkovi. Vrátí počet ovlivněných řádků.",
|
|
"inputSchema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"sql": {"type": "string", "description": "SQL příkaz (INSERT/UPDATE/DELETE/CREATE/ALTER)"},
|
|
"params": {"type": "array", "description": "Poziční parametry (%s)", "default": []},
|
|
},
|
|
"required": ["sql"],
|
|
},
|
|
},
|
|
{
|
|
"name": "pg_list_tables",
|
|
"description": "Vrátí seznam všech tabulek v databázi fotky_buzalkovi s jejich velikostí.",
|
|
"inputSchema": {"type": "object", "properties": {}},
|
|
},
|
|
{
|
|
"name": "pg_describe_table",
|
|
"description": "Vrátí strukturu (sloupce, typy, indexy) dané tabulky.",
|
|
"inputSchema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"table": {"type": "string", "description": "Název tabulky"},
|
|
},
|
|
"required": ["table"],
|
|
},
|
|
},
|
|
]
|
|
|
|
# --- MCP message loop ---
|
|
|
|
def handle(req: dict) -> dict | None:
|
|
method = req.get("method", "")
|
|
rid = req.get("id")
|
|
|
|
if method == "initialize":
|
|
return {
|
|
"jsonrpc": "2.0", "id": rid,
|
|
"result": {
|
|
"protocolVersion": "2024-11-05",
|
|
"capabilities": {"tools": {}},
|
|
"serverInfo": {"name": "postgres-fotky", "version": "1.0.0"},
|
|
},
|
|
}
|
|
|
|
if method == "notifications/initialized":
|
|
return None # no response needed
|
|
|
|
if method == "tools/list":
|
|
return {
|
|
"jsonrpc": "2.0", "id": rid,
|
|
"result": {"tools": TOOLS},
|
|
}
|
|
|
|
if method == "tools/call":
|
|
name = req["params"]["name"]
|
|
args = req["params"].get("arguments", {})
|
|
try:
|
|
if name == "pg_query":
|
|
result = tool_query(args["sql"], args.get("params"), args.get("limit", 200))
|
|
elif name == "pg_execute":
|
|
result = tool_execute(args["sql"], args.get("params"))
|
|
elif name == "pg_list_tables":
|
|
result = tool_list_tables()
|
|
elif name == "pg_describe_table":
|
|
result = tool_describe_table(args["table"])
|
|
else:
|
|
result = f"Neznámý tool: {name}"
|
|
return {
|
|
"jsonrpc": "2.0", "id": rid,
|
|
"result": {"content": [{"type": "text", "text": result}]},
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"jsonrpc": "2.0", "id": rid,
|
|
"result": {"content": [{"type": "text", "text": f"Chyba: {e}"}], "isError": True},
|
|
}
|
|
|
|
# Neznámá metoda
|
|
return {
|
|
"jsonrpc": "2.0", "id": rid,
|
|
"error": {"code": -32601, "message": f"Method not found: {method}"},
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Ověření připojení při startu
|
|
try:
|
|
with get_conn() as c:
|
|
pass
|
|
sys.stderr.write("postgres-fotky MCP: připojeno k DB OK\n")
|
|
except Exception as e:
|
|
sys.stderr.write(f"postgres-fotky MCP: VAROVÁNÍ — nelze se připojit k DB: {e}\n")
|
|
|
|
while True:
|
|
try:
|
|
req = recv()
|
|
except EOFError:
|
|
break
|
|
resp = handle(req)
|
|
if resp is not None:
|
|
send(resp)
|