346 lines
12 KiB
Python
346 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
mcp_mysql_v1.0.py
|
|
MCP server pro MySQL — FastMCP + mysql-connector-python.
|
|
Verze: 1.0 | Datum: 2026-06-17
|
|
Popis: Read-only dotazy (query/describe/list_*) + bezpečné zápisy přes
|
|
preview_execute -> execute(confirmed=True). Vzor: mcp_postgres.py.
|
|
|
|
Spustit: python mcp_mysql_v1.0.py
|
|
|
|
Host: 192.168.1.76:3306
|
|
User: root
|
|
Default DB: žádná (None) — DB se předává tool argumentem `db`, nebo se
|
|
tabulky plně kvalifikují (db.tabulka). Dostupné DB mj.:
|
|
fio (transactions), studie, medicus, medevio, kanboard,
|
|
OrdinaceDropBoxBackup, puzzle, torrents.
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, date, timedelta
|
|
from decimal import Decimal
|
|
from typing import Optional, Union
|
|
|
|
import mysql.connector
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
MY_HOST = "192.168.1.76"
|
|
MY_PORT = 3306
|
|
MY_USER = "root"
|
|
MY_PASSWORD = "Vlado9674+"
|
|
MY_DEFAULT_DB = None # None = připojení bez výchozí databáze
|
|
|
|
SYSTEM_DBS = ("information_schema", "mysql", "performance_schema", "sys")
|
|
|
|
WRITE_KEYWORDS = ("insert", "update", "delete", "drop", "truncate",
|
|
"alter", "create", "grant", "revoke", "rename", "replace")
|
|
|
|
|
|
def log(msg: str):
|
|
print(msg, file=sys.stderr, flush=True)
|
|
|
|
|
|
_pool: dict[str, "mysql.connector.MySQLConnection"] = {}
|
|
|
|
|
|
def get_conn(db: Optional[str] = None):
|
|
"""Lazy connection cache keyed by db name. Reconnects if dropped."""
|
|
name = db or MY_DEFAULT_DB
|
|
key = name or ""
|
|
conn = _pool.get(key)
|
|
if conn is not None:
|
|
try:
|
|
conn.ping(reconnect=True, attempts=2, delay=1)
|
|
return conn
|
|
except Exception:
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
_pool.pop(key, None)
|
|
kwargs = dict(
|
|
host=MY_HOST, port=MY_PORT, user=MY_USER, password=MY_PASSWORD,
|
|
connection_timeout=6, autocommit=True, charset="utf8mb4",
|
|
use_pure=True,
|
|
)
|
|
if name:
|
|
kwargs["database"] = name
|
|
conn = mysql.connector.connect(**kwargs)
|
|
_pool[key] = conn
|
|
return conn
|
|
|
|
|
|
def serialize(value):
|
|
if isinstance(value, (datetime, date)):
|
|
return value.isoformat()
|
|
if isinstance(value, timedelta):
|
|
return str(value)
|
|
if isinstance(value, Decimal):
|
|
return float(value)
|
|
if isinstance(value, (bytes, bytearray, memoryview)):
|
|
try:
|
|
return bytes(value).decode("utf-8", errors="replace")
|
|
except Exception:
|
|
return repr(value)
|
|
if isinstance(value, dict):
|
|
return {k: serialize(v) for k, v in value.items()}
|
|
if isinstance(value, (list, tuple)):
|
|
return [serialize(v) for v in value]
|
|
return value
|
|
|
|
|
|
def serialize_rows(rows):
|
|
return [{k: serialize(v) for k, v in row.items()} for row in rows]
|
|
|
|
|
|
def is_write(sql: str) -> bool:
|
|
s = sql.lstrip().lower()
|
|
return any(s.startswith(kw) for kw in WRITE_KEYWORDS)
|
|
|
|
|
|
# Verify connection on startup
|
|
try:
|
|
c = get_conn()
|
|
cur = c.cursor()
|
|
cur.execute("SELECT VERSION()")
|
|
ver = cur.fetchone()[0]
|
|
cur.close()
|
|
log(f"Connected to MySQL ({MY_HOST}:{MY_PORT}) — {ver}")
|
|
except Exception as e:
|
|
log(f"MySQL connection failed: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
mcp = FastMCP("mysql")
|
|
|
|
|
|
@mcp.tool()
|
|
def ping() -> dict:
|
|
"""Check if MySQL is reachable. Returns server version, latency, host."""
|
|
try:
|
|
start = time.monotonic()
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT VERSION()")
|
|
version = cur.fetchone()[0]
|
|
cur.close()
|
|
latency_ms = round((time.monotonic() - start) * 1000, 1)
|
|
return {"status": "ok", "version": version, "latency_ms": latency_ms,
|
|
"host": MY_HOST, "port": MY_PORT}
|
|
except Exception as e:
|
|
return {"status": "error", "error": str(e), "host": MY_HOST}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_databases(include_system: bool = False) -> dict:
|
|
"""List databases on the server. System schemas excluded unless include_system=True."""
|
|
try:
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
cur.execute("SHOW DATABASES")
|
|
dbs = [r[0] for r in cur.fetchall()]
|
|
cur.close()
|
|
if not include_system:
|
|
dbs = [d for d in dbs if d not in SYSTEM_DBS]
|
|
return {"count": len(dbs), "databases": dbs}
|
|
except Exception:
|
|
log(f"list_databases error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def list_tables(db: str) -> dict:
|
|
"""List tables (and views) in a database, with row-count estimate and engine."""
|
|
try:
|
|
conn = get_conn(db)
|
|
cur = conn.cursor(dictionary=True)
|
|
cur.execute("""
|
|
SELECT table_name, table_type, engine, table_rows, table_comment
|
|
FROM information_schema.tables
|
|
WHERE table_schema = %s
|
|
ORDER BY table_name
|
|
""", (db,))
|
|
rows = cur.fetchall()
|
|
cur.close()
|
|
return {"db": db, "count": len(rows), "tables": serialize_rows(rows)}
|
|
except Exception:
|
|
log(f"list_tables error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def describe_table(table: str, db: str) -> dict:
|
|
"""Return columns, types, nullability, keys, defaults, indexes and exact row count."""
|
|
try:
|
|
conn = get_conn(db)
|
|
cur = conn.cursor(dictionary=True)
|
|
cur.execute("""
|
|
SELECT column_name, column_type, is_nullable, column_key,
|
|
column_default, extra, character_maximum_length, column_comment
|
|
FROM information_schema.columns
|
|
WHERE table_schema = %s AND table_name = %s
|
|
ORDER BY ordinal_position
|
|
""", (db, table))
|
|
columns = cur.fetchall()
|
|
|
|
cur.execute("""
|
|
SELECT index_name, non_unique, seq_in_index, column_name, index_type
|
|
FROM information_schema.statistics
|
|
WHERE table_schema = %s AND table_name = %s
|
|
ORDER BY index_name, seq_in_index
|
|
""", (db, table))
|
|
indexes = cur.fetchall()
|
|
|
|
row_count = None
|
|
try:
|
|
cur.execute(f"SELECT COUNT(*) AS c FROM `{db}`.`{table}`")
|
|
row_count = cur.fetchone()["c"]
|
|
except Exception as e:
|
|
row_count = f"error: {e}"
|
|
cur.close()
|
|
|
|
return {"db": db, "table": table, "row_count": row_count,
|
|
"columns": serialize_rows(columns),
|
|
"indexes": serialize_rows(indexes)}
|
|
except Exception:
|
|
log(f"describe_table error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def query(
|
|
sql: str,
|
|
db: Optional[str] = None,
|
|
params_json: Optional[Union[str, list]] = None,
|
|
limit: int = 100,
|
|
) -> dict:
|
|
"""Run a READ-ONLY SQL query (SELECT / WITH / SHOW / EXPLAIN / DESCRIBE).
|
|
For writes use `execute` (which requires preview + confirmed=True).
|
|
db: optional default schema; tables can also be fully qualified (db.table).
|
|
params_json: JSON array of positional parameters for %s placeholders, e.g. '["6192081885"]'.
|
|
limit: cap on returned rows (max 1000). Python-side cutoff — the SQL is NOT modified.
|
|
"""
|
|
try:
|
|
if is_write(sql):
|
|
return {"status": "rejected",
|
|
"reason": "Write/DDL statement detected. Use `execute` (preview + confirmed=True)."}
|
|
limit = min(max(limit, 1), 1000)
|
|
params = json.loads(params_json) if isinstance(params_json, str) else (params_json or [])
|
|
conn = get_conn(db)
|
|
cur = conn.cursor(dictionary=True)
|
|
cur.execute(sql, params)
|
|
if cur.description is None:
|
|
rowcount = cur.rowcount
|
|
cur.close()
|
|
return {"status": "ok", "rowcount": rowcount, "rows": []}
|
|
rows = cur.fetchmany(limit)
|
|
cur.close()
|
|
return {"status": "ok", "db": db or MY_DEFAULT_DB,
|
|
"row_count": len(rows), "limit": limit,
|
|
"rows": serialize_rows(rows)}
|
|
except Exception as e:
|
|
log(f"query error: {traceback.format_exc()}")
|
|
return {"status": "error", "error": str(e), "sql": sql}
|
|
|
|
|
|
@mcp.tool()
|
|
def explain(sql: str, db: Optional[str] = None) -> dict:
|
|
"""Return the EXPLAIN plan (FORMAT=JSON) for a SELECT-style query. Does not run the query."""
|
|
try:
|
|
if is_write(sql):
|
|
return {"status": "rejected", "reason": "Refuse to EXPLAIN a write statement."}
|
|
conn = get_conn(db)
|
|
cur = conn.cursor()
|
|
cur.execute("EXPLAIN FORMAT=JSON " + sql)
|
|
plan = cur.fetchone()[0]
|
|
cur.close()
|
|
try:
|
|
plan = json.loads(plan)
|
|
except Exception:
|
|
pass
|
|
return {"status": "ok", "plan": plan}
|
|
except Exception as e:
|
|
log(f"explain error: {traceback.format_exc()}")
|
|
return {"status": "error", "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
def preview_execute(
|
|
sql: str,
|
|
db: Optional[str] = None,
|
|
params_json: Optional[Union[str, list]] = None,
|
|
) -> dict:
|
|
"""Preview a write/DDL statement inside a transaction, then ROLLBACK. Reports rows that WOULD be affected.
|
|
Always call this first and present the result to the user before calling `execute` with confirmed=True.
|
|
Note: DDL (CREATE/ALTER/DROP/TRUNCATE) auto-commits in MySQL and cannot be rolled back — those are
|
|
reported as not-previewable; treat with extra care.
|
|
"""
|
|
try:
|
|
params = json.loads(params_json) if isinstance(params_json, str) else (params_json or [])
|
|
s = sql.lstrip().lower()
|
|
ddl = any(s.startswith(kw) for kw in ("create", "alter", "drop", "truncate", "rename", "grant", "revoke"))
|
|
kwargs = dict(host=MY_HOST, port=MY_PORT, user=MY_USER, password=MY_PASSWORD,
|
|
connection_timeout=6, autocommit=False, charset="utf8mb4", use_pure=True)
|
|
if db:
|
|
kwargs["database"] = db
|
|
conn = mysql.connector.connect(**kwargs)
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute(sql, params)
|
|
rowcount = cur.rowcount
|
|
cur.close()
|
|
conn.rollback()
|
|
finally:
|
|
conn.close()
|
|
return {
|
|
"status": "preview",
|
|
"db": db or MY_DEFAULT_DB,
|
|
"sql": sql,
|
|
"params": params,
|
|
"would_affect_rows": rowcount,
|
|
"ddl_autocommits": ddl,
|
|
"note": ("DDL in MySQL commits implicitly — rollback may not undo it; double-check before execute."
|
|
if ddl else
|
|
"Statement ran inside a transaction and was rolled back. Get explicit user approval, then call `execute` with confirmed=True."),
|
|
}
|
|
except Exception as e:
|
|
log(f"preview_execute error: {traceback.format_exc()}")
|
|
return {"status": "error", "error": str(e), "sql": sql}
|
|
|
|
|
|
@mcp.tool()
|
|
def execute(
|
|
sql: str,
|
|
db: Optional[str] = None,
|
|
params_json: Optional[Union[str, list]] = None,
|
|
confirmed: bool = False,
|
|
) -> dict:
|
|
"""Run a write/DDL statement (INSERT/UPDATE/DELETE/CREATE/ALTER/DROP/...).
|
|
REQUIRES confirmed=True — first call `preview_execute` and obtain explicit user approval.
|
|
"""
|
|
if not confirmed:
|
|
return {"status": "aborted",
|
|
"reason": "confirmed=False. Call preview_execute first, present the impact, then re-call with confirmed=True."}
|
|
try:
|
|
params = json.loads(params_json) if isinstance(params_json, str) else (params_json or [])
|
|
conn = get_conn(db)
|
|
cur = conn.cursor()
|
|
cur.execute(sql, params)
|
|
rowcount = cur.rowcount
|
|
lastrowid = cur.lastrowid
|
|
cur.close()
|
|
log(f"execute: db={db or MY_DEFAULT_DB} rowcount={rowcount}")
|
|
return {"status": "ok", "db": db or MY_DEFAULT_DB,
|
|
"rowcount": rowcount, "lastrowid": lastrowid, "sql": sql}
|
|
except Exception as e:
|
|
log(f"execute error: {traceback.format_exc()}")
|
|
return {"status": "error", "error": str(e), "sql": sql}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
log("MCP MySQL server started (FastMCP)")
|
|
mcp.run()
|