155 lines
4.3 KiB
Python
155 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MCP server pro Firebird/Medicus — používá oficiální MCP SDK (FastMCP)
|
|
Spustit: python mcp_firebird.py
|
|
"""
|
|
|
|
import sys
|
|
import fdb
|
|
import traceback
|
|
from typing import Optional
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
FB_CONFIG = {
|
|
'dsn': r'localhost:c:\medicus 3\data\medicus.fdb',
|
|
'user': 'SYSDBA',
|
|
'password': 'masterkey',
|
|
'charset': 'win1250',
|
|
'port': 3070,
|
|
}
|
|
|
|
# Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC
|
|
def log(msg: str):
|
|
print(msg, file=sys.stderr, flush=True)
|
|
|
|
|
|
# Připojení k Firebirdu
|
|
try:
|
|
conn = fdb.connect(**FB_CONFIG)
|
|
log("✓ Připojeno k Firebirdu (Medicus)")
|
|
except Exception as e:
|
|
log(f"✗ Chyba připojení k Firebirdu: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
def rows_to_json(rows, description):
|
|
"""Převede fdb rows na JSON-serializovatelný formát"""
|
|
import datetime
|
|
import decimal
|
|
|
|
def convert(val):
|
|
if isinstance(val, (datetime.date, datetime.datetime)):
|
|
return val.isoformat()
|
|
if isinstance(val, decimal.Decimal):
|
|
return float(val)
|
|
if isinstance(val, bytes):
|
|
return val.decode('win1250', errors='replace')
|
|
return val
|
|
|
|
cols = [d[0].strip() for d in description]
|
|
return [dict(zip(cols, [convert(v) for v in row])) for row in rows]
|
|
|
|
|
|
# MCP server
|
|
mcp = FastMCP("medicus-firebird")
|
|
|
|
|
|
@mcp.tool()
|
|
def execute_query(sql: str, params: Optional[list] = None) -> dict:
|
|
"""Spusť SQL dotaz na Medicus databázi.
|
|
Pro SELECT vrátí columns + rows. Pro INSERT/UPDATE/DELETE vrátí rowcount.
|
|
"""
|
|
try:
|
|
cur = conn.cursor()
|
|
if params:
|
|
cur.execute(sql, params)
|
|
else:
|
|
cur.execute(sql)
|
|
|
|
if sql.strip().upper().startswith('SELECT'):
|
|
rows = rows_to_json(cur.fetchall(), cur.description or [])
|
|
return {
|
|
'rowcount': len(rows),
|
|
'rows': rows
|
|
}
|
|
else:
|
|
conn.commit()
|
|
return {
|
|
'rowcount': cur.rowcount,
|
|
'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno'
|
|
}
|
|
except Exception as e:
|
|
log(f"execute_query chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def list_tables() -> list[str]:
|
|
"""Vrátí seznam všech uživatelských tabulek v Medicus databázi."""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT TRIM(RDB$RELATION_NAME)
|
|
FROM RDB$RELATIONS
|
|
WHERE RDB$SYSTEM_FLAG = 0
|
|
ORDER BY RDB$RELATION_NAME
|
|
""")
|
|
return [row[0] for row in cur.fetchall()]
|
|
except Exception as e:
|
|
log(f"list_tables chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def get_table_columns(table_name: str) -> list[str]:
|
|
"""Vrátí seznam sloupců dané tabulky."""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT TRIM(RDB$FIELD_NAME)
|
|
FROM RDB$RELATION_FIELDS
|
|
WHERE TRIM(RDB$RELATION_NAME) = ?
|
|
ORDER BY RDB$FIELD_POSITION
|
|
""", [table_name.upper()])
|
|
return [row[0] for row in cur.fetchall()]
|
|
except Exception as e:
|
|
log(f"get_table_columns chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def get_schema() -> dict:
|
|
"""Vrátí kompletní schéma DB — všechny tabulky a jejich sloupce."""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT r.RDB$RELATION_NAME, f.RDB$FIELD_NAME
|
|
FROM RDB$RELATIONS r
|
|
LEFT JOIN RDB$RELATION_FIELDS f ON r.RDB$RELATION_NAME = f.RDB$RELATION_NAME
|
|
WHERE r.RDB$SYSTEM_FLAG = 0
|
|
ORDER BY r.RDB$RELATION_NAME, f.RDB$FIELD_POSITION
|
|
""")
|
|
|
|
schema = {}
|
|
for tbl_name, col_name in cur.fetchall():
|
|
tbl = tbl_name.strip() if tbl_name else 'unknown'
|
|
col = col_name.strip() if col_name else ''
|
|
if tbl not in schema:
|
|
schema[tbl] = []
|
|
if col:
|
|
schema[tbl].append(col)
|
|
|
|
return {
|
|
'table_count': len(schema),
|
|
'tables': list(schema.keys()),
|
|
'schema': schema
|
|
}
|
|
except Exception as e:
|
|
log(f"get_schema chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
if __name__ == '__main__':
|
|
log("MCP Firebird server spuštěn (FastMCP)")
|
|
mcp.run()
|