#!/usr/bin/env python3 """ MCP server pro Firebird/Medicus — používá oficiální MCP SDK (FastMCP) Spustit: python mcp_firebird.py """ import sys import fdb import traceback from typing import Optional from mcp.server.fastmcp import FastMCP FB_CONFIG = { 'dsn': r'reporter:c:\medicus\medicus.fdb', 'user': 'SYSDBA', 'password': 'masterkey', 'charset': 'win1250', } # Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC def log(msg: str): print(msg, file=sys.stderr, flush=True) # Připojení k Firebirdu try: conn = fdb.connect(**FB_CONFIG) log("✓ Připojeno k Firebirdu (Medicus)") except Exception as e: log(f"✗ Chyba připojení k Firebirdu: {e}") sys.exit(1) def rows_to_json(rows, description): """Převede fdb rows na JSON-serializovatelný formát""" import datetime import decimal def convert(val): if isinstance(val, (datetime.date, datetime.datetime)): return val.isoformat() if isinstance(val, decimal.Decimal): return float(val) if isinstance(val, bytes): return val.decode('win1250', errors='replace') return val cols = [d[0].strip() for d in description] return [dict(zip(cols, [convert(v) for v in row])) for row in rows] # MCP server mcp = FastMCP("medicus-firebird") @mcp.tool() def execute_query(sql: str, params: Optional[list] = None) -> dict: """Spusť SQL dotaz na Medicus databázi. Pro SELECT vrátí columns + rows. Pro INSERT/UPDATE/DELETE vrátí rowcount. """ try: cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) if sql.strip().upper().startswith('SELECT'): rows = rows_to_json(cur.fetchall(), cur.description or []) return { 'rowcount': len(rows), 'rows': rows } else: conn.commit() return { 'rowcount': cur.rowcount, 'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno' } except Exception as e: log(f"execute_query chyba: {traceback.format_exc()}") raise @mcp.tool() def list_tables() -> list[str]: """Vrátí seznam všech uživatelských tabulek v Medicus databázi.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(RDB$RELATION_NAME) FROM RDB$RELATIONS WHERE RDB$SYSTEM_FLAG = 0 ORDER BY RDB$RELATION_NAME """) return [row[0] for row in cur.fetchall()] except Exception as e: log(f"list_tables chyba: {traceback.format_exc()}") raise @mcp.tool() def get_table_columns(table_name: str) -> list[str]: """Vrátí seznam sloupců dané tabulky.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(RDB$FIELD_NAME) FROM RDB$RELATION_FIELDS WHERE TRIM(RDB$RELATION_NAME) = ? ORDER BY RDB$FIELD_POSITION """, [table_name.upper()]) return [row[0] for row in cur.fetchall()] except Exception as e: log(f"get_table_columns chyba: {traceback.format_exc()}") raise @mcp.tool() def get_schema() -> dict: """Vrátí kompletní schéma DB — všechny tabulky a jejich sloupce.""" try: cur = conn.cursor() cur.execute(""" SELECT r.RDB$RELATION_NAME, f.RDB$FIELD_NAME FROM RDB$RELATIONS r LEFT JOIN RDB$RELATION_FIELDS f ON r.RDB$RELATION_NAME = f.RDB$RELATION_NAME WHERE r.RDB$SYSTEM_FLAG = 0 ORDER BY r.RDB$RELATION_NAME, f.RDB$FIELD_POSITION """) schema = {} for tbl_name, col_name in cur.fetchall(): tbl = tbl_name.strip() if tbl_name else 'unknown' col = col_name.strip() if col_name else '' if tbl not in schema: schema[tbl] = [] if col: schema[tbl].append(col) return { 'table_count': len(schema), 'tables': list(schema.keys()), 'schema': schema } except Exception as e: log(f"get_schema chyba: {traceback.format_exc()}") raise # ── Velké tabulky kde je nutný WHERE filtr ────────────────────────────────── LARGE_TABLES = {'LOG', 'ZURNAL', 'LABVD', 'DOCLIST', 'PZT', 'LEKY', 'DEKLINK'} def _parse_histdoc_blob(data_text: str) -> dict: """Interní parser key=value blobu z HISTDOC.DATA.""" result = {} if not data_text: return result for line in data_text.replace('\r\n', '\n').replace('\r', '\n').split('\n'): line = line.strip() if '=' not in line: continue key, _, value = line.partition('=') key = key.strip() value = value.strip() if value.startswith('C:'): try: result[key] = float(value[2:].replace(',', '.')) except Exception: result[key] = value[2:] elif value.startswith('I:'): try: result[key] = int(value[2:]) except Exception: result[key] = value[2:] elif value.startswith('D:'): result[key] = value[2:] elif value in ('$:~\b', '$:~\x08', '$:'): result[key] = '' else: result[key] = value return result @mcp.tool() def get_patient(idpac: int) -> dict: """Vrátí základní info o pacientovi z tabulky KAR podle IDPAC. Výsledek: jmeno, prijmeni, rc, datnar, pojistovna, vyrazen. """ try: cur = conn.cursor() cur.execute(""" SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN FROM KAR WHERE IDPAC = ? """, [idpac]) row = cur.fetchone() if not row: return {'error': f'Pacient IDPAC={idpac} nenalezen'} import datetime, decimal def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() if isinstance(v, decimal.Decimal): return float(v) return v cols = ['idpac', 'jmeno', 'prijmeni', 'rc', 'datnar', 'pojistovna', 'vyrazen'] return dict(zip(cols, [cv(v) for v in row])) except Exception: log(f"get_patient chyba: {traceback.format_exc()}") raise @mcp.tool() def search_patients(query: str) -> list: """Vyhledá pacienty podle příjmení, jména nebo rodného čísla (částečná shoda). Vrátí max. 50 výsledků: idpac, jmeno, prijmeni, rc, pojistovna. """ try: cur = conn.cursor() q = query.strip().upper() cur.execute(""" SELECT FIRST 50 IDPAC, JMENO, PRIJMENI, RODCIS, POJ FROM KAR WHERE UPPER(PRIJMENI) LIKE ? OR UPPER(JMENO) LIKE ? OR RODCIS LIKE ? ORDER BY PRIJMENI, JMENO """, [f'%{q}%', f'%{q}%', f'%{q}%']) rows = cur.fetchall() return [ {'idpac': r[0], 'jmeno': r[1], 'prijmeni': r[2], 'rc': r[3], 'pojistovna': r[4]} for r in rows ] except Exception: log(f"search_patients chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_timeline(idpac: int, datum_od: Optional[str] = None, datum_do: Optional[str] = None) -> dict: """Chronologický přehled všech záznamů pacienta z DOCLIST. datum_od / datum_do ve formátu YYYY-MM-DD (volitelné). Vrátí: pacient (jmeno, prijmeni, rc) + seznam událostí (datum, tabulka, popis). """ try: cur = conn.cursor() # Info o pacientovi cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} # Timeline z DOCLIST sql = "SELECT DATUM, TABULKA, TYP, POPIS FROM DOCLIST WHERE IDPAC = ?" params = [idpac] if datum_od: sql += " AND DATUM >= ?" params.append(datum_od) if datum_do: sql += " AND DATUM <= ?" params.append(datum_do) sql += " ORDER BY DATUM, ID" cur.execute(sql, params) import datetime, decimal def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() return v events = [ {'datum': cv(r[0]), 'tabulka': r[1], 'typ': r[2], 'popis': r[3] or ''} for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet': len(events), 'events': events } except Exception: log(f"get_patient_timeline chyba: {traceback.format_exc()}") raise @mcp.tool() def parse_histdoc_data(idhistdoc: int) -> dict: """Načte a dekóduje DATA blob z HISTDOC pro daný záznam. Vrátí strukturovaný dict: Kod, Nazev, Pocet, Cena, Poj, Dgn, DatPlat, Stav, Doklad… """ try: cur = conn.cursor() cur.execute(""" SELECT h.TYP, h.DATUM, CAST(h.DATA AS VARCHAR(8000)), e.ID_DOKLADU, e.ID_ZP, e.ODESLANO, e.VYDANO, e.CHYBA FROM HISTDOC h LEFT JOIN HISTDOC_EPOUKAZ e ON e.IDHISTDOC = h.ID WHERE h.ID = ? """, [idhistdoc]) row = cur.fetchone() if not row: return {'error': f'HISTDOC ID={idhistdoc} nenalezen'} import datetime def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() return v typ, datum, data_text, id_dokladu, id_zp, odeslano, vydano, chyba = row parsed = _parse_histdoc_blob(data_text or '') return { 'id': idhistdoc, 'typ': typ, 'datum': cv(datum), 'data': parsed, 'epoukaz': { 'id_dokladu': id_dokladu, 'id_zp': id_zp, 'odeslano': cv(odeslano), 'vydano': cv(vydano), 'chyba': chyba == 'T' if chyba else False, } } except Exception: log(f"parse_histdoc_data chyba: {traceback.format_exc()}") raise @mcp.tool() def get_table_info(table_name: str) -> dict: """Vrátí rozšířené info o tabulce: sloupce s datovými typy, nullable, PK + počet záznamů.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(f.RDB$FIELD_NAME) AS SLOUPEC, CASE ft.RDB$FIELD_TYPE WHEN 7 THEN 'SMALLINT' WHEN 8 THEN 'INTEGER' WHEN 10 THEN 'FLOAT' WHEN 12 THEN 'DATE' WHEN 13 THEN 'TIME' WHEN 14 THEN 'CHAR(' || ft.RDB$FIELD_LENGTH || ')' WHEN 16 THEN 'BIGINT' WHEN 27 THEN 'DOUBLE' WHEN 35 THEN 'TIMESTAMP' WHEN 37 THEN 'VARCHAR(' || ft.RDB$FIELD_LENGTH || ')' WHEN 261 THEN 'BLOB' ELSE 'TYP(' || ft.RDB$FIELD_TYPE || ')' END AS TYP, CASE COALESCE(f.RDB$NULL_FLAG, 0) WHEN 1 THEN 'NOT NULL' ELSE 'NULL' END AS NULLABLE, CASE WHEN pk.RDB$FIELD_NAME IS NOT NULL THEN 'PK' ELSE '' END AS PK FROM RDB$RELATION_FIELDS f JOIN RDB$FIELDS ft ON ft.RDB$FIELD_NAME = f.RDB$FIELD_SOURCE LEFT JOIN ( SELECT i.RDB$RELATION_NAME, s.RDB$FIELD_NAME FROM RDB$INDICES i JOIN RDB$INDEX_SEGMENTS s ON s.RDB$INDEX_NAME = i.RDB$INDEX_NAME WHERE i.RDB$RELATION_NAME = ? AND EXISTS ( SELECT 1 FROM RDB$RELATION_CONSTRAINTS c WHERE c.RDB$INDEX_NAME = i.RDB$INDEX_NAME AND c.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY' ) ) pk ON pk.RDB$FIELD_NAME = f.RDB$FIELD_NAME WHERE TRIM(f.RDB$RELATION_NAME) = ? ORDER BY f.RDB$FIELD_POSITION """, [table_name.upper(), table_name.upper()]) columns = [ {'sloupec': r[0], 'typ': r[1], 'nullable': r[2], 'pk': r[3]} for r in cur.fetchall() ] # Počet záznamů try: cur.execute(f'SELECT COUNT(*) FROM {table_name.upper()}') count = cur.fetchone()[0] except Exception: count = None return { 'tabulka': table_name.upper(), 'pocet_zaznamu': count, 'sloupce': columns } except Exception: log(f"get_table_info chyba: {traceback.format_exc()}") raise @mcp.tool() def safe_query(sql: str, params: Optional[list] = None) -> dict: """Bezpečný SELECT s ochranou před timeoutem na velkých tabulkách. Automaticky varuje pokud dotaz míří na LOG, ZURNAL, LABVD, DOCLIST, PZT, LEKY bez WHERE klauzule. Výsledek omezen na 500 řádků. """ try: sql_upper = sql.strip().upper() if not sql_upper.startswith('SELECT'): return {'error': 'safe_query podporuje pouze SELECT dotazy'} # Varování pro velké tabulky bez WHERE warnings = [] for tbl in LARGE_TABLES: if tbl in sql_upper and 'WHERE' not in sql_upper: warnings.append(f'Tabulka {tbl} má miliony záznamů — přidej WHERE filtr!') # Přidej FIRST limit pokud chybí if 'FIRST ' not in sql_upper: sql = sql.strip() if sql.upper().startswith('SELECT'): sql = 'SELECT FIRST 500 ' + sql[6:].lstrip() cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) rows = rows_to_json(cur.fetchall(), cur.description or []) result = {'rowcount': len(rows), 'rows': rows} if warnings: result['warnings'] = warnings return result except Exception: log(f"safe_query chyba: {traceback.format_exc()}") raise if __name__ == '__main__': log("MCP Firebird server spuštěn (FastMCP)") mcp.run()