Files
2026-05-31 06:24:37 +02:00

227 lines
7.5 KiB
Python

"""
PostgreSQL MCP Server pro fotky_buzalkovi
Spustit: python server.py
"""
import json
import sys
import os
import psycopg2
import psycopg2.extras
from typing import Any
# Force UTF-8 on stdout/stderr (Windows may default to cp1252).
# Streams that do not offer ``reconfigure`` are left untouched.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8")
del _stream
# --- MCP protocol helpers (stdio transport) ---
def send(msg: dict):
    """Write one message to stdout as a single newline-terminated JSON line.

    Non-ASCII characters are written verbatim, values JSON cannot encode
    natively are converted with ``str``, and the stream is flushed after
    every message.
    """
    out = sys.stdout
    payload = json.dumps(msg, default=str, ensure_ascii=False)
    out.write(f"{payload}\n")
    out.flush()
def recv() -> dict:
    """Read the next message from stdin, one JSON document per line.

    Lines that contain only whitespace are skipped instead of being handed
    to the JSON parser, where they would fail.

    Returns:
        The decoded JSON value of the next non-blank line.

    Raises:
        EOFError: stdin reached end of stream before another message arrived.
        json.JSONDecodeError: the line is not valid JSON.
    """
    while True:
        line = sys.stdin.readline()
        if not line:
            # readline() returns "" only at end of stream.
            raise EOFError
        if line.strip():
            return json.loads(line)
# --- DB connection ---
# Keyword arguments handed to psycopg2.connect(). Each value comes from the
# named environment variable and falls back to the literal next to it.
# NOTE(review): the DB_PASSWORD fallback is a credential committed to source.
# It is kept only so existing setups keep working — supply it through
# DB_PASSWORD instead, then remove the literal and rotate the password.
DB = {
    "host": os.getenv("DB_HOST", "192.168.1.76"),
    "port": int(os.getenv("DB_PORT", 5432)),
    "user": os.getenv("DB_USER", "vladimir.buzalka"),
    "password": os.getenv("DB_PASSWORD", "Vlado7309208104++"),
    "dbname": os.getenv("DB_NAME", "fotky_buzalkovi"),
}
def get_conn():
    """Open and return a new psycopg2 connection built from the ``DB`` settings."""
    connection = psycopg2.connect(**DB)
    return connection
# --- Tool implementations ---
def tool_query(sql: str, params: list | None = None, limit: int = 200) -> str:
"""SELECT dotaz, vrátí JSON."""
sql = sql.strip().rstrip(";")
# Bezpečnost: pouze SELECT/EXPLAIN/WITH
first = sql.split()[0].upper()
if first not in ("SELECT", "EXPLAIN", "WITH", "TABLE", "SHOW"):
return "Chyba: tool_query je jen pro SELECT. Pro zápis použij tool_execute."
sql = f"SELECT * FROM ({sql}) _q LIMIT {limit}" if first == "SELECT" else sql
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(sql, params or [])
rows = cur.fetchall()
return json.dumps([dict(r) for r in rows], ensure_ascii=False, default=str)
def tool_execute(sql: str, params: list | None = None) -> str:
    """Run an INSERT/UPDATE/DELETE/DDL statement and report the affected rows.

    Args:
        sql: Statement to execute; it is passed to the server unchanged.
        params: Positional values for ``%s`` placeholders, if any.

    Returns:
        ``"OK, ovlivněno řádků: N"`` where N is the cursor's ``rowcount``.

    Raises:
        psycopg2.Error: connecting to the database or executing the
            statement failed.
    """
    conn = get_conn()
    try:
        # Leaving ``with conn`` commits when the body succeeded and rolls
        # back when it raised, so no separate commit() call is needed.
        with conn, conn.cursor() as cur:
            # None instead of an empty list: without a parameter object the
            # statement is sent as written, so a literal % needs no escaping.
            cur.execute(sql, params or None)
            rowcount = cur.rowcount
    finally:
        # ``with conn`` only ends the transaction; close the connection here.
        conn.close()
    return f"OK, ovlivněno řádků: {rowcount}"
def tool_list_tables() -> str:
    """Return the tables of the ``public`` schema and their total size as JSON.

    Returns:
        A JSON array of ``{"table_name": ..., "size": ...}`` objects ordered
        by table name; ``size`` is the human-readable total relation size.

    Raises:
        psycopg2.Error: connecting to the database or running the query failed.
    """
    # The relation name is schema-qualified before the regclass lookup so the
    # size is taken from public.<table> regardless of the session search_path.
    sql = """
        SELECT table_name,
               pg_size_pretty(pg_total_relation_size(
                   (quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass
               )) AS size
        FROM information_schema.tables
        WHERE table_schema = 'public'
        ORDER BY table_name
    """
    conn = get_conn()
    try:
        with conn, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    finally:
        # ``with conn`` only ends the transaction; close the connection here.
        conn.close()
    return json.dumps([dict(r) for r in rows], ensure_ascii=False)
def tool_describe_table(table: str) -> str:
    """Return the columns and indexes of one table in the ``public`` schema.

    Args:
        table: Table name; it is bound as a query parameter, not
            interpolated into the SQL text.

    Returns:
        A JSON object with two arrays: ``columns`` (column_name, data_type,
        is_nullable, column_default, in ordinal order) and ``indexes``
        (indexname, indexdef). Both are empty when no such table is found.

    Raises:
        psycopg2.Error: connecting to the database or running a query failed.
    """
    cols_sql = """
        SELECT column_name, data_type, is_nullable, column_default
        FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = %s
        ORDER BY ordinal_position
    """
    idx_sql = """
        SELECT indexname, indexdef
        FROM pg_indexes
        WHERE schemaname = 'public' AND tablename = %s
    """
    conn = get_conn()
    try:
        with conn, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(cols_sql, [table])
            cols = cur.fetchall()
            cur.execute(idx_sql, [table])
            idxs = cur.fetchall()
    finally:
        # ``with conn`` only ends the transaction; close the connection here.
        conn.close()
    return json.dumps(
        {
            "columns": [dict(r) for r in cols],
            "indexes": [dict(r) for r in idxs],
        },
        ensure_ascii=False,
    )
# --- Tool registry ---
# Tool descriptors returned verbatim by the "tools/list" method. Each entry
# carries the tool name, a description and a JSON Schema for its arguments.
TOOLS = [
    # Read-only query tool.
    {
        "name": "pg_query",
        "description": "Spustí SELECT dotaz na fotky_buzalkovi a vrátí výsledky jako JSON. Max 200 řádků (nastavitelné parametrem limit).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "SELECT SQL dotaz",
                },
                "params": {
                    "type": "array",
                    "description": "Poziční parametry (%s)",
                    "default": [],
                },
                "limit": {
                    "type": "integer",
                    "description": "Max počet řádků (default 200)",
                    "default": 200,
                },
            },
            "required": ["sql"],
        },
    },
    # Write / DDL tool.
    {
        "name": "pg_execute",
        "description": "Spustí INSERT/UPDATE/DELETE/DDL na fotky_buzalkovi. Vrátí počet ovlivněných řádků.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "SQL příkaz (INSERT/UPDATE/DELETE/CREATE/ALTER)",
                },
                "params": {
                    "type": "array",
                    "description": "Poziční parametry (%s)",
                    "default": [],
                },
            },
            "required": ["sql"],
        },
    },
    # Table listing; takes no arguments.
    {
        "name": "pg_list_tables",
        "description": "Vrátí seznam všech tabulek v databázi fotky_buzalkovi s jejich velikostí.",
        "inputSchema": {"type": "object", "properties": {}},
    },
    # Structure of a single table.
    {
        "name": "pg_describe_table",
        "description": "Vrátí strukturu (sloupce, typy, indexy) dané tabulky.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "table": {
                    "type": "string",
                    "description": "Název tabulky",
                },
            },
            "required": ["table"],
        },
    },
]
# --- MCP message loop ---
def handle(req: dict) -> dict | None:
method = req.get("method", "")
rid = req.get("id")
if method == "initialize":
return {
"jsonrpc": "2.0", "id": rid,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "postgres-fotky", "version": "1.0.0"},
},
}
if method == "notifications/initialized":
return None # no response needed
if method == "tools/list":
return {
"jsonrpc": "2.0", "id": rid,
"result": {"tools": TOOLS},
}
if method == "tools/call":
name = req["params"]["name"]
args = req["params"].get("arguments", {})
try:
if name == "pg_query":
result = tool_query(args["sql"], args.get("params"), args.get("limit", 200))
elif name == "pg_execute":
result = tool_execute(args["sql"], args.get("params"))
elif name == "pg_list_tables":
result = tool_list_tables()
elif name == "pg_describe_table":
result = tool_describe_table(args["table"])
else:
result = f"Neznámý tool: {name}"
return {
"jsonrpc": "2.0", "id": rid,
"result": {"content": [{"type": "text", "text": result}]},
}
except Exception as e:
return {
"jsonrpc": "2.0", "id": rid,
"result": {"content": [{"type": "text", "text": f"Chyba: {e}"}], "isError": True},
}
# Neznámá metoda
return {
"jsonrpc": "2.0", "id": rid,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
def main() -> None:
    """Check the database connection once, then serve requests until stdin closes.

    A failed startup check is only reported on stderr; the server still
    starts. Inside the loop a line that is not valid JSON is answered with a
    -32700 error, and an unexpected exception from ``handle`` is logged and
    answered with -32603 when the request carried an ``id``. Neither stops
    the server.
    """
    # Connection check at startup.
    try:
        # Open and immediately close, so the probe connection is not kept
        # for the lifetime of the process.
        get_conn().close()
        sys.stderr.write("postgres-fotky MCP: připojeno k DB OK\n")
    except Exception as e:
        sys.stderr.write(f"postgres-fotky MCP: VAROVÁNÍ — nelze se připojit k DB: {e}\n")

    while True:
        try:
            req = recv()
        except EOFError:
            break
        except json.JSONDecodeError as e:
            # The request id is unknown when the line cannot be parsed.
            send({
                "jsonrpc": "2.0", "id": None,
                "error": {"code": -32700, "message": f"Parse error: {e}"},
            })
            continue
        try:
            resp = handle(req)
        except Exception as e:
            # Last-resort boundary: one bad request must not stop the server.
            sys.stderr.write(
                f"postgres-fotky MCP: neočekávaná chyba při zpracování požadavku: {e!r}\n"
            )
            resp = None
            if isinstance(req, dict) and "id" in req:
                resp = {
                    "jsonrpc": "2.0", "id": req["id"],
                    "error": {"code": -32603, "message": f"Internal error: {e}"},
                }
        if resp is not None:
            send(resp)


if __name__ == "__main__":
    main()