Files
ordinaceprojekt/mcp_firebird.py
T
2026-05-25 16:44:34 +02:00

771 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
MCP server pro Firebird/Medicus — používá oficiální MCP SDK (FastMCP)
Spustit: python mcp_firebird.py
"""
import sys
import fdb
import traceback
from typing import Optional
from mcp.server.fastmcp import FastMCP
FB_CONFIG = {
'dsn': r'reporter:c:\medicus\medicus.fdb',
'user': 'SYSDBA',
'password': 'masterkey',
'charset': 'win1250',
}
# Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC
def log(msg: str):
print(msg, file=sys.stderr, flush=True)
# Připojení k Firebirdu
try:
conn = fdb.connect(**FB_CONFIG)
log("✓ Připojeno k Firebirdu (Medicus)")
except Exception as e:
log(f"✗ Chyba připojení k Firebirdu: {e}")
sys.exit(1)
def rows_to_json(rows, description):
"""Převede fdb rows na JSON-serializovatelný formát"""
import datetime
import decimal
def convert(val):
if isinstance(val, (datetime.date, datetime.datetime)):
return val.isoformat()
if isinstance(val, decimal.Decimal):
return float(val)
if isinstance(val, bytes):
return val.decode('win1250', errors='replace')
return val
cols = [d[0].strip() for d in description]
return [dict(zip(cols, [convert(v) for v in row])) for row in rows]
# MCP server
mcp = FastMCP("medicus-firebird")
@mcp.tool()
def execute_query(sql: str, params: Optional[list] = None) -> dict:
"""Spusť SQL dotaz na Medicus databázi.
Pro SELECT vrátí columns + rows. Pro INSERT/UPDATE/DELETE vrátí rowcount.
"""
try:
cur = conn.cursor()
if params:
cur.execute(sql, params)
else:
cur.execute(sql)
if sql.strip().upper().startswith('SELECT'):
rows = rows_to_json(cur.fetchall(), cur.description or [])
return {
'rowcount': len(rows),
'rows': rows
}
else:
conn.commit()
return {
'rowcount': cur.rowcount,
'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno'
}
except Exception as e:
log(f"execute_query chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def list_tables() -> list[str]:
"""Vrátí seznam všech uživatelských tabulek v Medicus databázi."""
try:
cur = conn.cursor()
cur.execute("""
SELECT TRIM(RDB$RELATION_NAME)
FROM RDB$RELATIONS
WHERE RDB$SYSTEM_FLAG = 0
ORDER BY RDB$RELATION_NAME
""")
return [row[0] for row in cur.fetchall()]
except Exception as e:
log(f"list_tables chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_table_columns(table_name: str) -> list[str]:
"""Vrátí seznam sloupců dané tabulky."""
try:
cur = conn.cursor()
cur.execute("""
SELECT TRIM(RDB$FIELD_NAME)
FROM RDB$RELATION_FIELDS
WHERE TRIM(RDB$RELATION_NAME) = ?
ORDER BY RDB$FIELD_POSITION
""", [table_name.upper()])
return [row[0] for row in cur.fetchall()]
except Exception as e:
log(f"get_table_columns chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_schema() -> dict:
"""Vrátí kompletní schéma DB — všechny tabulky a jejich sloupce."""
try:
cur = conn.cursor()
cur.execute("""
SELECT r.RDB$RELATION_NAME, f.RDB$FIELD_NAME
FROM RDB$RELATIONS r
LEFT JOIN RDB$RELATION_FIELDS f ON r.RDB$RELATION_NAME = f.RDB$RELATION_NAME
WHERE r.RDB$SYSTEM_FLAG = 0
ORDER BY r.RDB$RELATION_NAME, f.RDB$FIELD_POSITION
""")
schema = {}
for tbl_name, col_name in cur.fetchall():
tbl = tbl_name.strip() if tbl_name else 'unknown'
col = col_name.strip() if col_name else ''
if tbl not in schema:
schema[tbl] = []
if col:
schema[tbl].append(col)
return {
'table_count': len(schema),
'tables': list(schema.keys()),
'schema': schema
}
except Exception as e:
log(f"get_schema chyba: {traceback.format_exc()}")
raise
# ── Velké tabulky kde je nutný WHERE filtr ──────────────────────────────────
LARGE_TABLES = {'LOG', 'ZURNAL', 'LABVD', 'DOCLIST', 'PZT', 'LEKY', 'DEKLINK'}
def _parse_histdoc_blob(data_text: str) -> dict:
"""Interní parser key=value blobu z HISTDOC.DATA."""
result = {}
if not data_text:
return result
for line in data_text.replace('\r\n', '\n').replace('\r', '\n').split('\n'):
line = line.strip()
if '=' not in line:
continue
key, _, value = line.partition('=')
key = key.strip()
value = value.strip()
if value.startswith('C:'):
try:
result[key] = float(value[2:].replace(',', '.'))
except Exception:
result[key] = value[2:]
elif value.startswith('I:'):
try:
result[key] = int(value[2:])
except Exception:
result[key] = value[2:]
elif value.startswith('D:'):
result[key] = value[2:]
elif value in ('$:~\b', '$:~\x08', '$:'):
result[key] = ''
else:
result[key] = value
return result
@mcp.tool()
def get_patient(idpac: int) -> dict:
"""Vrátí základní info o pacientovi z tabulky KAR podle IDPAC.
Výsledek: jmeno, prijmeni, rc, datnar, pojistovna, vyrazen.
"""
try:
cur = conn.cursor()
cur.execute("""
SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN
FROM KAR WHERE IDPAC = ?
""", [idpac])
row = cur.fetchone()
if not row:
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
import datetime, decimal
def cv(v):
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
if isinstance(v, decimal.Decimal): return float(v)
return v
cols = ['idpac', 'jmeno', 'prijmeni', 'rc', 'datnar', 'pojistovna', 'vyrazen']
return dict(zip(cols, [cv(v) for v in row]))
except Exception:
log(f"get_patient chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def search_patients(query: str) -> list:
"""Vyhledá pacienty podle příjmení, jména nebo rodného čísla (částečná shoda).
Vrátí max. 50 výsledků: idpac, jmeno, prijmeni, rc, pojistovna.
"""
try:
cur = conn.cursor()
q = query.strip().upper()
cur.execute("""
SELECT FIRST 50 IDPAC, JMENO, PRIJMENI, RODCIS, POJ
FROM KAR
WHERE UPPER(PRIJMENI) LIKE ? OR UPPER(JMENO) LIKE ? OR RODCIS LIKE ?
ORDER BY PRIJMENI, JMENO
""", [f'%{q}%', f'%{q}%', f'%{q}%'])
rows = cur.fetchall()
return [
{'idpac': r[0], 'jmeno': r[1], 'prijmeni': r[2], 'rc': r[3], 'pojistovna': r[4]}
for r in rows
]
except Exception:
log(f"search_patients chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_patient_timeline(idpac: int, datum_od: Optional[str] = None, datum_do: Optional[str] = None) -> dict:
"""Chronologický přehled všech záznamů pacienta z DOCLIST.
datum_od / datum_do ve formátu YYYY-MM-DD (volitelné).
Vrátí: pacient (jmeno, prijmeni, rc) + seznam událostí (datum, tabulka, popis).
"""
try:
cur = conn.cursor()
# Info o pacientovi
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
pac = cur.fetchone()
if not pac:
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
# Timeline z DOCLIST
sql = "SELECT DATUM, TABULKA, TYP, POPIS FROM DOCLIST WHERE IDPAC = ?"
params = [idpac]
if datum_od:
sql += " AND DATUM >= ?"
params.append(datum_od)
if datum_do:
sql += " AND DATUM <= ?"
params.append(datum_do)
sql += " ORDER BY DATUM, ID"
cur.execute(sql, params)
import datetime, decimal
def cv(v):
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
return v
events = [
{'datum': cv(r[0]), 'tabulka': r[1], 'typ': r[2], 'popis': r[3] or ''}
for r in cur.fetchall()
]
return {
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
'pocet': len(events),
'events': events
}
except Exception:
log(f"get_patient_timeline chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def parse_histdoc_data(idhistdoc: int) -> dict:
"""Načte a dekóduje DATA blob z HISTDOC pro daný záznam.
Vrátí strukturovaný dict: Kod, Nazev, Pocet, Cena, Poj, Dgn, DatPlat, Stav, Doklad…
"""
try:
cur = conn.cursor()
cur.execute("""
SELECT h.TYP, h.DATUM, CAST(h.DATA AS VARCHAR(8000)),
e.ID_DOKLADU, e.ID_ZP, e.ODESLANO, e.VYDANO, e.CHYBA
FROM HISTDOC h
LEFT JOIN HISTDOC_EPOUKAZ e ON e.IDHISTDOC = h.ID
WHERE h.ID = ?
""", [idhistdoc])
row = cur.fetchone()
if not row:
return {'error': f'HISTDOC ID={idhistdoc} nenalezen'}
import datetime
def cv(v):
if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat()
return v
typ, datum, data_text, id_dokladu, id_zp, odeslano, vydano, chyba = row
parsed = _parse_histdoc_blob(data_text or '')
return {
'id': idhistdoc,
'typ': typ,
'datum': cv(datum),
'data': parsed,
'epoukaz': {
'id_dokladu': id_dokladu,
'id_zp': id_zp,
'odeslano': cv(odeslano),
'vydano': cv(vydano),
'chyba': chyba == 'T' if chyba else False,
}
}
except Exception:
log(f"parse_histdoc_data chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_table_info(table_name: str) -> dict:
"""Vrátí rozšířené info o tabulce: sloupce s datovými typy, nullable, PK + počet záznamů."""
try:
cur = conn.cursor()
cur.execute("""
SELECT
TRIM(f.RDB$FIELD_NAME) AS SLOUPEC,
CASE ft.RDB$FIELD_TYPE
WHEN 7 THEN 'SMALLINT'
WHEN 8 THEN 'INTEGER'
WHEN 10 THEN 'FLOAT'
WHEN 12 THEN 'DATE'
WHEN 13 THEN 'TIME'
WHEN 14 THEN 'CHAR(' || ft.RDB$FIELD_LENGTH || ')'
WHEN 16 THEN 'BIGINT'
WHEN 27 THEN 'DOUBLE'
WHEN 35 THEN 'TIMESTAMP'
WHEN 37 THEN 'VARCHAR(' || ft.RDB$FIELD_LENGTH || ')'
WHEN 261 THEN 'BLOB'
ELSE 'TYP(' || ft.RDB$FIELD_TYPE || ')'
END AS TYP,
CASE COALESCE(f.RDB$NULL_FLAG, 0) WHEN 1 THEN 'NOT NULL' ELSE 'NULL' END AS NULLABLE,
CASE WHEN pk.RDB$FIELD_NAME IS NOT NULL THEN 'PK' ELSE '' END AS PK
FROM RDB$RELATION_FIELDS f
JOIN RDB$FIELDS ft ON ft.RDB$FIELD_NAME = f.RDB$FIELD_SOURCE
LEFT JOIN (
SELECT i.RDB$RELATION_NAME, s.RDB$FIELD_NAME
FROM RDB$INDICES i
JOIN RDB$INDEX_SEGMENTS s ON s.RDB$INDEX_NAME = i.RDB$INDEX_NAME
WHERE i.RDB$RELATION_NAME = ?
AND EXISTS (
SELECT 1 FROM RDB$RELATION_CONSTRAINTS c
WHERE c.RDB$INDEX_NAME = i.RDB$INDEX_NAME
AND c.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY'
)
) pk ON pk.RDB$FIELD_NAME = f.RDB$FIELD_NAME
WHERE TRIM(f.RDB$RELATION_NAME) = ?
ORDER BY f.RDB$FIELD_POSITION
""", [table_name.upper(), table_name.upper()])
columns = [
{'sloupec': r[0], 'typ': r[1], 'nullable': r[2], 'pk': r[3]}
for r in cur.fetchall()
]
# Počet záznamů
try:
cur.execute(f'SELECT COUNT(*) FROM {table_name.upper()}')
count = cur.fetchone()[0]
except Exception:
count = None
return {
'tabulka': table_name.upper(),
'pocet_zaznamu': count,
'sloupce': columns
}
except Exception:
log(f"get_table_info chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def safe_query(sql: str, params: Optional[list] = None) -> dict:
"""Bezpečný SELECT s ochranou před timeoutem na velkých tabulkách.
Automaticky varuje pokud dotaz míří na LOG, ZURNAL, LABVD, DOCLIST, PZT, LEKY
bez WHERE klauzule. Výsledek omezen na 500 řádků.
"""
try:
sql_upper = sql.strip().upper()
if not sql_upper.startswith('SELECT'):
return {'error': 'safe_query podporuje pouze SELECT dotazy'}
# Varování pro velké tabulky bez WHERE
warnings = []
for tbl in LARGE_TABLES:
if tbl in sql_upper and 'WHERE' not in sql_upper:
warnings.append(f'Tabulka {tbl} má miliony záznamů — přidej WHERE filtr!')
# Přidej FIRST limit pokud chybí
if 'FIRST ' not in sql_upper:
sql = sql.strip()
if sql.upper().startswith('SELECT'):
sql = 'SELECT FIRST 500 ' + sql[6:].lstrip()
cur = conn.cursor()
if params:
cur.execute(sql, params)
else:
cur.execute(sql)
rows = rows_to_json(cur.fetchall(), cur.description or [])
result = {'rowcount': len(rows), 'rows': rows}
if warnings:
result['warnings'] = warnings
return result
except Exception:
log(f"safe_query chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_patient_prescriptions(idpac: int, months: int = 6) -> dict:
"""Vrátí seznam předepsaných léků pacienta z receptů za posledních N měsíců (výchozí 6).
Výsledek: pacient (jmeno, prijmeni, rc) + seznam receptů (datum, lek, dsig).
"""
try:
cur = conn.cursor()
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
pac = cur.fetchone()
if not pac:
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
cur.execute("""
SELECT DATUM, LEK, DSIG
FROM RECEPT
WHERE IDPAC = ?
AND DATUM >= DATEADD(-? MONTH TO CURRENT_DATE)
ORDER BY DATUM DESC, ID DESC
""", [idpac, months])
import datetime
recepty = [
{
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
'lek': r[1],
'dsig': r[2] or '',
}
for r in cur.fetchall()
]
return {
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
'obdobi_mesice': months,
'pocet': len(recepty),
'recepty': recepty,
}
except Exception:
log(f"get_patient_prescriptions chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_patient_dxa_files(idpac: int) -> dict:
"""Zjistí, zda má pacient fyzicky uložený DXA/denzitometrický nález v tabulce FILES.
Hledá v FILENAME výrazy: DXA, DENZIT, OSTEOPOR (bez rozlišení velikosti písmen).
Vrátí: pacient + seznam nalezených souborů (id, filename, datum, poznamka).
"""
try:
cur = conn.cursor()
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
pac = cur.fetchone()
if not pac:
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
cur.execute("""
SELECT ID, FILENAME, DATUM, POZNAMKA
FROM FILES
WHERE IDPAC = ?
AND UPPER(FILENAME) LIKE '%DXA%'
ORDER BY DATUM DESC, ID DESC
""", [idpac])
import datetime
soubory = [
{
'id': r[0],
'filename': r[1],
'datum': r[2].isoformat() if isinstance(r[2], datetime.date) else r[2],
'poznamka': r[3] or '',
}
for r in cur.fetchall()
]
return {
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
'ma_dxa': len(soubory) > 0,
'pocet': len(soubory),
'soubory': soubory,
}
except Exception:
log(f"get_patient_dxa_files chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_patient_vaccinations(idpac: int) -> dict:
"""Vrátí všechna očkování pacienta z tabulky OCKZAZ.
Výsledek: pacient + seznam očkování (datum, latka, zkratka, nazev, davka, por, priste, poznamka).
Řazení: od nejnovějšího. Zrušené záznamy (ZRUSENO='T') jsou označeny.
"""
try:
cur = conn.cursor()
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
pac = cur.fetchone()
if not pac:
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
cur.execute("""
SELECT DATUM, LATKA, ZKRATKA, NAZEV, DAVKA, POR, PRISTE, POZNAMKA, ZRUSENO
FROM OCKZAZ
WHERE IDPAC = ?
ORDER BY DATUM DESC, ID DESC
""", [idpac])
import datetime
ockovani = [
{
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
'latka': r[1] or '',
'zkratka': r[2] or '',
'nazev': r[3] or '',
'davka': r[4] or '',
'poradi': r[5],
'priste': r[6].isoformat() if isinstance(r[6], datetime.date) else r[6],
'poznamka': r[7] or '',
'zruseno': r[8] == 'T',
}
for r in cur.fetchall()
]
return {
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
'pocet': len(ockovani),
'ockovani': ockovani,
}
except Exception:
log(f"get_patient_vaccinations chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_patient_psa_lab(idpac: int) -> dict:
"""Vrátí laboratorní výsledky PSA pacienta: datum, hodnota, jednotka, referenční meze (dolní/horní).
Hledá všechny metody jejichž název obsahuje PSA (Total PSA, Free PSA, PSA celkový atd.).
Výsledek seřazen od nejnovějšího.
"""
try:
cur = conn.cursor()
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
pac = cur.fetchone()
if not pac:
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
cur.execute("""
SELECT
h.DATUM,
m.NAZEV,
d.VYSL,
j.JEDN,
s.NORMDOL,
s.NORMHOR
FROM LABVH h
JOIN LABVD d ON d.IDVH = h.IDVH
JOIN LABMETOD m ON m.IDMETOD = d.IDMETOD
LEFT JOIN LABJEDN j ON j.IDJEDN = d.IDJEDN
LEFT JOIN LABSKALY s ON s.IDSKALY = d.IDSKALY AND s.TYP = '4'
WHERE h.IDPACIENT = ?
AND UPPER(m.NAZEV) LIKE '%PSA%'
ORDER BY h.DATUM DESC, h.IDVH DESC
""", [idpac])
import datetime
vysledky = [
{
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
'metoda': r[1],
'hodnota': r[2],
'jednotka': r[3] or '',
'ref_dolni': r[4],
'ref_horni': r[5],
}
for r in cur.fetchall()
]
return {
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
'pocet': len(vysledky),
'vysledky': vysledky,
}
except Exception:
log(f"get_patient_psa_lab chyba: {traceback.format_exc()}")
raise
@mcp.tool()
def get_patient_sick_leaves(idpac: int) -> dict:
"""Vrátí přehled pracovních neschopností pacienta z tabulky NES.
Výsledek: pacient + seznam neschopností (od, do, diagnóza, stav, zaměstnavatel, počet dní, ECN číslo).
Stornované záznamy jsou označeny. Řazení od nejnovějšího.
"""
try:
cur = conn.cursor()
cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac])
pac = cur.fetchone()
if not pac:
return {'error': f'Pacient IDPAC={idpac} nenalezen'}
cur.execute("""
SELECT ZACNES, KONNES, DIAGNO, PRACNE, PRICINA, PODNIK, PROFES,
STORNO, ECN, DUVOD_UKONCENI
FROM NES
WHERE IDPAC = ?
ORDER BY ZACNES DESC, ID DESC
""", [idpac])
import datetime
def fmt_date(v):
return v.isoformat() if isinstance(v, datetime.date) else v
def pocet_dni(zacnes, konnes, pracne):
if not zacnes:
return None
konec = konnes if konnes else datetime.date.today()
if isinstance(zacnes, datetime.date) and isinstance(konec, datetime.date):
return (konec - zacnes).days + 1
return None
neschopenky = [
{
'od': fmt_date(r[0]),
'do': fmt_date(r[1]),
'diagno': (r[2] or '').strip(),
'stav': 'aktivní' if r[3] == 'A' else 'ukončená',
'pricina': r[4] or '',
'zamestnavatel': r[5] or '',
'profes': r[6] or '',
'storno': r[7] == 'T',
'ecn': r[8] or '',
'duvod_ukonceni': r[9] or '',
'pocet_dni': pocet_dni(r[0], r[1], r[3]),
}
for r in cur.fetchall()
]
aktivni = sum(1 for n in neschopenky if n['stav'] == 'aktivní' and not n['storno'])
return {
'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]},
'pocet_celkem': len(neschopenky),
'pocet_aktivnich': aktivni,
'neschopenky': neschopenky,
}
except Exception:
log(f"get_patient_sick_leaves chyba: {traceback.format_exc()}")
raise
PSA_KODY = ('01131', '01132', '01133', '81227', '81530', '81718', '81800', '93225')
@mcp.tool()
def get_psa_records(idpac: Optional[int] = None, rok: Optional[int] = None) -> dict:
"""Vrátí přehled vykázaných PSA výkonů z DOKLADD.
Sledované kódy: 0113101133, 81227, 81530, 81718, 81800, 93225.
idpac: filtr konkrétního pacienta (None = všichni).
rok: filtr roku (None = vše).
Výsledek: počet + seznam (datum, kód, název, příjmení, jméno, rc).
"""
try:
cur = conn.cursor()
kody_placeholder = ','.join(['?' for _ in PSA_KODY])
params = list(PSA_KODY)
sql = f"""
SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS
FROM DOKLADD d
JOIN KAR k ON k.RODCIS = d.RODCIS
JOIN VYKONY v ON v.KOD = d.KOD
AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE)
WHERE d.KOD IN ({kody_placeholder})
"""
if idpac:
sql += " AND k.IDPAC = ?"
params.append(idpac)
if rok:
sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?"
params.append(rok)
sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI"
cur.execute(sql, params)
import datetime
zaznamy = [
{
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
'kod': r[1],
'naz': r[2],
'prijmeni': r[3],
'jmeno': r[4],
'rc': r[5].strip() if r[5] else '',
}
for r in cur.fetchall()
]
return {
'idpac': idpac,
'rok': rok,
'pocet': len(zaznamy),
'zaznamy': zaznamy,
}
except Exception:
log(f"get_psa_records chyba: {traceback.format_exc()}")
raise
DXA_KODY = ('10035', '11320', '11321', '11322', '11323', '11324', '11325', '11326')
@mcp.tool()
def get_dxa_records(rok: Optional[int] = None) -> dict:
"""Vrátí přehled vykázaných DXA výkonů (osteoporóza, denzitometrie).
Sledované kódy: 11320, 11321, 1132211326, 10035.
rok: filtr roku (např. 2025), None = vše.
Výsledek: celkový počet + seznam záznamů (datum, kód, název kódu, příjmení, jméno, rc).
"""
try:
cur = conn.cursor()
kody_placeholder = ','.join(['?' for _ in DXA_KODY])
params = list(DXA_KODY)
sql = f"""
SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS
FROM DOKLADD d
JOIN KAR k ON k.RODCIS = d.RODCIS
JOIN VYKONY v ON v.KOD = d.KOD
AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE)
WHERE d.KOD IN ({kody_placeholder})
"""
if rok:
sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?"
params.append(rok)
sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI"
cur.execute(sql, params)
import datetime
zaznamy = [
{
'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0],
'kod': r[1],
'naz': r[2],
'prijmeni': r[3],
'jmeno': r[4],
'rc': r[5].strip() if r[5] else '',
}
for r in cur.fetchall()
]
return {
'rok': rok,
'pocet': len(zaznamy),
'zaznamy': zaznamy,
}
except Exception:
log(f"get_dxa_records chyba: {traceback.format_exc()}")
raise
if __name__ == '__main__':
log("MCP Firebird server spuštěn (FastMCP)")
mcp.run()