#!/usr/bin/env python3 """ MCP server pro MySQL/Medevio — používá oficiální MCP SDK (FastMCP) Spustit: python mcp_medevio_mysql.py Databáze `medevio` (MySQL, viz Knihovny/mysql_db.py): lokální zrcadlo dat z Medevio GraphQL API (pacienti, požadavky, konverzace, dotazníky, agenda) + VZP data (stav pojištění, registrace, dávky). Pozn.: živé API řeší sesterský server `mcp_medevio.py` (GraphQL) — tento server je pro rychlé SQL dotazy nad synchronizovanými daty. """ import sys import traceback from pathlib import Path from typing import Optional from mcp.server.fastmcp import FastMCP sys.path.insert(0, str(Path(__file__).resolve().parent)) from Knihovny.mysql_db import connect_mysql # Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC def log(msg: str): print(msg, file=sys.stderr, flush=True) try: conn = connect_mysql() log("Připojeno k MySQL (medevio)") except Exception as e: log(f"Chyba připojení k MySQL: {e}") sys.exit(1) def _cursor(): """Kurzor s automatickým reconnectem (MySQL spojení po čase usíná).""" conn.ping(reconnect=True) return conn.cursor() def rows_to_json(rows, description): """Převede pymysql rows na JSON-serializovatelný formát.""" import datetime import decimal def convert(val): if isinstance(val, (datetime.date, datetime.datetime)): return val.isoformat() if isinstance(val, datetime.timedelta): return str(val) if isinstance(val, decimal.Decimal): return float(val) if isinstance(val, bytes): return val.decode('utf-8', errors='replace') return val cols = [d[0] for d in description] return [dict(zip(cols, [convert(v) for v in row])) for row in rows] def _strip_diacritics(s: str) -> str: """Bez diakritiky, velkými písmeny, sjednocené mezery.""" import re import unicodedata s = unicodedata.normalize('NFKD', s or '') s = ''.join(c for c in s if not unicodedata.combining(c)) return re.sub(r'\s+', ' ', s).strip().upper() # MCP server mcp = FastMCP("medevio-mysql") @mcp.tool() def execute_query(sql: str, params: Optional[list] = None) -> dict: """Spusť SQL dotaz na Medevio MySQL databázi. Pro SELECT/SHOW vrátí rows. Pro INSERT/UPDATE/DELETE vrátí rowcount. """ try: cur = _cursor() cur.execute(sql, params or None) if sql.strip().upper().startswith(('SELECT', 'SHOW', 'DESCRIBE')): rows = rows_to_json(cur.fetchall(), cur.description or []) return {'rowcount': len(rows), 'rows': rows} conn.commit() return { 'rowcount': cur.rowcount, 'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno' } except Exception: log(f"execute_query chyba: {traceback.format_exc()}") raise @mcp.tool() def safe_query(sql: str, params: Optional[list] = None) -> dict: """Bezpečný SELECT — pouze čtení, výsledek omezen na 500 řádků (LIMIT se doplní automaticky, pokud chybí).""" try: sql_upper = sql.strip().upper() if not sql_upper.startswith(('SELECT', 'SHOW', 'DESCRIBE')): return {'error': 'safe_query podporuje pouze SELECT/SHOW dotazy'} if sql_upper.startswith('SELECT') and 'LIMIT' not in sql_upper: sql = sql.rstrip().rstrip(';') + ' LIMIT 500' cur = _cursor() cur.execute(sql, params or None) rows = rows_to_json(cur.fetchall(), cur.description or []) return {'rowcount': len(rows), 'rows': rows} except Exception: log(f"safe_query chyba: {traceback.format_exc()}") raise @mcp.tool() def list_tables() -> dict: """Vrátí seznam tabulek v databázi medevio včetně počtu řádků.""" try: cur = _cursor() cur.execute("SHOW TABLES") tables = [r[0] for r in cur.fetchall()] result = {} for t in tables: cur.execute(f"SELECT COUNT(*) FROM `{t}`") result[t] = cur.fetchone()[0] return {'pocet_tabulek': len(result), 'tabulky': result} except Exception: log(f"list_tables chyba: {traceback.format_exc()}") raise @mcp.tool() def get_table_info(table_name: str) -> dict: """Vrátí strukturu tabulky: sloupce s typy, nullable, klíče, default, komentář + počet záznamů.""" try: cur = _cursor() cur.execute(f"SHOW FULL COLUMNS FROM `{table_name}`") columns = [ { 'sloupec': r[0], 'typ': r[1], 'nullable': r[3], 'klic': r[4] or '', 'default': r[5], 'komentar': r[8] or '', } for r in cur.fetchall() ] cur.execute(f"SELECT COUNT(*) FROM `{table_name}`") count = cur.fetchone()[0] return {'tabulka': table_name, 'pocet_zaznamu': count, 'sloupce': columns} except Exception: log(f"get_table_info chyba: {traceback.format_exc()}") raise @mcp.tool() def get_columns_overview(table_name: str, sample_rows: int = 1000) -> dict: """Přehled obsahu sloupců tabulky pro pochopení sémantiky. Ze vzorku N řádků (výchozí 1000) vrátí pro každý sloupec: počet vyplněných, počet distinct hodnot a top 5 nejčastějších hodnot s četností. Hodnoty zkráceny na 80 znaků. """ try: from collections import Counter cur = _cursor() cur.execute(f"SELECT * FROM `{table_name}` LIMIT {int(sample_rows)}") rows = rows_to_json(cur.fetchall(), cur.description or []) if not rows: return {'tabulka': table_name, 'sample': 0, 'sloupce': {}} overview = {} for col in rows[0].keys(): values = [r[col] for r in rows if r[col] is not None and r[col] != ''] counter = Counter(str(v)[:80] for v in values) overview[col] = { 'vyplneno': len(values), 'distinct': len(counter), 'top': [{'hodnota': v, 'pocet': n} for v, n in counter.most_common(5)], } return {'tabulka': table_name, 'sample': len(rows), 'sloupce': overview} except Exception: log(f"get_columns_overview chyba: {traceback.format_exc()}") raise @mcp.tool() def search_patient(query: str) -> list: """Vyhledá pacienty v medevio_pacient podle jména (bez ohledu na diakritiku a pořadí slov), rodného čísla (jen číslice), e-mailu nebo telefonu (jen číslice, ignoruje +420). Vrátí max. 50 výsledků vč. patient_id, kontaktů, pojišťovny a `ma_ucet` (user_id != null = má Medevio účet). """ try: import re cur = _cursor() cur.execute(""" SELECT patient_id, name, surname, identification_number, dob, sex, email, phone, insurance_code, insurance_name, status, has_mobile_app, user_id, user_email, user_phone FROM medevio_pacient """) rows = rows_to_json(cur.fetchall(), cur.description or []) q = query.strip().lower() q_digits = re.sub(r'\D', '', query) if q_digits.startswith('420'): q_digits = q_digits[3:] q_tokens = _strip_diacritics(query).split() results = [] for r in rows: hit = False if len(q_digits) >= 4: rc = re.sub(r'\D', '', r.get('identification_number') or '') phones = ' '.join( re.sub(r'\D', '', r.get(k) or '') for k in ('phone', 'user_phone') ) hit = q_digits in rc or (len(q_digits) >= 6 and q_digits in phones) if not hit and '@' in q: emails = f"{(r.get('email') or '').lower()} {(r.get('user_email') or '').lower()}" hit = q in emails if not hit and q_tokens and not q_digits: name_norm = _strip_diacritics(f"{r.get('name') or ''} {r.get('surname') or ''}") hit = all(t in name_norm for t in q_tokens) if hit: r['ma_ucet'] = r.get('user_id') is not None results.append(r) results.sort(key=lambda p: (p.get('surname') or '', p.get('name') or '')) return results[:50] except Exception: log(f"search_patient chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_requests(rodne_cislo: str, limit: int = 30) -> dict: """Vrátí požadavky pacienta z tabulky pozadavky podle rodného čísla (lomítko nevadí). Řazení od nejnovějšího. """ try: import re rc = re.sub(r'\D', '', rodne_cislo) cur = _cursor() cur.execute(""" SELECT id, displayTitle, createdAt, doneAt, removedAt, pacient_jmeno, pacient_prijmeni, pacient_rodnecislo FROM pozadavky WHERE REPLACE(pacient_rodnecislo, '/', '') = %s ORDER BY createdAt DESC LIMIT %s """, [rc, int(limit)]) rows = rows_to_json(cur.fetchall(), cur.description or []) return {'rodne_cislo': rc, 'pocet': len(rows), 'pozadavky': rows} except Exception: log(f"get_patient_requests chyba: {traceback.format_exc()}") raise @mcp.tool() def get_request_conversation(request_id: str) -> dict: """Vrátí zprávy konverzace k danému požadavku (medevio_conversation), chronologicky. `od_ordinace` = zprávu poslala ordinace (ne pacient). """ try: cur = _cursor() cur.execute(""" SELECT sender_name, sender_clinic_id, text, created_at, read_at, attachment_url, attachment_description, attachment_content_type FROM medevio_conversation WHERE request_id = %s ORDER BY created_at """, [request_id]) rows = rows_to_json(cur.fetchall(), cur.description or []) for r in rows: r['od_ordinace'] = r.pop('sender_clinic_id') is not None return {'request_id': request_id, 'pocet_zprav': len(rows), 'zpravy': rows} except Exception: log(f"get_request_conversation chyba: {traceback.format_exc()}") raise if __name__ == '__main__': log("MCP Medevio MySQL server spuštěn (FastMCP)") mcp.run()