286 lines
10 KiB
Python
286 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
MCP server for the Medevio MySQL database — built on the official MCP SDK (FastMCP).

Run with: python mcp_medevio_mysql.py

The `medevio` database (MySQL, see Knihovny/mysql_db.py) is a local mirror of
data from the Medevio GraphQL API (patients, requests, conversations,
questionnaires, agenda) plus VZP data (insurance status, registrations,
batches — "dávky").

Note: the live API is handled by the sibling server `mcp_medevio.py` (GraphQL);
this server is meant for fast SQL queries over the synchronised data.
"""
|
|
|
|
import sys
|
|
import traceback
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
from Knihovny.mysql_db import connect_mysql
|
|
|
|
|
|
# All logs MUST go to stderr — stdout is reserved for JSON-RPC.
def log(msg: str) -> None:
    """Write *msg* as a single line to stderr and flush it immediately."""
    stream = sys.stderr
    print(msg, file=stream, flush=True)
|
|
|
|
|
|
# Open the shared module-level connection at import time; `_cursor()` hands out
# cursors on it. If connecting fails, the error is logged to stderr and the
# process exits with status 1.
try:
    conn = connect_mysql()
    log("Připojeno k MySQL (medevio)")
except Exception as e:
    log(f"Chyba připojení k MySQL: {e}")
    sys.exit(1)
|
|
|
|
|
|
def _cursor():
    """Return a new cursor on the shared connection, reconnecting if needed.

    The connection is pinged with ``reconnect=True`` first because an idle
    MySQL connection goes stale after a while.
    """
    conn.ping(reconnect=True)
    fresh_cursor = conn.cursor()
    return fresh_cursor
|
|
|
|
|
|
def rows_to_json(rows, description):
    """Turn driver result rows into a list of JSON-serialisable dicts.

    Args:
        rows: Iterable of row sequences as returned by ``cursor.fetchall()``.
        description: Cursor description; only the first item of each entry
            (the column name) is used.

    Returns:
        One dict per row, keyed by column name. Dates and datetimes become
        ISO 8601 strings, timedeltas become ``str(timedelta)``, ``Decimal``
        becomes ``float`` and ``bytes`` are decoded as UTF-8 with undecodable
        bytes replaced. All other values pass through unchanged.
    """
    import datetime
    import decimal

    def to_jsonable(value):
        # datetime.datetime is a subclass of datetime.date, so one check covers both.
        if isinstance(value, datetime.date):
            return value.isoformat()
        if isinstance(value, datetime.timedelta):
            return str(value)
        if isinstance(value, decimal.Decimal):
            return float(value)
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        return value

    column_names = [column[0] for column in description]
    records = []
    for row in rows:
        converted = [to_jsonable(cell) for cell in row]
        records.append(dict(zip(column_names, converted)))
    return records
|
|
|
|
|
|
def _strip_diacritics(s: str) -> str:
    """Normalise text for accent-insensitive matching.

    Removes combining marks (after NFKD decomposition), collapses every run
    of whitespace to a single space, trims the ends and upper-cases the
    result. A falsy input (``None`` or ``''``) yields ``''``.
    """
    import re
    import unicodedata

    decomposed = unicodedata.normalize('NFKD', s or '')
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    collapsed = re.sub(r'\s+', ' ', ''.join(base_chars))
    return collapsed.strip().upper()
|
|
|
|
|
|
# MCP server instance; the functions below register themselves on it
# through the @mcp.tool() decorator.
mcp = FastMCP("medevio-mysql")
|
|
|
|
|
|
@mcp.tool()
def execute_query(sql: str, params: Optional[list] = None) -> dict:
    """Run an arbitrary SQL statement against the Medevio MySQL database.

    Statements that produce a result set (SELECT, SHOW, DESCRIBE, and also
    WITH ... SELECT, EXPLAIN, DESC, CALL returning rows) return the rows.
    Data-modifying statements (INSERT/UPDATE/DELETE/...) are committed and
    return the affected row count.

    Args:
        sql: The SQL text; use ``%s`` placeholders for values.
        params: Optional list of values bound to the placeholders.

    Returns:
        ``{'rowcount': n, 'rows': [...]}`` for statements with a result set,
        otherwise ``{'rowcount': n, 'message': ...}``.

    Raises:
        Any exception raised by the driver; it is logged to stderr and the
        open transaction is rolled back before it is re-raised.
    """
    from contextlib import closing

    try:
        with closing(_cursor()) as cur:
            cur.execute(sql, params or None)

            is_plain_read = sql.strip().upper().startswith(('SELECT', 'SHOW', 'DESCRIBE'))
            # The cursor description is only set when the statement produced a
            # result set, which also covers WITH/EXPLAIN/DESC/CALL that a
            # keyword-prefix test misses.
            returns_rows = cur.description is not None

            if is_plain_read or returns_rows:
                rows = rows_to_json(cur.fetchall(), cur.description or [])
                if not is_plain_read:
                    # e.g. CALL may have modified data besides returning rows.
                    conn.commit()
                return {'rowcount': len(rows), 'rows': rows}

            conn.commit()
            return {
                'rowcount': cur.rowcount,
                'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno'
            }
    except Exception:
        log(f"execute_query chyba: {traceback.format_exc()}")
        # Do not leave a half-finished transaction on the shared connection.
        try:
            conn.rollback()
        except Exception:
            log(f"execute_query rollback selhal: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
@mcp.tool()
def safe_query(sql: str, params: Optional[list] = None) -> dict:
    """Run a read-only query (SELECT / SHOW / DESCRIBE).

    A SELECT without a LIMIT clause gets ``LIMIT 500`` appended
    automatically. ``SELECT ... INTO OUTFILE/DUMPFILE`` is rejected because
    it writes a file on the database server.

    Args:
        sql: The SQL text; use ``%s`` placeholders for values.
        params: Optional list of values bound to the placeholders.

    Returns:
        ``{'rowcount': n, 'rows': [...]}`` on success, or ``{'error': ...}``
        when the statement is not an allowed read-only query.

    Raises:
        Any exception raised by the driver, after logging it to stderr.
    """
    import re
    from contextlib import closing

    try:
        statement = sql.strip()
        sql_upper = statement.upper()
        if not sql_upper.startswith(('SELECT', 'SHOW', 'DESCRIBE')):
            return {'error': 'safe_query podporuje pouze SELECT/SHOW dotazy'}
        if re.search(r'\bINTO\s+(OUTFILE|DUMPFILE)\b', sql_upper):
            return {'error': 'safe_query nepovoluje zápis do souboru (INTO OUTFILE/DUMPFILE)'}
        # Match LIMIT as a whole word so identifiers such as `rate_limit`
        # do not suppress the automatic row cap.
        if sql_upper.startswith('SELECT') and not re.search(r'\bLIMIT\b', sql_upper):
            statement = statement.rstrip(';').rstrip() + ' LIMIT 500'

        with closing(_cursor()) as cur:
            cur.execute(statement, params or None)
            rows = rows_to_json(cur.fetchall(), cur.description or [])
        return {'rowcount': len(rows), 'rows': rows}
    except Exception:
        log(f"safe_query chyba: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
@mcp.tool()
def list_tables() -> dict:
    """List the tables in the medevio database together with their row counts.

    Returns:
        ``{'pocet_tabulek': n, 'tabulky': {table_name: row_count, ...}}``.

    Raises:
        Any exception raised by the driver, after logging it to stderr.
    """
    from contextlib import closing

    try:
        result = {}
        with closing(_cursor()) as cur:
            cur.execute("SHOW TABLES")
            tables = [r[0] for r in cur.fetchall()]
            for t in tables:
                # Double any backtick so an unusual table name cannot break
                # out of the quoted identifier.
                quoted = '`' + t.replace('`', '``') + '`'
                cur.execute(f"SELECT COUNT(*) FROM {quoted}")
                result[t] = cur.fetchone()[0]
        return {'pocet_tabulek': len(result), 'tabulky': result}
    except Exception:
        log(f"list_tables chyba: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
@mcp.tool()
def get_table_info(table_name: str) -> dict:
    """Describe a table: columns with type, nullability, key, default and
    comment, plus the total number of rows.

    Args:
        table_name: Name of the table (without quoting).

    Returns:
        ``{'tabulka': name, 'pocet_zaznamu': n, 'sloupce': [...]}`` where each
        column entry has the keys ``sloupec``, ``typ``, ``nullable``,
        ``klic``, ``default`` and ``komentar``.

    Raises:
        Any exception raised by the driver (e.g. unknown table), after
        logging it to stderr.
    """
    from contextlib import closing

    try:
        # table_name comes from the tool caller; doubling backticks keeps it
        # inside the quoted identifier and prevents SQL injection.
        quoted = '`' + str(table_name).replace('`', '``') + '`'
        with closing(_cursor()) as cur:
            cur.execute(f"SHOW FULL COLUMNS FROM {quoted}")
            # Positional fields follow MySQL's SHOW FULL COLUMNS layout:
            # Field, Type, Collation, Null, Key, Default, Extra, Privileges, Comment.
            columns = [
                {
                    'sloupec': r[0], 'typ': r[1], 'nullable': r[3],
                    'klic': r[4] or '', 'default': r[5], 'komentar': r[8] or '',
                }
                for r in cur.fetchall()
            ]
            cur.execute(f"SELECT COUNT(*) FROM {quoted}")
            count = cur.fetchone()[0]
        return {'tabulka': table_name, 'pocet_zaznamu': count, 'sloupce': columns}
    except Exception:
        log(f"get_table_info chyba: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
@mcp.tool()
def get_columns_overview(table_name: str, sample_rows: int = 1000) -> dict:
    """Summarise the contents of each column to help understand its meaning.

    Reads a sample of up to ``sample_rows`` rows (default 1000) and reports,
    per column, how many values are filled in (not NULL and not ''), how many
    distinct values there are and the 5 most frequent values with counts.
    Values are stringified and truncated to 80 characters before counting,
    so values sharing their first 80 characters are counted together.

    Args:
        table_name: Name of the table (without quoting).
        sample_rows: Maximum number of rows to sample; negative values are
            treated as 0.

    Returns:
        ``{'tabulka': name, 'sample': n, 'sloupce': {column: {'vyplneno': ..,
        'distinct': .., 'top': [{'hodnota': .., 'pocet': ..}, ...]}}}``.

    Raises:
        Any exception raised by the driver, after logging it to stderr.
    """
    from collections import Counter
    from contextlib import closing

    try:
        # table_name comes from the tool caller; doubling backticks keeps it
        # inside the quoted identifier and prevents SQL injection.
        quoted = '`' + str(table_name).replace('`', '``') + '`'
        # A negative LIMIT is a MySQL syntax error, so clamp at zero.
        row_limit = max(0, int(sample_rows))

        with closing(_cursor()) as cur:
            cur.execute(f"SELECT * FROM {quoted} LIMIT {row_limit}")
            rows = rows_to_json(cur.fetchall(), cur.description or [])
        if not rows:
            return {'tabulka': table_name, 'sample': 0, 'sloupce': {}}

        overview = {}
        for col in rows[0]:
            values = [r[col] for r in rows if r[col] is not None and r[col] != '']
            counter = Counter(str(v)[:80] for v in values)
            overview[col] = {
                'vyplneno': len(values),
                'distinct': len(counter),
                'top': [{'hodnota': v, 'pocet': n} for v, n in counter.most_common(5)],
            }
        return {'tabulka': table_name, 'sample': len(rows), 'sloupce': overview}
    except Exception:
        log(f"get_columns_overview chyba: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
@mcp.tool()
def search_patient(query: str) -> list:
    """Search patients in ``medevio_pacient``.

    Matches by name (ignoring diacritics, case and word order), by birth
    number (digits only, at least 4 digits), by e-mail (query contains '@')
    or by phone (digits only, at least 6 digits; a leading +420 / 00420 is
    ignored).

    Args:
        query: Free-text search term.

    Returns:
        Up to 50 matching patients sorted by surname and name. Each record
        contains patient_id, contact details, insurance info and ``ma_ucet``
        (True when ``user_id`` is not NULL, i.e. the patient has a Medevio
        account).

    Raises:
        Any exception raised by the driver, after logging it to stderr.
    """
    import re
    from contextlib import closing

    try:
        with closing(_cursor()) as cur:
            cur.execute("""
                SELECT patient_id, name, surname, identification_number, dob, sex,
                       email, phone, insurance_code, insurance_name, status,
                       has_mobile_app, user_id, user_email, user_phone
                FROM medevio_pacient
            """)
            rows = rows_to_json(cur.fetchall(), cur.description or [])

        non_digits = re.compile(r'\D')
        q = query.strip().lower()
        q_digits = non_digits.sub('', query)

        # Try the digits both as typed and without the Czech country prefix.
        # The prefix must not be stripped unconditionally: birth numbers of
        # people born in 1942 legitimately start with "420".
        digit_variants = [q_digits]
        for prefix in ('00420', '420'):
            if q_digits.startswith(prefix):
                digit_variants.append(q_digits[len(prefix):])
                break
        rc_needles = [d for d in digit_variants if len(d) >= 4]
        phone_needles = [d for d in digit_variants if len(d) >= 6]

        q_tokens = _strip_diacritics(query).split()

        results = []
        for r in rows:
            hit = False
            if rc_needles:
                rc = non_digits.sub('', str(r.get('identification_number') or ''))
                hit = any(d in rc for d in rc_needles)
            if not hit and phone_needles:
                phones = ' '.join(
                    non_digits.sub('', str(r.get(k) or '')) for k in ('phone', 'user_phone')
                )
                hit = any(d in phones for d in phone_needles)
            if not hit and '@' in q:
                emails = f"{(r.get('email') or '').lower()} {(r.get('user_email') or '').lower()}"
                hit = q in emails
            # Name search only applies to queries without any digits.
            if not hit and q_tokens and not q_digits:
                name_norm = _strip_diacritics(f"{r.get('name') or ''} {r.get('surname') or ''}")
                hit = all(t in name_norm for t in q_tokens)
            if hit:
                r['ma_ucet'] = r.get('user_id') is not None
                results.append(r)

        results.sort(key=lambda p: (p.get('surname') or '', p.get('name') or ''))
        return results[:50]
    except Exception:
        log(f"search_patient chyba: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
@mcp.tool()
def get_patient_requests(rodne_cislo: str, limit: int = 30) -> dict:
    """Return a patient's requests from the ``pozadavky`` table, newest first.

    Args:
        rodne_cislo: Birth number; a slash or other non-digit characters are
            ignored.
        limit: Maximum number of requests to return (default 30); negative
            values are treated as 0.

    Returns:
        ``{'rodne_cislo': digits, 'pocet': n, 'pozadavky': [...]}``. When the
        input contains no digits the result is empty and no query is run.

    Raises:
        Any exception raised by the driver, after logging it to stderr.
    """
    import re
    from contextlib import closing

    try:
        rc = re.sub(r'\D', '', rodne_cislo or '')
        if not rc:
            # An empty key would match every request with a blank birth number.
            return {'rodne_cislo': rc, 'pocet': 0, 'pozadavky': []}
        # A negative LIMIT is a MySQL syntax error, so clamp at zero.
        row_limit = max(0, int(limit))

        with closing(_cursor()) as cur:
            cur.execute("""
                SELECT id, displayTitle, createdAt, doneAt, removedAt,
                       pacient_jmeno, pacient_prijmeni, pacient_rodnecislo
                FROM pozadavky
                WHERE REPLACE(pacient_rodnecislo, '/', '') = %s
                ORDER BY createdAt DESC
                LIMIT %s
            """, [rc, row_limit])
            rows = rows_to_json(cur.fetchall(), cur.description or [])
        return {'rodne_cislo': rc, 'pocet': len(rows), 'pozadavky': rows}
    except Exception:
        log(f"get_patient_requests chyba: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
@mcp.tool()
def get_request_conversation(request_id: str) -> dict:
    """Return the conversation messages of a request, in chronological order.

    Reads ``medevio_conversation`` for the given request. In each message the
    ``sender_clinic_id`` column is replaced by the boolean ``od_ordinace``,
    which is True when ``sender_clinic_id`` is not NULL (the message was sent
    by the clinic rather than the patient).

    Args:
        request_id: Identifier of the request.

    Returns:
        ``{'request_id': id, 'pocet_zprav': n, 'zpravy': [...]}``.

    Raises:
        Any exception raised by the driver, after logging it to stderr.
    """
    from contextlib import closing

    try:
        # closing() releases the cursor even when the query fails.
        with closing(_cursor()) as cur:
            cur.execute("""
                SELECT sender_name, sender_clinic_id, text, created_at, read_at,
                       attachment_url, attachment_description, attachment_content_type
                FROM medevio_conversation
                WHERE request_id = %s
                ORDER BY created_at
            """, [request_id])
            rows = rows_to_json(cur.fetchall(), cur.description or [])
        for r in rows:
            r['od_ordinace'] = r.pop('sender_clinic_id') is not None
        return {'request_id': request_id, 'pocet_zprav': len(rows), 'zpravy': rows}
    except Exception:
        log(f"get_request_conversation chyba: {traceback.format_exc()}")
        raise
|
|
|
|
|
|
# Script entry point: announce start-up on stderr (stdout is reserved for
# JSON-RPC) and hand control to the FastMCP server.
if __name__ == '__main__':
    log("MCP Medevio MySQL server spuštěn (FastMCP)")
    # NOTE(review): run() is called with its defaults; presumably the stdio
    # transport — confirm against the MCP SDK version in use.
    mcp.run()
|