906 lines
32 KiB
Python
906 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
MCP server pro Firebird/Medicus — používá oficiální MCP SDK (FastMCP)
|
||
Spustit: python mcp_firebird.py
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import traceback
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
from mcp.server.fastmcp import FastMCP
|
||
|
||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||
from Knihovny.medicus_db import get_medicus_connection_reconnecting
|
||
|
||
# Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC
|
||
def log(msg: str):
|
||
print(msg, file=sys.stderr, flush=True)
|
||
|
||
|
||
# Připojení k Firebirdu — lazy + auto-reconnect.
|
||
# Spojení se naváže až při prvním dotazu a samo se obnoví, když umře
|
||
# (uspání notebooku, denní gbak restore na serveru). Proto nepadáme při startu,
|
||
# i kdyby DB zrovna nebyla dostupná.
|
||
conn = get_medicus_connection_reconnecting()
|
||
try:
|
||
conn.cursor().execute("SELECT 1 FROM RDB$DATABASE")
|
||
log("✓ Připojeno k Firebirdu (Medicus)")
|
||
except Exception as e:
|
||
log(f"⚠ Firebird zatím nedostupný, zkusím se připojit při prvním dotazu: {e}")
|
||
|
||
|
||
def rows_to_json(rows, description):
|
||
"""Převede fdb rows na JSON-serializovatelný formát"""
|
||
import datetime
|
||
import decimal
|
||
|
||
def convert(val):
|
||
if isinstance(val, (datetime.date, datetime.datetime)):
|
||
return val.isoformat()
|
||
if isinstance(val, decimal.Decimal):
|
||
return float(val)
|
||
if isinstance(val, bytes):
|
||
return val.decode('win1250', errors='replace')
|
||
return val
|
||
|
||
cols = [d[0].strip() for d in description]
|
||
return [dict(zip(cols, [convert(v) for v in row])) for row in rows]
|
||
|
||
|
||
# MCP server
|
||
mcp = FastMCP("medicus-firebird")
|
||
|
||
|
||
@mcp.tool()
|
||
def execute_query(sql: str, params: Optional[list] = None) -> dict:
|
||
"""Spusť SQL dotaz na Medicus databázi.
|
||
Pro SELECT vrátí columns + rows. Pro INSERT/UPDATE/DELETE vrátí rowcount.
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
if params:
|
||
cur.execute(sql, params)
|
||
else:
|
||
cur.execute(sql)
|
||
|
||
if sql.strip().upper().startswith('SELECT'):
|
||
rows = rows_to_json(cur.fetchall(), cur.description or [])
|
||
return {
|
||
'rowcount': len(rows),
|
||
'rows': rows
|
||
}
|
||
else:
|
||
conn.commit()
|
||
return {
|
||
'rowcount': cur.rowcount,
|
||
'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno'
|
||
}
|
||
except Exception as e:
|
||
log(f"execute_query chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def list_tables() -> list[str]:
|
||
"""Vrátí seznam všech uživatelských tabulek v Medicus databázi."""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT TRIM(RDB$RELATION_NAME)
|
||
FROM RDB$RELATIONS
|
||
WHERE RDB$SYSTEM_FLAG = 0
|
||
ORDER BY RDB$RELATION_NAME
|
||
""")
|
||
return [row[0] for row in cur.fetchall()]
|
||
except Exception as e:
|
||
log(f"list_tables chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_table_columns(table_name: str) -> list[str]:
|
||
"""Vrátí seznam sloupců dané tabulky."""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT TRIM(RDB$FIELD_NAME)
|
||
FROM RDB$RELATION_FIELDS
|
||
WHERE TRIM(RDB$RELATION_NAME) = ?
|
||
ORDER BY RDB$FIELD_POSITION
|
||
""", [table_name.upper()])
|
||
return [row[0] for row in cur.fetchall()]
|
||
except Exception as e:
|
||
log(f"get_table_columns chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_schema() -> dict:
|
||
"""Vrátí kompletní schéma DB — všechny tabulky a jejich sloupce."""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT r.RDB$RELATION_NAME, f.RDB$FIELD_NAME
|
||
FROM RDB$RELATIONS r
|
||
LEFT JOIN RDB$RELATION_FIELDS f ON r.RDB$RELATION_NAME = f.RDB$RELATION_NAME
|
||
WHERE r.RDB$SYSTEM_FLAG = 0
|
||
ORDER BY r.RDB$RELATION_NAME, f.RDB$FIELD_POSITION
|
||
""")
|
||
|
||
schema = {}
|
||
for tbl_name, col_name in cur.fetchall():
|
||
tbl = tbl_name.strip() if tbl_name else 'unknown'
|
||
col = col_name.strip() if col_name else ''
|
||
if tbl not in schema:
|
||
schema[tbl] = []
|
||
if col:
|
||
schema[tbl].append(col)
|
||
|
||
return {
|
||
'table_count': len(schema),
|
||
'tables': list(schema.keys()),
|
||
'schema': schema
|
||
}
|
||
except Exception as e:
|
||
log(f"get_schema chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
|
||
# ── Velké tabulky kde je nutný WHERE filtr ──────────────────────────────────
|
||
LARGE_TABLES = {'LOG', 'ZURNAL', 'LABVD', 'DOCLIST', 'PZT', 'LEKY', 'DEKLINK'}
|
||
|
||
|
||
def _parse_histdoc_blob(data_text: str) -> dict:
|
||
"""Interní parser key=value blobu z HISTDOC.DATA."""
|
||
result = {}
|
||
if not data_text:
|
||
return result
|
||
for line in data_text.replace('\r\n', '\n').replace('\r', '\n').split('\n'):
|
||
line = line.strip()
|
||
if '=' not in line:
|
||
continue
|
||
key, _, value = line.partition('=')
|
||
key = key.strip()
|
||
value = value.strip()
|
||
if value.startswith('C:'):
|
||
try:
|
||
result[key] = float(value[2:].replace(',', '.'))
|
||
except Exception:
|
||
result[key] = value[2:]
|
||
elif value.startswith('I:'):
|
||
try:
|
||
result[key] = int(value[2:])
|
||
except Exception:
|
||
result[key] = value[2:]
|
||
elif value.startswith('D:'):
|
||
result[key] = value[2:]
|
||
elif value in ('$:~\b', '$:~\x08', '$:'):
|
||
result[key] = ''
|
||
else:
|
||
result[key] = value
|
||
return result
|
||
|
||
|
||
@mcp.tool()
|
||
def get_patient(idpac: int) -> dict:
|
||
"""Vrátí základní info o pacientovi z tabulky KAR podle IDPAC.
|
||
Výsledek: jmeno, prijmeni, rc, datnar, pojistovna, vyrazen.
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN
|
||
FROM KAR WHERE IDPAC = ?
|
||
""", [idpac])
|
||
row = cur.fetchone()
|
||
if not row:
|
||
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
||
import datetime, decimal
|
||
def cv(v):
|
||
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
|
||
if isinstance(v, decimal.Decimal): return float(v)
|
||
return v
|
||
cols = ['idpac', 'jmeno', 'prijmeni', 'rc', 'datnar', 'pojistovna', 'vyrazen']
|
||
return dict(zip(cols, [cv(v) for v in row]))
|
||
except Exception:
|
||
log(f"get_patient chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
def _strip_diacritics(s: str) -> str:
|
||
"""Bez diakritiky, velkými písmeny, sjednocené mezery."""
|
||
import re
|
||
import unicodedata
|
||
s = unicodedata.normalize('NFKD', s or '')
|
||
s = ''.join(c for c in s if not unicodedata.combining(c))
|
||
return re.sub(r'\s+', ' ', s).strip().upper()
|
||
|
||
|
||
@mcp.tool()
|
||
def search_patients(query: str, datum_narozeni: Optional[str] = None) -> list:
|
||
"""Vyhledá pacienty podle jména/příjmení (bez ohledu na diakritiku a pořadí slov,
|
||
např. "Mateju Petr" najde "Petr Matějů") nebo rodného čísla (částečná shoda, jen číslice).
|
||
datum_narozeni: volitelný filtr YYYY-MM-DD.
|
||
Vrátí max. 50 výsledků: idpac, jmeno, prijmeni, rc, datnar, pojistovna, vyrazen.
|
||
"""
|
||
try:
|
||
import datetime
|
||
import re
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN
|
||
FROM KAR
|
||
WHERE PRIJMENI IS NOT NULL AND PRIJMENI <> ''
|
||
""")
|
||
|
||
q_digits = re.sub(r'\D', '', query)
|
||
q_tokens = _strip_diacritics(query).split()
|
||
|
||
results = []
|
||
for r in cur.fetchall():
|
||
idpac, jmeno, prijmeni, rc, datnar, poj, vyrazen = r
|
||
datnar_iso = datnar.isoformat() if isinstance(datnar, datetime.date) else datnar
|
||
if datum_narozeni and str(datnar_iso or '')[:10] != datum_narozeni:
|
||
continue
|
||
if q_digits and len(q_digits) >= 4:
|
||
if q_digits not in (rc or ''):
|
||
continue
|
||
elif q_tokens:
|
||
name_norm = _strip_diacritics(f"{jmeno or ''} {prijmeni or ''}")
|
||
if not all(t in name_norm for t in q_tokens):
|
||
continue
|
||
elif not datum_narozeni:
|
||
continue # prázdný dotaz bez filtru data — nevracet celou kartotéku
|
||
results.append({
|
||
'idpac': idpac, 'jmeno': jmeno, 'prijmeni': prijmeni, 'rc': rc,
|
||
'datnar': datnar_iso, 'pojistovna': poj, 'vyrazen': vyrazen == 'A',
|
||
})
|
||
|
||
results.sort(key=lambda p: (p['prijmeni'] or '', p['jmeno'] or ''))
|
||
return results[:50]
|
||
except Exception:
|
||
log(f"search_patients chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def search_patient_by_contact(kontakt: str) -> list:
|
||
"""Vyhledá pacienty podle kontaktu (e-mail nebo telefon) v tabulce KARKONTAKT.
|
||
Částečná shoda bez ohledu na velikost písmen; u telefonů se porovnávají jen
|
||
číslice (ignoruje mezery a +420). Vrátí max. 50 výsledků: pacient (idpac,
|
||
jmeno, prijmeni, rc, datnar, pojistovna, vyrazen) + kontakt (typ, popis, vztah).
|
||
"""
|
||
try:
|
||
import datetime
|
||
import re
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT kk.IDPAC, kk.KONTAKT, kk.TYP, kk.POPIS, kk.VZTAH,
|
||
k.JMENO, k.PRIJMENI, k.RODCIS, k.DATNAR, k.POJ, k.VYRAZEN
|
||
FROM KARKONTAKT kk
|
||
JOIN KAR k ON k.IDPAC = kk.IDPAC
|
||
WHERE kk.KONTAKT IS NOT NULL AND kk.KONTAKT <> ''
|
||
""")
|
||
|
||
q = kontakt.strip().lower()
|
||
q_digits = re.sub(r'\D', '', kontakt)
|
||
if q_digits.startswith('420'):
|
||
q_digits = q_digits[3:]
|
||
|
||
results = []
|
||
for r in cur.fetchall():
|
||
(idpac, kont, typ, popis, vztah,
|
||
jmeno, prijmeni, rc, datnar, poj, vyrazen) = r
|
||
kont = (kont or '').strip()
|
||
hit = q and q in kont.lower()
|
||
if not hit and len(q_digits) >= 6:
|
||
kont_digits = re.sub(r'\D', '', kont)
|
||
if kont_digits.startswith('420'):
|
||
kont_digits = kont_digits[3:]
|
||
hit = q_digits in kont_digits
|
||
if not hit:
|
||
continue
|
||
results.append({
|
||
'idpac': idpac, 'jmeno': jmeno, 'prijmeni': prijmeni, 'rc': rc,
|
||
'datnar': datnar.isoformat() if isinstance(datnar, datetime.date) else datnar,
|
||
'pojistovna': poj, 'vyrazen': vyrazen == 'A',
|
||
'kontakt': kont, 'typ': typ, 'popis': popis or '', 'vztah': vztah or '',
|
||
})
|
||
|
||
return results[:50]
|
||
except Exception:
|
||
log(f"search_patient_by_contact chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_patient_timeline(idpac: int, datum_od: Optional[str] = None, datum_do: Optional[str] = None) -> dict:
|
||
"""Chronologický přehled všech záznamů pacienta z DOCLIST.
|
||
datum_od / datum_do ve formátu YYYY-MM-DD (volitelné).
|
||
Vrátí: pacient (jmeno, prijmeni, rc) + seznam událostí (datum, tabulka, popis).
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
# Info o pacientovi
|
||
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
|
||
pac = cur.fetchone()
|
||
if not pac:
|
||
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
||
|
||
# Timeline z DOCLIST
|
||
sql = "SELECT DATUM, TABULKA, TYP, POPIS FROM DOCLIST WHERE IDPAC = ?"
|
||
params = [idpac]
|
||
if datum_od:
|
||
sql += " AND DATUM >= ?"
|
||
params.append(datum_od)
|
||
if datum_do:
|
||
sql += " AND DATUM <= ?"
|
||
params.append(datum_do)
|
||
sql += " ORDER BY DATUM, ID"
|
||
|
||
cur.execute(sql, params)
|
||
import datetime, decimal
|
||
def cv(v):
|
||
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
|
||
return v
|
||
|
||
events = [
|
||
{'datum': cv(r[0]), 'tabulka': r[1], 'typ': r[2], 'popis': r[3] or ''}
|
||
for r in cur.fetchall()
|
||
]
|
||
return {
|
||
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
|
||
'pocet': len(events),
|
||
'events': events
|
||
}
|
||
except Exception:
|
||
log(f"get_patient_timeline chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def parse_histdoc_data(idhistdoc: int) -> dict:
|
||
"""Načte a dekóduje DATA blob z HISTDOC pro daný záznam.
|
||
Vrátí strukturovaný dict: Kod, Nazev, Pocet, Cena, Poj, Dgn, DatPlat, Stav, Doklad…
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT h.TYP, h.DATUM, CAST(h.DATA AS VARCHAR(8000)),
|
||
e.ID_DOKLADU, e.ID_ZP, e.ODESLANO, e.VYDANO, e.CHYBA
|
||
FROM HISTDOC h
|
||
LEFT JOIN HISTDOC_EPOUKAZ e ON e.IDHISTDOC = h.ID
|
||
WHERE h.ID = ?
|
||
""", [idhistdoc])
|
||
row = cur.fetchone()
|
||
if not row:
|
||
return {'error': f'HISTDOC ID={idhistdoc} nenalezen'}
|
||
import datetime
|
||
def cv(v):
|
||
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
|
||
return v
|
||
typ, datum, data_text, id_dokladu, id_zp, odeslano, vydano, chyba = row
|
||
parsed = _parse_histdoc_blob(data_text or '')
|
||
return {
|
||
'id': idhistdoc,
|
||
'typ': typ,
|
||
'datum': cv(datum),
|
||
'data': parsed,
|
||
'epoukaz': {
|
||
'id_dokladu': id_dokladu,
|
||
'id_zp': id_zp,
|
||
'odeslano': cv(odeslano),
|
||
'vydano': cv(vydano),
|
||
'chyba': chyba == 'T' if chyba else False,
|
||
}
|
||
}
|
||
except Exception:
|
||
log(f"parse_histdoc_data chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_table_info(table_name: str) -> dict:
|
||
"""Vrátí rozšířené info o tabulce: sloupce s datovými typy, nullable, PK + počet záznamů."""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT
|
||
TRIM(f.RDB$FIELD_NAME) AS SLOUPEC,
|
||
CASE ft.RDB$FIELD_TYPE
|
||
WHEN 7 THEN 'SMALLINT'
|
||
WHEN 8 THEN 'INTEGER'
|
||
WHEN 10 THEN 'FLOAT'
|
||
WHEN 12 THEN 'DATE'
|
||
WHEN 13 THEN 'TIME'
|
||
WHEN 14 THEN 'CHAR(' || ft.RDB$FIELD_LENGTH || ')'
|
||
WHEN 16 THEN 'BIGINT'
|
||
WHEN 27 THEN 'DOUBLE'
|
||
WHEN 35 THEN 'TIMESTAMP'
|
||
WHEN 37 THEN 'VARCHAR(' || ft.RDB$FIELD_LENGTH || ')'
|
||
WHEN 261 THEN 'BLOB'
|
||
ELSE 'TYP(' || ft.RDB$FIELD_TYPE || ')'
|
||
END AS TYP,
|
||
CASE COALESCE(f.RDB$NULL_FLAG, 0) WHEN 1 THEN 'NOT NULL' ELSE 'NULL' END AS NULLABLE,
|
||
CASE WHEN pk.RDB$FIELD_NAME IS NOT NULL THEN 'PK' ELSE '' END AS PK
|
||
FROM RDB$RELATION_FIELDS f
|
||
JOIN RDB$FIELDS ft ON ft.RDB$FIELD_NAME = f.RDB$FIELD_SOURCE
|
||
LEFT JOIN (
|
||
SELECT i.RDB$RELATION_NAME, s.RDB$FIELD_NAME
|
||
FROM RDB$INDICES i
|
||
JOIN RDB$INDEX_SEGMENTS s ON s.RDB$INDEX_NAME = i.RDB$INDEX_NAME
|
||
WHERE i.RDB$RELATION_NAME = ?
|
||
AND EXISTS (
|
||
SELECT 1 FROM RDB$RELATION_CONSTRAINTS c
|
||
WHERE c.RDB$INDEX_NAME = i.RDB$INDEX_NAME
|
||
AND c.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY'
|
||
)
|
||
) pk ON pk.RDB$FIELD_NAME = f.RDB$FIELD_NAME
|
||
WHERE TRIM(f.RDB$RELATION_NAME) = ?
|
||
ORDER BY f.RDB$FIELD_POSITION
|
||
""", [table_name.upper(), table_name.upper()])
|
||
columns = [
|
||
{'sloupec': r[0], 'typ': r[1], 'nullable': r[2], 'pk': r[3]}
|
||
for r in cur.fetchall()
|
||
]
|
||
# Počet záznamů
|
||
try:
|
||
cur.execute(f'SELECT COUNT(*) FROM {table_name.upper()}')
|
||
count = cur.fetchone()[0]
|
||
except Exception:
|
||
count = None
|
||
return {
|
||
'tabulka': table_name.upper(),
|
||
'pocet_zaznamu': count,
|
||
'sloupce': columns
|
||
}
|
||
except Exception:
|
||
log(f"get_table_info chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_columns_overview(table_name: str, sample_rows: int = 1000) -> dict:
|
||
"""Přehled obsahu sloupců tabulky pro pochopení sémantiky (např. co znamená
|
||
KARKONTAKT.TYP=3 nebo RECEPT.STORNO='T'). Ze vzorku posledních N řádků
|
||
(výchozí 1000) vrátí pro každý sloupec: počet vyplněných, počet distinct
|
||
hodnot a top 5 nejčastějších hodnot s četností. Hodnoty zkráceny na 80 znaků.
|
||
"""
|
||
try:
|
||
from collections import Counter
|
||
cur = conn.cursor()
|
||
cur.execute(f'SELECT FIRST {int(sample_rows)} * FROM {table_name.upper()}')
|
||
rows = rows_to_json(cur.fetchall(), cur.description or [])
|
||
|
||
if not rows:
|
||
return {'tabulka': table_name.upper(), 'sample': 0, 'sloupce': {}}
|
||
|
||
overview = {}
|
||
for col in rows[0].keys():
|
||
values = [r[col] for r in rows if r[col] is not None and r[col] != '']
|
||
counter = Counter(
|
||
str(v)[:80] for v in values
|
||
)
|
||
overview[col] = {
|
||
'vyplneno': len(values),
|
||
'distinct': len(counter),
|
||
'top': [
|
||
{'hodnota': v, 'pocet': n} for v, n in counter.most_common(5)
|
||
],
|
||
}
|
||
result = {
|
||
'tabulka': table_name.upper(),
|
||
'sample': len(rows),
|
||
'sloupce': overview,
|
||
}
|
||
if table_name.upper() in LARGE_TABLES:
|
||
result['warning'] = (
|
||
f'Tabulka {table_name.upper()} je velká — přehled je jen '
|
||
f'ze vzorku prvních {len(rows)} řádků.'
|
||
)
|
||
return result
|
||
except Exception:
|
||
log(f"get_columns_overview chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def safe_query(sql: str, params: Optional[list] = None) -> dict:
|
||
"""Bezpečný SELECT s ochranou před timeoutem na velkých tabulkách.
|
||
Automaticky varuje pokud dotaz míří na LOG, ZURNAL, LABVD, DOCLIST, PZT, LEKY
|
||
bez WHERE klauzule. Výsledek omezen na 500 řádků.
|
||
"""
|
||
try:
|
||
sql_upper = sql.strip().upper()
|
||
if not sql_upper.startswith('SELECT'):
|
||
return {'error': 'safe_query podporuje pouze SELECT dotazy'}
|
||
|
||
# Varování pro velké tabulky bez WHERE
|
||
warnings = []
|
||
for tbl in LARGE_TABLES:
|
||
if tbl in sql_upper and 'WHERE' not in sql_upper:
|
||
warnings.append(f'Tabulka {tbl} má miliony záznamů — přidej WHERE filtr!')
|
||
|
||
# Přidej FIRST limit pokud chybí
|
||
if 'FIRST ' not in sql_upper:
|
||
sql = sql.strip()
|
||
if sql.upper().startswith('SELECT'):
|
||
sql = 'SELECT FIRST 500 ' + sql[6:].lstrip()
|
||
|
||
cur = conn.cursor()
|
||
if params:
|
||
cur.execute(sql, params)
|
||
else:
|
||
cur.execute(sql)
|
||
|
||
rows = rows_to_json(cur.fetchall(), cur.description or [])
|
||
result = {'rowcount': len(rows), 'rows': rows}
|
||
if warnings:
|
||
result['warnings'] = warnings
|
||
return result
|
||
except Exception:
|
||
log(f"safe_query chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_patient_prescriptions(idpac: int, months: int = 6) -> dict:
|
||
"""Vrátí seznam předepsaných léků pacienta z receptů za posledních N měsíců (výchozí 6).
|
||
Výsledek: pacient (jmeno, prijmeni, rc) + seznam receptů (datum, lek, dsig).
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
|
||
pac = cur.fetchone()
|
||
if not pac:
|
||
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
||
|
||
cur.execute("""
|
||
SELECT DATUM, LEK, DSIG, TEXT
|
||
FROM RECEPT
|
||
WHERE IDPAC = ?
|
||
AND DATUM >= DATEADD(? MONTH TO CURRENT_DATE)
|
||
ORDER BY DATUM DESC, ID DESC
|
||
""", [idpac, -months])
|
||
|
||
import datetime
|
||
def parse_baleni(text):
|
||
"""Z TEXT sloupce vytáhne druhý řádek s info o balení (dávka, forma, počet)."""
|
||
if not text:
|
||
return ''
|
||
parts = text.replace('\r\n', '\n').split('\n')
|
||
return parts[1].strip() if len(parts) > 1 else ''
|
||
|
||
recepty = [
|
||
{
|
||
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
|
||
'lek': r[1],
|
||
'baleni': parse_baleni(r[3]),
|
||
'dsig': r[2] or '',
|
||
}
|
||
for r in cur.fetchall()
|
||
]
|
||
return {
|
||
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
|
||
'obdobi_mesice': months,
|
||
'pocet': len(recepty),
|
||
'recepty': recepty,
|
||
}
|
||
except Exception:
|
||
log(f"get_patient_prescriptions chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_patient_dxa_files(idpac: int) -> dict:
|
||
"""Zjistí, zda má pacient fyzicky uložený DXA/denzitometrický nález v tabulce FILES.
|
||
Hledá v FILENAME výrazy: DXA, DENZIT, OSTEOPOR (bez rozlišení velikosti písmen).
|
||
Vrátí: pacient + seznam nalezených souborů (id, filename, datum, poznamka).
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
|
||
pac = cur.fetchone()
|
||
if not pac:
|
||
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
||
|
||
cur.execute("""
|
||
SELECT ID, FILENAME, DATUM, POZNAMKA
|
||
FROM FILES
|
||
WHERE IDPAC = ?
|
||
AND UPPER(FILENAME) LIKE '%DXA%'
|
||
ORDER BY DATUM DESC, ID DESC
|
||
""", [idpac])
|
||
|
||
import datetime
|
||
soubory = [
|
||
{
|
||
'id': r[0],
|
||
'filename': r[1],
|
||
'datum': r[2].isoformat() if isinstance(r[2], datetime.date) else r[2],
|
||
'poznamka': r[3] or '',
|
||
}
|
||
for r in cur.fetchall()
|
||
]
|
||
return {
|
||
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
|
||
'ma_dxa': len(soubory) > 0,
|
||
'pocet': len(soubory),
|
||
'soubory': soubory,
|
||
}
|
||
except Exception:
|
||
log(f"get_patient_dxa_files chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_patient_vaccinations(idpac: int) -> dict:
|
||
"""Vrátí všechna očkování pacienta z tabulky OCKZAZ.
|
||
Výsledek: pacient + seznam očkování (datum, latka, zkratka, nazev, davka, por, priste, poznamka).
|
||
Řazení: od nejnovějšího. Zrušené záznamy (ZRUSENO='T') jsou označeny.
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
|
||
pac = cur.fetchone()
|
||
if not pac:
|
||
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
||
|
||
cur.execute("""
|
||
SELECT DATUM, LATKA, ZKRATKA, NAZEV, DAVKA, POR, PRISTE, POZNAMKA, ZRUSENO
|
||
FROM OCKZAZ
|
||
WHERE IDPAC = ?
|
||
ORDER BY DATUM DESC, ID DESC
|
||
""", [idpac])
|
||
|
||
import datetime
|
||
ockovani = [
|
||
{
|
||
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
|
||
'latka': r[1] or '',
|
||
'zkratka': r[2] or '',
|
||
'nazev': r[3] or '',
|
||
'davka': r[4] or '',
|
||
'poradi': r[5],
|
||
'priste': r[6].isoformat() if isinstance(r[6], datetime.date) else r[6],
|
||
'poznamka': r[7] or '',
|
||
'zruseno': r[8] == 'T',
|
||
}
|
||
for r in cur.fetchall()
|
||
]
|
||
return {
|
||
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
|
||
'pocet': len(ockovani),
|
||
'ockovani': ockovani,
|
||
}
|
||
except Exception:
|
||
log(f"get_patient_vaccinations chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_patient_psa_lab(idpac: int) -> dict:
|
||
"""Vrátí laboratorní výsledky PSA pacienta: datum, hodnota, jednotka, referenční meze (dolní/horní).
|
||
Hledá všechny metody jejichž název obsahuje PSA (Total PSA, Free PSA, PSA celkový atd.).
|
||
Výsledek seřazen od nejnovějšího.
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
|
||
pac = cur.fetchone()
|
||
if not pac:
|
||
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
||
|
||
cur.execute("""
|
||
SELECT
|
||
h.DATUM,
|
||
m.NAZEV,
|
||
d.VYSL,
|
||
j.JEDN,
|
||
s.NORMDOL,
|
||
s.NORMHOR
|
||
FROM LABVH h
|
||
JOIN LABVD d ON d.IDVH = h.IDVH
|
||
JOIN LABMETOD m ON m.IDMETOD = d.IDMETOD
|
||
LEFT JOIN LABJEDN j ON j.IDJEDN = d.IDJEDN
|
||
LEFT JOIN LABSKALY s ON s.IDSKALY = d.IDSKALY AND s.TYP = '4'
|
||
WHERE h.IDPACIENT = ?
|
||
AND UPPER(m.NAZEV) LIKE '%PSA%'
|
||
ORDER BY h.DATUM DESC, h.IDVH DESC
|
||
""", [idpac])
|
||
|
||
import datetime
|
||
vysledky = [
|
||
{
|
||
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
|
||
'metoda': r[1],
|
||
'hodnota': r[2],
|
||
'jednotka': r[3] or '',
|
||
'ref_dolni': r[4],
|
||
'ref_horni': r[5],
|
||
}
|
||
for r in cur.fetchall()
|
||
]
|
||
return {
|
||
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
|
||
'pocet': len(vysledky),
|
||
'vysledky': vysledky,
|
||
}
|
||
except Exception:
|
||
log(f"get_patient_psa_lab chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
@mcp.tool()
|
||
def get_patient_sick_leaves(idpac: int) -> dict:
|
||
"""Vrátí přehled pracovních neschopností pacienta z tabulky NES.
|
||
Výsledek: pacient + seznam neschopností (od, do, diagnóza, stav, zaměstnavatel, počet dní, ECN číslo).
|
||
Stornované záznamy jsou označeny. Řazení od nejnovějšího.
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
|
||
pac = cur.fetchone()
|
||
if not pac:
|
||
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
|
||
|
||
cur.execute("""
|
||
SELECT ZACNES, KONNES, DIAGNO, PRACNE, PRICINA, PODNIK, PROFES,
|
||
STORNO, ECN, DUVOD_UKONCENI
|
||
FROM NES
|
||
WHERE IDPAC = ?
|
||
ORDER BY ZACNES DESC, ID DESC
|
||
""", [idpac])
|
||
|
||
import datetime
|
||
def fmt_date(v):
|
||
return v.isoformat() if isinstance(v, datetime.date) else v
|
||
|
||
def pocet_dni(zacnes, konnes, pracne):
|
||
if not zacnes:
|
||
return None
|
||
konec = konnes if konnes else datetime.date.today()
|
||
if isinstance(zacnes, datetime.date) and isinstance(konec, datetime.date):
|
||
return (konec - zacnes).days + 1
|
||
return None
|
||
|
||
neschopenky = [
|
||
{
|
||
'od': fmt_date(r[0]),
|
||
'do': fmt_date(r[1]),
|
||
'diagno': (r[2] or '').strip(),
|
||
'stav': 'aktivní' if r[3] == 'A' else 'ukončená',
|
||
'pricina': r[4] or '',
|
||
'zamestnavatel': r[5] or '',
|
||
'profes': r[6] or '',
|
||
'storno': r[7] == 'T',
|
||
'ecn': r[8] or '',
|
||
'duvod_ukonceni': r[9] or '',
|
||
'pocet_dni': pocet_dni(r[0], r[1], r[3]),
|
||
}
|
||
for r in cur.fetchall()
|
||
]
|
||
aktivni = sum(1 for n in neschopenky if n['stav'] == 'aktivní' and not n['storno'])
|
||
return {
|
||
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
|
||
'pocet_celkem': len(neschopenky),
|
||
'pocet_aktivnich': aktivni,
|
||
'neschopenky': neschopenky,
|
||
}
|
||
except Exception:
|
||
log(f"get_patient_sick_leaves chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
PSA_KODY = ('01131', '01132', '01133', '81227', '81530', '81718', '81800', '93225')
|
||
|
||
|
||
@mcp.tool()
|
||
def get_psa_records(idpac: Optional[int] = None, rok: Optional[int] = None) -> dict:
|
||
"""Vrátí přehled vykázaných PSA výkonů z DOKLADD.
|
||
Sledované kódy: 01131–01133, 81227, 81530, 81718, 81800, 93225.
|
||
idpac: filtr konkrétního pacienta (None = všichni).
|
||
rok: filtr roku (None = vše).
|
||
Výsledek: počet + seznam (datum, kód, název, příjmení, jméno, rc).
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
kody_placeholder = ','.join(['?' for _ in PSA_KODY])
|
||
params = list(PSA_KODY)
|
||
sql = f"""
|
||
SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS
|
||
FROM DOKLADD d
|
||
JOIN KAR k ON k.RODCIS = d.RODCIS
|
||
JOIN VYKONY v ON v.KOD = d.KOD
|
||
AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE)
|
||
WHERE d.KOD IN ({kody_placeholder})
|
||
"""
|
||
if idpac:
|
||
sql += " AND k.IDPAC = ?"
|
||
params.append(idpac)
|
||
if rok:
|
||
sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?"
|
||
params.append(rok)
|
||
sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI"
|
||
|
||
cur.execute(sql, params)
|
||
import datetime
|
||
zaznamy = [
|
||
{
|
||
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
|
||
'kod': r[1],
|
||
'naz': r[2],
|
||
'prijmeni': r[3],
|
||
'jmeno': r[4],
|
||
'rc': r[5].strip() if r[5] else '',
|
||
}
|
||
for r in cur.fetchall()
|
||
]
|
||
return {
|
||
'idpac': idpac,
|
||
'rok': rok,
|
||
'pocet': len(zaznamy),
|
||
'zaznamy': zaznamy,
|
||
}
|
||
except Exception:
|
||
log(f"get_psa_records chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
DXA_KODY = ('10035', '11320', '11321', '11322', '11323', '11324', '11325', '11326')
|
||
|
||
|
||
@mcp.tool()
|
||
def get_dxa_records(rok: Optional[int] = None) -> dict:
|
||
"""Vrátí přehled vykázaných DXA výkonů (osteoporóza, denzitometrie).
|
||
Sledované kódy: 11320, 11321, 11322–11326, 10035.
|
||
rok: filtr roku (např. 2025), None = vše.
|
||
Výsledek: celkový počet + seznam záznamů (datum, kód, název kódu, příjmení, jméno, rc).
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
kody_placeholder = ','.join(['?' for _ in DXA_KODY])
|
||
params = list(DXA_KODY)
|
||
sql = f"""
|
||
SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS
|
||
FROM DOKLADD d
|
||
JOIN KAR k ON k.RODCIS = d.RODCIS
|
||
JOIN VYKONY v ON v.KOD = d.KOD
|
||
AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE)
|
||
WHERE d.KOD IN ({kody_placeholder})
|
||
"""
|
||
if rok:
|
||
sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?"
|
||
params.append(rok)
|
||
sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI"
|
||
|
||
cur.execute(sql, params)
|
||
import datetime
|
||
zaznamy = [
|
||
{
|
||
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
|
||
'kod': r[1],
|
||
'naz': r[2],
|
||
'prijmeni': r[3],
|
||
'jmeno': r[4],
|
||
'rc': r[5].strip() if r[5] else '',
|
||
}
|
||
for r in cur.fetchall()
|
||
]
|
||
return {
|
||
'rok': rok,
|
||
'pocet': len(zaznamy),
|
||
'zaznamy': zaznamy,
|
||
}
|
||
except Exception:
|
||
log(f"get_dxa_records chyba: {traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
if __name__ == '__main__':
|
||
log("MCP Firebird server spuštěn (FastMCP)")
|
||
mcp.run()
|