#!/usr/bin/env python3
"""
mcp_mysql_v1.0.py
MCP server for MySQL — FastMCP + mysql-connector-python.

Version: 1.0 | Date: 2026-06-17

Description: Read-only queries (query/describe/list_*) plus guarded writes via
             preview_execute -> execute(confirmed=True). Modelled on mcp_postgres.py.
Run:         python mcp_mysql_v1.0.py

Host: 192.168.1.76:3306   User: root
Default DB: none (None) — the database is passed through the `db` tool argument,
            or tables are fully qualified (db.table).

Available databases include: fio (transactions), studie, medicus, medevio,
kanboard, OrdinaceDropBoxBackup, puzzle, torrents.

Connection settings can be overridden with the environment variables
MCP_MYSQL_HOST, MCP_MYSQL_PORT, MCP_MYSQL_USER, MCP_MYSQL_PASSWORD and
MCP_MYSQL_DB; when unset, the values above are used.
"""

import json
import os
import re
import sys
import time
import traceback
from contextlib import closing
from datetime import datetime, date, timedelta
from decimal import Decimal
from typing import Optional, Union

import mysql.connector
from mcp.server.fastmcp import FastMCP

MY_HOST = os.environ.get("MCP_MYSQL_HOST", "192.168.1.76")
MY_PORT = int(os.environ.get("MCP_MYSQL_PORT", "3306"))
MY_USER = os.environ.get("MCP_MYSQL_USER", "root")
# NOTE(review): a root password committed to source is a security risk. The
# literal is kept only as a fallback so existing deployments keep working;
# set MCP_MYSQL_PASSWORD, rotate this credential and remove the default.
MY_PASSWORD = os.environ.get("MCP_MYSQL_PASSWORD", "Vlado9674+")
MY_DEFAULT_DB = os.environ.get("MCP_MYSQL_DB") or None  # None = connect without a default database

SYSTEM_DBS = ("information_schema", "mysql", "performance_schema", "sys")

# Leading keywords that `query` / `explain` refuse. The first eleven are the
# original list; the rest also change data, server state or the state of the
# cached session and therefore do not belong in a read-only tool.
WRITE_KEYWORDS = (
    "insert", "update", "delete", "drop", "truncate", "alter",
    "create", "grant", "revoke", "rename", "replace",
    "call", "load", "set", "use", "lock", "unlock", "handler", "do",
    "optimize", "repair", "analyze", "flush", "kill", "reset",
    "install", "uninstall", "begin", "start", "commit", "rollback",
    "savepoint", "prepare", "execute", "deallocate",
)

# Leading keywords of statements that MySQL commits implicitly (or that end
# the transaction themselves). Running them "inside a transaction" and rolling
# back does NOT undo them, so preview_execute must never execute these.
IMPLICIT_COMMIT_KEYWORDS = (
    "create", "alter", "drop", "truncate", "rename", "grant", "revoke",
    "lock", "unlock", "analyze", "optimize", "repair", "flush", "reset",
    "install", "uninstall", "begin", "start", "commit",
)

# Whitespace, opening parentheses and ordinary comments that may precede the
# first keyword. `/*! ... */` is deliberately NOT skipped: MySQL executes the
# content of such "executable comments".
_LEADING_NOISE_RE = re.compile(
    r"(?:\s+|\(|--[^\n]*|#[^\n]*|/\*(?!!).*?\*/)*", re.DOTALL
)
_WORD_RE = re.compile(r"[A-Za-z_]+")
# MySQL 8 accepts `WITH cte AS (...) UPDATE|DELETE ...`.
_CTE_DML_RE = re.compile(r"\b(?:update|delete)\b", re.IGNORECASE)
# `SELECT ... INTO OUTFILE|DUMPFILE` writes a file on the server host.
_SELECT_INTO_FILE_RE = re.compile(r"\binto\s+(?:outfile|dumpfile)\b", re.IGNORECASE)


def log(msg: str):
    """Write a diagnostic line to stderr (stdout is left to the MCP transport)."""
    print(msg, file=sys.stderr, flush=True)


_pool: dict[str, "mysql.connector.MySQLConnection"] = {}


def _connect_kwargs(db: Optional[str], *, autocommit: bool) -> dict:
    """Build the keyword arguments shared by every connection this server opens."""
    kwargs = dict(
        host=MY_HOST,
        port=MY_PORT,
        user=MY_USER,
        password=MY_PASSWORD,
        connection_timeout=6,
        autocommit=autocommit,
        charset="utf8mb4",
        use_pure=True,
        # Without this, closing a cursor that still has unfetched rows (e.g.
        # after fetchmany(limit)) raises "Unread result found".
        consume_results=True,
    )
    if db:
        kwargs["database"] = db
    return kwargs


def get_conn(db: Optional[str] = None):
    """Lazy connection cache keyed by db name. Reconnects if dropped."""
    name = db or MY_DEFAULT_DB
    key = name or ""
    conn = _pool.get(key)
    if conn is not None:
        try:
            conn.ping(reconnect=True, attempts=2, delay=1)
            return conn
        except Exception:
            # The cached connection is beyond repair: discard it (closing is
            # best-effort on an already broken socket) and open a fresh one.
            try:
                conn.close()
            except Exception:
                pass
            _pool.pop(key, None)
    conn = mysql.connector.connect(**_connect_kwargs(name, autocommit=True))
    _pool[key] = conn
    return conn


def _parse_params(params_json: Optional[Union[str, list]]):
    """Normalise the `params_json` tool argument into cursor parameters.

    Accepts None or an empty string (no parameters), a JSON string, or an
    already decoded list. Raises ValueError (json.JSONDecodeError for
    malformed JSON) when the value is not an array/object.
    """
    if params_json is None:
        return []
    if isinstance(params_json, str):
        if not params_json.strip():
            return []
        params_json = json.loads(params_json)
        if params_json is None:
            return []
    if isinstance(params_json, (list, tuple, dict)):
        return params_json
    raise ValueError(
        "params_json must be a JSON array of positional parameters "
        "(or a JSON object for %(name)s placeholders)"
    )


def _quote_ident(name: str) -> str:
    """Quote a MySQL identifier with backticks, doubling embedded backticks."""
    return "`" + str(name).replace("`", "``") + "`"


def _leading_keyword(sql: str) -> str:
    """Return the first SQL keyword (lower-cased), skipping comments, blanks and '('."""
    start = _LEADING_NOISE_RE.match(sql).end()
    word = _WORD_RE.match(sql, start)
    return word.group(0).lower() if word else ""


def serialize(value):
    """Recursively convert a value returned by the connector into JSON-safe data."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, timedelta):
        return str(value)
    if isinstance(value, Decimal):
        # NOTE: float() can lose precision on large DECIMAL values; kept
        # because clients already receive numbers for these columns.
        return float(value)
    if isinstance(value, (bytes, bytearray, memoryview)):
        # errors="replace" never raises, so no fallback is needed.
        return bytes(value).decode("utf-8", errors="replace")
    if isinstance(value, dict):
        return {k: serialize(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [serialize(v) for v in value]
    if isinstance(value, (set, frozenset)):
        # SET columns arrive as Python sets, which json cannot encode;
        # sort for a deterministic order.
        return [serialize(v) for v in sorted(value, key=str)]
    return value


def serialize_rows(rows):
    """Serialize a list of dictionary-cursor rows."""
    return [{k: serialize(v) for k, v in row.items()} for row in rows]


def is_write(sql: str) -> bool:
    """Return True when `sql` must not run through the read-only tools.

    The check is deliberately conservative (it may flag a harmless statement,
    which can then still be run through `execute`):
      * any statement containing an executable comment (`/*! ... */`);
      * a leading keyword from WRITE_KEYWORDS, after skipping leading
        whitespace, comments and opening parentheses;
      * `WITH ...` statements that mention UPDATE or DELETE;
      * `SELECT ... INTO OUTFILE/DUMPFILE`.
    """
    if "/*!" in sql:
        return True
    keyword = _leading_keyword(sql)
    if keyword in WRITE_KEYWORDS:
        return True
    if keyword == "with" and _CTE_DML_RE.search(sql):
        return True
    return _SELECT_INTO_FILE_RE.search(sql) is not None


def _server_version(conn) -> str:
    """Return the server's VERSION() string using a short-lived cursor."""
    with closing(conn.cursor()) as cur:
        cur.execute("SELECT VERSION()")
        return cur.fetchone()[0]


def _verify_connection() -> None:
    """Verify connectivity once at startup; exit with status 1 when it fails."""
    try:
        ver = _server_version(get_conn())
        log(f"Connected to MySQL ({MY_HOST}:{MY_PORT}) — {ver}")
    except Exception as e:
        log(f"MySQL connection failed: {e}")
        sys.exit(1)


# Verify connection on startup
_verify_connection()

mcp = FastMCP("mysql")

# NOTE: the docstrings of the @mcp.tool() functions below are published to MCP
# clients as tool descriptions, so they are written for the tool's caller;
# maintainer notes live in `#` comments instead.


@mcp.tool()
def ping() -> dict:
    """Check if MySQL is reachable. Returns server version, latency, host."""
    try:
        start = time.monotonic()
        version = _server_version(get_conn())
        latency_ms = round((time.monotonic() - start) * 1000, 1)
        return {"status": "ok", "version": version, "latency_ms": latency_ms,
                "host": MY_HOST, "port": MY_PORT}
    except Exception as e:
        return {"status": "error", "error": str(e), "host": MY_HOST}


@mcp.tool()
def list_databases(include_system: bool = False) -> dict:
    """List databases on the server. System schemas excluded unless include_system=True."""
    try:
        conn = get_conn()
        with closing(conn.cursor()) as cur:
            cur.execute("SHOW DATABASES")
            dbs = [r[0] for r in cur.fetchall()]
        if not include_system:
            dbs = [d for d in dbs if d not in SYSTEM_DBS]
        return {"count": len(dbs), "databases": dbs}
    except Exception:
        log(f"list_databases error: {traceback.format_exc()}")
        raise


@mcp.tool()
def list_tables(db: str) -> dict:
    """List tables (and views) in a database, with row-count estimate and engine."""
    try:
        conn = get_conn(db)
        with closing(conn.cursor(dictionary=True)) as cur:
            cur.execute("""
                SELECT table_name, table_type, engine, table_rows, table_comment
                FROM information_schema.tables
                WHERE table_schema = %s
                ORDER BY table_name
            """, (db,))
            rows = cur.fetchall()
        return {"db": db, "count": len(rows), "tables": serialize_rows(rows)}
    except Exception:
        log(f"list_tables error: {traceback.format_exc()}")
        raise


@mcp.tool()
def describe_table(table: str, db: str) -> dict:
    """Return columns, types, nullability, keys, defaults, indexes and exact row count."""
    try:
        conn = get_conn(db)
        with closing(conn.cursor(dictionary=True)) as cur:
            cur.execute("""
                SELECT column_name, column_type, is_nullable, column_key,
                       column_default, extra, character_maximum_length, column_comment
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
            """, (db, table))
            columns = cur.fetchall()
            cur.execute("""
                SELECT index_name, non_unique, seq_in_index, column_name, index_type
                FROM information_schema.statistics
                WHERE table_schema = %s AND table_name = %s
                ORDER BY index_name, seq_in_index
            """, (db, table))
            indexes = cur.fetchall()
            row_count = None
            try:
                # Identifiers cannot be bound as parameters; quote them so a
                # backtick inside a name cannot break out of the identifier.
                cur.execute(
                    f"SELECT COUNT(*) AS c FROM {_quote_ident(db)}.{_quote_ident(table)}"
                )
                row_count = cur.fetchone()["c"]
            except Exception as e:
                # A failed count (missing table, no privilege) is reported
                # in-band so the column/index metadata is still returned.
                row_count = f"error: {e}"
        return {"db": db, "table": table, "row_count": row_count,
                "columns": serialize_rows(columns),
                "indexes": serialize_rows(indexes)}
    except Exception:
        log(f"describe_table error: {traceback.format_exc()}")
        raise


@mcp.tool()
def query(
    sql: str,
    db: Optional[str] = None,
    params_json: Optional[Union[str, list]] = None,
    limit: int = 100,
) -> dict:
    """Run a READ-ONLY SQL query (SELECT / WITH / SHOW / EXPLAIN / DESCRIBE).
    For writes use `execute` (which requires preview + confirmed=True).

    db: optional default schema; tables can also be fully qualified (db.table).
    params_json: JSON array of positional parameters for %s placeholders, e.g. '["6192081885"]'.
    limit: cap on returned rows (max 1000). Python-side cutoff — the SQL is NOT modified.
           `truncated` in the result is true when more rows were available.
    """
    try:
        if is_write(sql):
            return {"status": "rejected",
                    "reason": "Write/DDL statement detected. Use `execute` (preview + confirmed=True)."}
        limit = min(max(limit, 1), 1000)
        params = _parse_params(params_json)
        conn = get_conn(db)
        with closing(conn.cursor(dictionary=True)) as cur:
            cur.execute(sql, params)
            if cur.description is None:
                # Statement produced no result set.
                return {"status": "ok", "rowcount": cur.rowcount, "rows": []}
            # One extra row tells us whether the cutoff dropped anything.
            # Rows left unread are drained when the cursor closes
            # (consume_results=True on the connection).
            rows = cur.fetchmany(limit + 1)
        truncated = len(rows) > limit
        rows = rows[:limit]
        return {"status": "ok", "db": db or MY_DEFAULT_DB, "row_count": len(rows),
                "limit": limit, "truncated": truncated,
                "rows": serialize_rows(rows)}
    except Exception as e:
        log(f"query error: {traceback.format_exc()}")
        return {"status": "error", "error": str(e), "sql": sql}


@mcp.tool()
def explain(sql: str, db: Optional[str] = None) -> dict:
    """Return the EXPLAIN plan (FORMAT=JSON) for a SELECT-style query. Does not run the query."""
    try:
        if is_write(sql):
            return {"status": "rejected", "reason": "Refuse to EXPLAIN a write statement."}
        conn = get_conn(db)
        with closing(conn.cursor()) as cur:
            cur.execute("EXPLAIN FORMAT=JSON " + sql)
            plan = cur.fetchone()[0]
        try:
            plan = json.loads(plan)
        except Exception:
            # Leave the plan as the raw value if it is not valid JSON.
            pass
        return {"status": "ok", "plan": plan}
    except Exception as e:
        log(f"explain error: {traceback.format_exc()}")
        return {"status": "error", "error": str(e)}


@mcp.tool()
def preview_execute(
    sql: str,
    db: Optional[str] = None,
    params_json: Optional[Union[str, list]] = None,
) -> dict:
    """Preview a write statement inside a transaction, then ROLLBACK.
    Reports rows that WOULD be affected. Always call this first and present the
    result to the user before calling `execute` with confirmed=True.

    Note: DDL (CREATE/ALTER/DROP/TRUNCATE/...) auto-commits in MySQL and cannot be
    rolled back — those statements are NOT executed by the preview; they are
    reported as not-previewable (would_affect_rows is null); treat with extra care.
    """
    try:
        params = _parse_params(params_json)
        target_db = db or MY_DEFAULT_DB
        # Statements with an implicit commit would really be applied by a
        # "preview"; executable comments are treated the same way because
        # their content cannot be classified reliably.
        not_previewable = "/*!" in sql or _leading_keyword(sql) in IMPLICIT_COMMIT_KEYWORDS
        if not_previewable:
            return {
                "status": "preview",
                "db": target_db,
                "sql": sql,
                "params": params,
                "would_affect_rows": None,
                "ddl_autocommits": True,
                "note": ("This statement commits implicitly in MySQL and cannot be rolled back, "
                         "so it was NOT executed and its impact cannot be previewed. "
                         "Review it carefully, get explicit user approval, then call `execute` "
                         "with confirmed=True."),
            }
        # A dedicated non-autocommit connection keeps the trial run away from
        # the cached autocommit sessions.
        conn = mysql.connector.connect(**_connect_kwargs(target_db, autocommit=False))
        try:
            with closing(conn.cursor()) as cur:
                cur.execute(sql, params)
                rowcount = cur.rowcount
            conn.rollback()
        finally:
            # Closing without commit also discards the transaction if the
            # statement itself raised.
            conn.close()
        return {
            "status": "preview",
            "db": target_db,
            "sql": sql,
            "params": params,
            "would_affect_rows": rowcount,
            "ddl_autocommits": False,
            "note": "Statement ran inside a transaction and was rolled back. "
                    "Get explicit user approval, then call `execute` with confirmed=True.",
        }
    except Exception as e:
        log(f"preview_execute error: {traceback.format_exc()}")
        return {"status": "error", "error": str(e), "sql": sql}


@mcp.tool()
def execute(
    sql: str,
    db: Optional[str] = None,
    params_json: Optional[Union[str, list]] = None,
    confirmed: bool = False,
) -> dict:
    """Run a write/DDL statement (INSERT/UPDATE/DELETE/CREATE/ALTER/DROP/...).
    REQUIRES confirmed=True — first call `preview_execute` and obtain explicit user approval.
    """
    if not confirmed:
        return {"status": "aborted",
                "reason": "confirmed=False. Call preview_execute first, present the impact, then re-call with confirmed=True."}
    try:
        params = _parse_params(params_json)
        conn = get_conn(db)  # cached connections run with autocommit=True
        with closing(conn.cursor()) as cur:
            cur.execute(sql, params)
            rowcount = cur.rowcount
            lastrowid = cur.lastrowid
        log(f"execute: db={db or MY_DEFAULT_DB} rowcount={rowcount}")
        return {"status": "ok", "db": db or MY_DEFAULT_DB, "rowcount": rowcount,
                "lastrowid": lastrowid, "sql": sql}
    except Exception as e:
        log(f"execute error: {traceback.format_exc()}")
        return {"status": "error", "error": str(e), "sql": sql}


if __name__ == "__main__":
    log("MCP MySQL server started (FastMCP)")
    mcp.run()