Files
ordinaceprojekt/mcp_medevio_mysql.py
2026-06-12 15:32:22 +02:00

286 lines
10 KiB
Python

#!/usr/bin/env python3
"""
MCP server pro MySQL/Medevio — používá oficiální MCP SDK (FastMCP)
Spustit: python mcp_medevio_mysql.py
Databáze `medevio` (MySQL, viz Knihovny/mysql_db.py): lokální zrcadlo dat
z Medevio GraphQL API (pacienti, požadavky, konverzace, dotazníky, agenda)
+ VZP data (stav pojištění, registrace, dávky).
Pozn.: živé API řeší sesterský server `mcp_medevio.py` (GraphQL) — tento
server je pro rychlé SQL dotazy nad synchronizovanými daty.
"""
import sys
import traceback
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP
sys.path.insert(0, str(Path(__file__).resolve().parent))
from Knihovny.mysql_db import connect_mysql
# Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC
def log(msg: str):
print(msg, file=sys.stderr, flush=True)
try:
conn = connect_mysql()
log("Připojeno k MySQL (medevio)")
except Exception as e:
log(f"Chyba připojení k MySQL: {e}")
sys.exit(1)
def _cursor():
"""Kurzor s automatickým reconnectem (MySQL spojení po čase usíná)."""
conn.ping(reconnect=True)
return conn.cursor()
def rows_to_json(rows, description):
"""Převede pymysql rows na JSON-serializovatelný formát."""
import datetime
import decimal
def convert(val):
if isinstance(val, (datetime.date, datetime.datetime)):
return val.isoformat()
if isinstance(val, datetime.timedelta):
return str(val)
if isinstance(val, decimal.Decimal):
return float(val)
if isinstance(val, bytes):
return val.decode('utf-8', errors='replace')
return val
cols = [d[0] for d in description]
return [dict(zip(cols, [convert(v) for v in row])) for row in rows]
def _strip_diacritics(s: str) -> str:
"""Bez diakritiky, velkými písmeny, sjednocené mezery."""
import re
import unicodedata
s = unicodedata.normalize('NFKD', s or '')
s = ''.join(c for c in s if not unicodedata.combining(c))
return re.sub(r'\s+', ' ', s).strip().upper()
# MCP server
mcp = FastMCP("medevio-mysql")
@mcp.tool()
def execute_query(sql: str, params: Optional[list] = None) -> dict:
"""Spusť SQL dotaz na Medevio MySQL databázi.
Pro SELECT/SHOW vrátí rows. Pro INSERT/UPDATE/DELETE vrátí rowcount.
"""
try:
cur = _cursor()
cur.execute(sql, params or None)
if sql.strip().upper().startswith(('SELECT', 'SHOW', 'DESCRIBE')):
rows = rows_to_json(cur.fetchall(), cur.description or [])
return {'rowcount': len(rows), 'rows': rows}
conn.commit()
return {
'rowcount': cur.rowcount,
'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno'
}
except Exception:
log(f"execute_query chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def safe_query(sql: str, params: Optional[list] = None) -> dict:
"""Bezpečný SELECT — pouze čtení, výsledek omezen na 500 řádků (LIMIT se
doplní automaticky, pokud chybí)."""
try:
sql_upper = sql.strip().upper()
if not sql_upper.startswith(('SELECT', 'SHOW', 'DESCRIBE')):
return {'error': 'safe_query podporuje pouze SELECT/SHOW dotazy'}
if sql_upper.startswith('SELECT') and 'LIMIT' not in sql_upper:
sql = sql.rstrip().rstrip(';') + ' LIMIT 500'
cur = _cursor()
cur.execute(sql, params or None)
rows = rows_to_json(cur.fetchall(), cur.description or [])
return {'rowcount': len(rows), 'rows': rows}
except Exception:
log(f"safe_query chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def list_tables() -> dict:
"""Vrátí seznam tabulek v databázi medevio včetně počtu řádků."""
try:
cur = _cursor()
cur.execute("SHOW TABLES")
tables = [r[0] for r in cur.fetchall()]
result = {}
for t in tables:
cur.execute(f"SELECT COUNT(*) FROM `{t}`")
result[t] = cur.fetchone()[0]
return {'pocet_tabulek': len(result), 'tabulky': result}
except Exception:
log(f"list_tables chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_table_info(table_name: str) -> dict:
"""Vrátí strukturu tabulky: sloupce s typy, nullable, klíče, default,
komentář + počet záznamů."""
try:
cur = _cursor()
cur.execute(f"SHOW FULL COLUMNS FROM `{table_name}`")
columns = [
{
'sloupec': r[0], 'typ': r[1], 'nullable': r[3],
'klic': r[4] or '', 'default': r[5], 'komentar': r[8] or '',
}
for r in cur.fetchall()
]
cur.execute(f"SELECT COUNT(*) FROM `{table_name}`")
count = cur.fetchone()[0]
return {'tabulka': table_name, 'pocet_zaznamu': count, 'sloupce': columns}
except Exception:
log(f"get_table_info chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_columns_overview(table_name: str, sample_rows: int = 1000) -> dict:
"""Přehled obsahu sloupců tabulky pro pochopení sémantiky. Ze vzorku
N řádků (výchozí 1000) vrátí pro každý sloupec: počet vyplněných, počet
distinct hodnot a top 5 nejčastějších hodnot s četností.
Hodnoty zkráceny na 80 znaků.
"""
try:
from collections import Counter
cur = _cursor()
cur.execute(f"SELECT * FROM `{table_name}` LIMIT {int(sample_rows)}")
rows = rows_to_json(cur.fetchall(), cur.description or [])
if not rows:
return {'tabulka': table_name, 'sample': 0, 'sloupce': {}}
overview = {}
for col in rows[0].keys():
values = [r[col] for r in rows if r[col] is not None and r[col] != '']
counter = Counter(str(v)[:80] for v in values)
overview[col] = {
'vyplneno': len(values),
'distinct': len(counter),
'top': [{'hodnota': v, 'pocet': n} for v, n in counter.most_common(5)],
}
return {'tabulka': table_name, 'sample': len(rows), 'sloupce': overview}
except Exception:
log(f"get_columns_overview chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def search_patient(query: str) -> list:
"""Vyhledá pacienty v medevio_pacient podle jména (bez ohledu na diakritiku
a pořadí slov), rodného čísla (jen číslice), e-mailu nebo telefonu
(jen číslice, ignoruje +420). Vrátí max. 50 výsledků vč. patient_id,
kontaktů, pojišťovny a `ma_ucet` (user_id != null = má Medevio účet).
"""
try:
import re
cur = _cursor()
cur.execute("""
SELECT patient_id, name, surname, identification_number, dob, sex,
email, phone, insurance_code, insurance_name, status,
has_mobile_app, user_id, user_email, user_phone
FROM medevio_pacient
""")
rows = rows_to_json(cur.fetchall(), cur.description or [])
q = query.strip().lower()
q_digits = re.sub(r'\D', '', query)
if q_digits.startswith('420'):
q_digits = q_digits[3:]
q_tokens = _strip_diacritics(query).split()
results = []
for r in rows:
hit = False
if len(q_digits) >= 4:
rc = re.sub(r'\D', '', r.get('identification_number') or '')
phones = ' '.join(
re.sub(r'\D', '', r.get(k) or '') for k in ('phone', 'user_phone')
)
hit = q_digits in rc or (len(q_digits) >= 6 and q_digits in phones)
if not hit and '@' in q:
emails = f"{(r.get('email') or '').lower()} {(r.get('user_email') or '').lower()}"
hit = q in emails
if not hit and q_tokens and not q_digits:
name_norm = _strip_diacritics(f"{r.get('name') or ''} {r.get('surname') or ''}")
hit = all(t in name_norm for t in q_tokens)
if hit:
r['ma_ucet'] = r.get('user_id') is not None
results.append(r)
results.sort(key=lambda p: (p.get('surname') or '', p.get('name') or ''))
return results[:50]
except Exception:
log(f"search_patient chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_patient_requests(rodne_cislo: str, limit: int = 30) -> dict:
"""Vrátí požadavky pacienta z tabulky pozadavky podle rodného čísla
(lomítko nevadí). Řazení od nejnovějšího.
"""
try:
import re
rc = re.sub(r'\D', '', rodne_cislo)
cur = _cursor()
cur.execute("""
SELECT id, displayTitle, createdAt, doneAt, removedAt,
pacient_jmeno, pacient_prijmeni, pacient_rodnecislo
FROM pozadavky
WHERE REPLACE(pacient_rodnecislo, '/', '') = %s
ORDER BY createdAt DESC
LIMIT %s
""", [rc, int(limit)])
rows = rows_to_json(cur.fetchall(), cur.description or [])
return {'rodne_cislo': rc, 'pocet': len(rows), 'pozadavky': rows}
except Exception:
log(f"get_patient_requests chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_request_conversation(request_id: str) -> dict:
"""Vrátí zprávy konverzace k danému požadavku (medevio_conversation),
chronologicky. `od_ordinace` = zprávu poslala ordinace (ne pacient).
"""
try:
cur = _cursor()
cur.execute("""
SELECT sender_name, sender_clinic_id, text, created_at, read_at,
attachment_url, attachment_description, attachment_content_type
FROM medevio_conversation
WHERE request_id = %s
ORDER BY created_at
""", [request_id])
rows = rows_to_json(cur.fetchall(), cur.description or [])
for r in rows:
r['od_ordinace'] = r.pop('sender_clinic_id') is not None
return {'request_id': request_id, 'pocet_zprav': len(rows), 'zpravy': rows}
except Exception:
log(f"get_request_conversation chyba: {traceback.format_exc()}")
raise
if __name__ == '__main__':
log("MCP Medevio MySQL server spuštěn (FastMCP)")
mcp.run()