425 lines
14 KiB
Python
425 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MCP server pro Firebird/Medicus — používá oficiální MCP SDK (FastMCP)
|
|
Spustit: python mcp_firebird.py
|
|
"""
|
|
|
|
import sys
|
|
import fdb
|
|
import traceback
|
|
from typing import Optional
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
FB_CONFIG = {
|
|
'dsn': r'reporter:c:\medicus\medicus.fdb',
|
|
'user': 'SYSDBA',
|
|
'password': 'masterkey',
|
|
'charset': 'win1250',
|
|
}
|
|
|
|
# Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC
|
|
def log(msg: str):
|
|
print(msg, file=sys.stderr, flush=True)
|
|
|
|
|
|
# Připojení k Firebirdu
|
|
try:
|
|
conn = fdb.connect(**FB_CONFIG)
|
|
log("✓ Připojeno k Firebirdu (Medicus)")
|
|
except Exception as e:
|
|
log(f"✗ Chyba připojení k Firebirdu: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
def rows_to_json(rows, description):
|
|
"""Převede fdb rows na JSON-serializovatelný formát"""
|
|
import datetime
|
|
import decimal
|
|
|
|
def convert(val):
|
|
if isinstance(val, (datetime.date, datetime.datetime)):
|
|
return val.isoformat()
|
|
if isinstance(val, decimal.Decimal):
|
|
return float(val)
|
|
if isinstance(val, bytes):
|
|
return val.decode('win1250', errors='replace')
|
|
return val
|
|
|
|
cols = [d[0].strip() for d in description]
|
|
return [dict(zip(cols, [convert(v) for v in row])) for row in rows]
|
|
|
|
|
|
# MCP server
|
|
mcp = FastMCP("medicus-firebird")
|
|
|
|
|
|
@mcp.tool()
|
|
def execute_query(sql: str, params: Optional[list] = None) -> dict:
|
|
"""Spusť SQL dotaz na Medicus databázi.
|
|
Pro SELECT vrátí columns + rows. Pro INSERT/UPDATE/DELETE vrátí rowcount.
|
|
"""
|
|
try:
|
|
cur = conn.cursor()
|
|
if params:
|
|
cur.execute(sql, params)
|
|
else:
|
|
cur.execute(sql)
|
|
|
|
if sql.strip().upper().startswith('SELECT'):
|
|
rows = rows_to_json(cur.fetchall(), cur.description or [])
|
|
return {
|
|
'rowcount': len(rows),
|
|
'rows': rows
|
|
}
|
|
else:
|
|
conn.commit()
|
|
return {
|
|
'rowcount': cur.rowcount,
|
|
'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno'
|
|
}
|
|
except Exception as e:
|
|
log(f"execute_query chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def list_tables() -> list[str]:
|
|
"""Vrátí seznam všech uživatelských tabulek v Medicus databázi."""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT TRIM(RDB$RELATION_NAME)
|
|
FROM RDB$RELATIONS
|
|
WHERE RDB$SYSTEM_FLAG = 0
|
|
ORDER BY RDB$RELATION_NAME
|
|
""")
|
|
return [row[0] for row in cur.fetchall()]
|
|
except Exception as e:
|
|
log(f"list_tables chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def get_table_columns(table_name: str) -> list[str]:
|
|
"""Vrátí seznam sloupců dané tabulky."""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT TRIM(RDB$FIELD_NAME)
|
|
FROM RDB$RELATION_FIELDS
|
|
WHERE TRIM(RDB$RELATION_NAME) = ?
|
|
ORDER BY RDB$FIELD_POSITION
|
|
""", [table_name.upper()])
|
|
return [row[0] for row in cur.fetchall()]
|
|
except Exception as e:
|
|
log(f"get_table_columns chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def get_schema() -> dict:
|
|
"""Vrátí kompletní schéma DB — všechny tabulky a jejich sloupce."""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT r.RDB$RELATION_NAME, f.RDB$FIELD_NAME
|
|
FROM RDB$RELATIONS r
|
|
LEFT JOIN RDB$RELATION_FIELDS f ON r.RDB$RELATION_NAME = f.RDB$RELATION_NAME
|
|
WHERE r.RDB$SYSTEM_FLAG = 0
|
|
ORDER BY r.RDB$RELATION_NAME, f.RDB$FIELD_POSITION
|
|
""")
|
|
|
|
schema = {}
|
|
for tbl_name, col_name in cur.fetchall():
|
|
tbl = tbl_name.strip() if tbl_name else 'unknown'
|
|
col = col_name.strip() if col_name else ''
|
|
if tbl not in schema:
|
|
schema[tbl] = []
|
|
if col:
|
|
schema[tbl].append(col)
|
|
|
|
return {
|
|
'table_count': len(schema),
|
|
'tables': list(schema.keys()),
|
|
'schema': schema
|
|
}
|
|
except Exception as e:
|
|
log(f"get_schema chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
|
|
# ── Velké tabulky kde je nutný WHERE filtr ──────────────────────────────────
|
|
LARGE_TABLES = {'LOG', 'ZURNAL', 'LABVD', 'DOCLIST', 'PZT', 'LEKY', 'DEKLINK'}
|
|
|
|
|
|
def _parse_histdoc_blob(data_text: str) -> dict:
|
|
"""Interní parser key=value blobu z HISTDOC.DATA."""
|
|
result = {}
|
|
if not data_text:
|
|
return result
|
|
for line in data_text.replace('\r\n', '\n').replace('\r', '\n').split('\n'):
|
|
line = line.strip()
|
|
if '=' not in line:
|
|
continue
|
|
key, _, value = line.partition('=')
|
|
key = key.strip()
|
|
value = value.strip()
|
|
if value.startswith('C:'):
|
|
try:
|
|
result[key] = float(value[2:].replace(',', '.'))
|
|
except Exception:
|
|
result[key] = value[2:]
|
|
elif value.startswith('I:'):
|
|
try:
|
|
result[key] = int(value[2:])
|
|
except Exception:
|
|
result[key] = value[2:]
|
|
elif value.startswith('D:'):
|
|
result[key] = value[2:]
|
|
elif value in ('$:~\b', '$:~\x08', '$:'):
|
|
result[key] = ''
|
|
else:
|
|
result[key] = value
|
|
return result
|
|
|
|
|
|
@mcp.tool()
|
|
def get_patient(idpac: int) -> dict:
|
|
"""Vrátí základní info o pacientovi z tabulky KAR podle IDPAC.
|
|
Výsledek: jmeno, prijmeni, rc, datnar, pojistovna, vyrazen.
|
|
"""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN
|
|
FROM KAR WHERE IDPAC = ?
|
|
""", [idpac])
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
|
import datetime, decimal
|
|
def cv(v):
|
|
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
|
|
if isinstance(v, decimal.Decimal): return float(v)
|
|
return v
|
|
cols = ['idpac', 'jmeno', 'prijmeni', 'rc', 'datnar', 'pojistovna', 'vyrazen']
|
|
return dict(zip(cols, [cv(v) for v in row]))
|
|
except Exception:
|
|
log(f"get_patient chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def search_patients(query: str) -> list:
|
|
"""Vyhledá pacienty podle příjmení, jména nebo rodného čísla (částečná shoda).
|
|
Vrátí max. 50 výsledků: idpac, jmeno, prijmeni, rc, pojistovna.
|
|
"""
|
|
try:
|
|
cur = conn.cursor()
|
|
q = query.strip().upper()
|
|
cur.execute("""
|
|
SELECT FIRST 50 IDPAC, JMENO, PRIJMENI, RODCIS, POJ
|
|
FROM KAR
|
|
WHERE UPPER(PRIJMENI) LIKE ? OR UPPER(JMENO) LIKE ? OR RODCIS LIKE ?
|
|
ORDER BY PRIJMENI, JMENO
|
|
""", [f'%{q}%', f'%{q}%', f'%{q}%'])
|
|
rows = cur.fetchall()
|
|
return [
|
|
{'idpac': r[0], 'jmeno': r[1], 'prijmeni': r[2], 'rc': r[3], 'pojistovna': r[4]}
|
|
for r in rows
|
|
]
|
|
except Exception:
|
|
log(f"search_patients chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def get_patient_timeline(idpac: int, datum_od: Optional[str] = None, datum_do: Optional[str] = None) -> dict:
|
|
"""Chronologický přehled všech záznamů pacienta z DOCLIST.
|
|
datum_od / datum_do ve formátu YYYY-MM-DD (volitelné).
|
|
Vrátí: pacient (jmeno, prijmeni, rc) + seznam událostí (datum, tabulka, popis).
|
|
"""
|
|
try:
|
|
cur = conn.cursor()
|
|
# Info o pacientovi
|
|
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
|
|
pac = cur.fetchone()
|
|
if not pac:
|
|
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
|
|
|
# Timeline z DOCLIST
|
|
sql = "SELECT DATUM, TABULKA, TYP, POPIS FROM DOCLIST WHERE IDPAC = ?"
|
|
params = [idpac]
|
|
if datum_od:
|
|
sql += " AND DATUM >= ?"
|
|
params.append(datum_od)
|
|
if datum_do:
|
|
sql += " AND DATUM <= ?"
|
|
params.append(datum_do)
|
|
sql += " ORDER BY DATUM, ID"
|
|
|
|
cur.execute(sql, params)
|
|
import datetime, decimal
|
|
def cv(v):
|
|
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
|
|
return v
|
|
|
|
events = [
|
|
{'datum': cv(r[0]), 'tabulka': r[1], 'typ': r[2], 'popis': r[3] or ''}
|
|
for r in cur.fetchall()
|
|
]
|
|
return {
|
|
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
|
|
'pocet': len(events),
|
|
'events': events
|
|
}
|
|
except Exception:
|
|
log(f"get_patient_timeline chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def parse_histdoc_data(idhistdoc: int) -> dict:
|
|
"""Načte a dekóduje DATA blob z HISTDOC pro daný záznam.
|
|
Vrátí strukturovaný dict: Kod, Nazev, Pocet, Cena, Poj, Dgn, DatPlat, Stav, Doklad…
|
|
"""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT h.TYP, h.DATUM, CAST(h.DATA AS VARCHAR(8000)),
|
|
e.ID_DOKLADU, e.ID_ZP, e.ODESLANO, e.VYDANO, e.CHYBA
|
|
FROM HISTDOC h
|
|
LEFT JOIN HISTDOC_EPOUKAZ e ON e.IDHISTDOC = h.ID
|
|
WHERE h.ID = ?
|
|
""", [idhistdoc])
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return {'error': f'HISTDOC ID={idhistdoc} nenalezen'}
|
|
import datetime
|
|
def cv(v):
|
|
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
|
|
return v
|
|
typ, datum, data_text, id_dokladu, id_zp, odeslano, vydano, chyba = row
|
|
parsed = _parse_histdoc_blob(data_text or '')
|
|
return {
|
|
'id': idhistdoc,
|
|
'typ': typ,
|
|
'datum': cv(datum),
|
|
'data': parsed,
|
|
'epoukaz': {
|
|
'id_dokladu': id_dokladu,
|
|
'id_zp': id_zp,
|
|
'odeslano': cv(odeslano),
|
|
'vydano': cv(vydano),
|
|
'chyba': chyba == 'T' if chyba else False,
|
|
}
|
|
}
|
|
except Exception:
|
|
log(f"parse_histdoc_data chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def get_table_info(table_name: str) -> dict:
|
|
"""Vrátí rozšířené info o tabulce: sloupce s datovými typy, nullable, PK + počet záznamů."""
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT
|
|
TRIM(f.RDB$FIELD_NAME) AS SLOUPEC,
|
|
CASE ft.RDB$FIELD_TYPE
|
|
WHEN 7 THEN 'SMALLINT'
|
|
WHEN 8 THEN 'INTEGER'
|
|
WHEN 10 THEN 'FLOAT'
|
|
WHEN 12 THEN 'DATE'
|
|
WHEN 13 THEN 'TIME'
|
|
WHEN 14 THEN 'CHAR(' || ft.RDB$FIELD_LENGTH || ')'
|
|
WHEN 16 THEN 'BIGINT'
|
|
WHEN 27 THEN 'DOUBLE'
|
|
WHEN 35 THEN 'TIMESTAMP'
|
|
WHEN 37 THEN 'VARCHAR(' || ft.RDB$FIELD_LENGTH || ')'
|
|
WHEN 261 THEN 'BLOB'
|
|
ELSE 'TYP(' || ft.RDB$FIELD_TYPE || ')'
|
|
END AS TYP,
|
|
CASE COALESCE(f.RDB$NULL_FLAG, 0) WHEN 1 THEN 'NOT NULL' ELSE 'NULL' END AS NULLABLE,
|
|
CASE WHEN pk.RDB$FIELD_NAME IS NOT NULL THEN 'PK' ELSE '' END AS PK
|
|
FROM RDB$RELATION_FIELDS f
|
|
JOIN RDB$FIELDS ft ON ft.RDB$FIELD_NAME = f.RDB$FIELD_SOURCE
|
|
LEFT JOIN (
|
|
SELECT i.RDB$RELATION_NAME, s.RDB$FIELD_NAME
|
|
FROM RDB$INDICES i
|
|
JOIN RDB$INDEX_SEGMENTS s ON s.RDB$INDEX_NAME = i.RDB$INDEX_NAME
|
|
WHERE i.RDB$RELATION_NAME = ?
|
|
AND EXISTS (
|
|
SELECT 1 FROM RDB$RELATION_CONSTRAINTS c
|
|
WHERE c.RDB$INDEX_NAME = i.RDB$INDEX_NAME
|
|
AND c.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY'
|
|
)
|
|
) pk ON pk.RDB$FIELD_NAME = f.RDB$FIELD_NAME
|
|
WHERE TRIM(f.RDB$RELATION_NAME) = ?
|
|
ORDER BY f.RDB$FIELD_POSITION
|
|
""", [table_name.upper(), table_name.upper()])
|
|
columns = [
|
|
{'sloupec': r[0], 'typ': r[1], 'nullable': r[2], 'pk': r[3]}
|
|
for r in cur.fetchall()
|
|
]
|
|
# Počet záznamů
|
|
try:
|
|
cur.execute(f'SELECT COUNT(*) FROM {table_name.upper()}')
|
|
count = cur.fetchone()[0]
|
|
except Exception:
|
|
count = None
|
|
return {
|
|
'tabulka': table_name.upper(),
|
|
'pocet_zaznamu': count,
|
|
'sloupce': columns
|
|
}
|
|
except Exception:
|
|
log(f"get_table_info chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def safe_query(sql: str, params: Optional[list] = None) -> dict:
|
|
"""Bezpečný SELECT s ochranou před timeoutem na velkých tabulkách.
|
|
Automaticky varuje pokud dotaz míří na LOG, ZURNAL, LABVD, DOCLIST, PZT, LEKY
|
|
bez WHERE klauzule. Výsledek omezen na 500 řádků.
|
|
"""
|
|
try:
|
|
sql_upper = sql.strip().upper()
|
|
if not sql_upper.startswith('SELECT'):
|
|
return {'error': 'safe_query podporuje pouze SELECT dotazy'}
|
|
|
|
# Varování pro velké tabulky bez WHERE
|
|
warnings = []
|
|
for tbl in LARGE_TABLES:
|
|
if tbl in sql_upper and 'WHERE' not in sql_upper:
|
|
warnings.append(f'Tabulka {tbl} má miliony záznamů — přidej WHERE filtr!')
|
|
|
|
# Přidej FIRST limit pokud chybí
|
|
if 'FIRST ' not in sql_upper:
|
|
sql = sql.strip()
|
|
if sql.upper().startswith('SELECT'):
|
|
sql = 'SELECT FIRST 500 ' + sql[6:].lstrip()
|
|
|
|
cur = conn.cursor()
|
|
if params:
|
|
cur.execute(sql, params)
|
|
else:
|
|
cur.execute(sql)
|
|
|
|
rows = rows_to_json(cur.fetchall(), cur.description or [])
|
|
result = {'rowcount': len(rows), 'rows': rows}
|
|
if warnings:
|
|
result['warnings'] = warnings
|
|
return result
|
|
except Exception:
|
|
log(f"safe_query chyba: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
if __name__ == '__main__':
|
|
log("MCP Firebird server spuštěn (FastMCP)")
|
|
mcp.run()
|