#!/usr/bin/env python3 """ MCP server pro Firebird/Medicus — používá oficiální MCP SDK (FastMCP) Spustit: python mcp_firebird.py """ import sys import os import traceback from pathlib import Path from typing import Optional from mcp.server.fastmcp import FastMCP sys.path.insert(0, str(Path(__file__).resolve().parent)) from Knihovny.medicus_db import get_medicus_connection # Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC def log(msg: str): print(msg, file=sys.stderr, flush=True) # Připojení k Firebirdu try: conn = get_medicus_connection() log("✓ Připojeno k Firebirdu (Medicus)") except Exception as e: log(f"✗ Chyba připojení k Firebirdu: {e}") sys.exit(1) def rows_to_json(rows, description): """Převede fdb rows na JSON-serializovatelný formát""" import datetime import decimal def convert(val): if isinstance(val, (datetime.date, datetime.datetime)): return val.isoformat() if isinstance(val, decimal.Decimal): return float(val) if isinstance(val, bytes): return val.decode('win1250', errors='replace') return val cols = [d[0].strip() for d in description] return [dict(zip(cols, [convert(v) for v in row])) for row in rows] # MCP server mcp = FastMCP("medicus-firebird") @mcp.tool() def execute_query(sql: str, params: Optional[list] = None) -> dict: """Spusť SQL dotaz na Medicus databázi. Pro SELECT vrátí columns + rows. Pro INSERT/UPDATE/DELETE vrátí rowcount. """ try: cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) if sql.strip().upper().startswith('SELECT'): rows = rows_to_json(cur.fetchall(), cur.description or []) return { 'rowcount': len(rows), 'rows': rows } else: conn.commit() return { 'rowcount': cur.rowcount, 'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno' } except Exception as e: log(f"execute_query chyba: {traceback.format_exc()}") raise @mcp.tool() def list_tables() -> list[str]: """Vrátí seznam všech uživatelských tabulek v Medicus databázi.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(RDB$RELATION_NAME) FROM RDB$RELATIONS WHERE RDB$SYSTEM_FLAG = 0 ORDER BY RDB$RELATION_NAME """) return [row[0] for row in cur.fetchall()] except Exception as e: log(f"list_tables chyba: {traceback.format_exc()}") raise @mcp.tool() def get_table_columns(table_name: str) -> list[str]: """Vrátí seznam sloupců dané tabulky.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(RDB$FIELD_NAME) FROM RDB$RELATION_FIELDS WHERE TRIM(RDB$RELATION_NAME) = ? ORDER BY RDB$FIELD_POSITION """, [table_name.upper()]) return [row[0] for row in cur.fetchall()] except Exception as e: log(f"get_table_columns chyba: {traceback.format_exc()}") raise @mcp.tool() def get_schema() -> dict: """Vrátí kompletní schéma DB — všechny tabulky a jejich sloupce.""" try: cur = conn.cursor() cur.execute(""" SELECT r.RDB$RELATION_NAME, f.RDB$FIELD_NAME FROM RDB$RELATIONS r LEFT JOIN RDB$RELATION_FIELDS f ON r.RDB$RELATION_NAME = f.RDB$RELATION_NAME WHERE r.RDB$SYSTEM_FLAG = 0 ORDER BY r.RDB$RELATION_NAME, f.RDB$FIELD_POSITION """) schema = {} for tbl_name, col_name in cur.fetchall(): tbl = tbl_name.strip() if tbl_name else 'unknown' col = col_name.strip() if col_name else '' if tbl not in schema: schema[tbl] = [] if col: schema[tbl].append(col) return { 'table_count': len(schema), 'tables': list(schema.keys()), 'schema': schema } except Exception as e: log(f"get_schema chyba: {traceback.format_exc()}") raise # ── Velké tabulky kde je nutný WHERE filtr ────────────────────────────────── LARGE_TABLES = {'LOG', 'ZURNAL', 'LABVD', 'DOCLIST', 'PZT', 'LEKY', 'DEKLINK'} def _parse_histdoc_blob(data_text: str) -> dict: """Interní parser key=value blobu z HISTDOC.DATA.""" result = {} if not data_text: return result for line in data_text.replace('\r\n', '\n').replace('\r', '\n').split('\n'): line = line.strip() if '=' not in line: continue key, _, value = line.partition('=') key = key.strip() value = value.strip() if value.startswith('C:'): try: result[key] = float(value[2:].replace(',', '.')) except Exception: result[key] = value[2:] elif value.startswith('I:'): try: result[key] = int(value[2:]) except Exception: result[key] = value[2:] elif value.startswith('D:'): result[key] = value[2:] elif value in ('$:~\b', '$:~\x08', '$:'): result[key] = '' else: result[key] = value return result @mcp.tool() def get_patient(idpac: int) -> dict: """Vrátí základní info o pacientovi z tabulky KAR podle IDPAC. Výsledek: jmeno, prijmeni, rc, datnar, pojistovna, vyrazen. """ try: cur = conn.cursor() cur.execute(""" SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN FROM KAR WHERE IDPAC = ? """, [idpac]) row = cur.fetchone() if not row: return {'error': f'Pacient IDPAC={idpac} nenalezen'} import datetime, decimal def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() if isinstance(v, decimal.Decimal): return float(v) return v cols = ['idpac', 'jmeno', 'prijmeni', 'rc', 'datnar', 'pojistovna', 'vyrazen'] return dict(zip(cols, [cv(v) for v in row])) except Exception: log(f"get_patient chyba: {traceback.format_exc()}") raise @mcp.tool() def search_patients(query: str) -> list: """Vyhledá pacienty podle příjmení, jména nebo rodného čísla (částečná shoda). Vrátí max. 50 výsledků: idpac, jmeno, prijmeni, rc, pojistovna. """ try: cur = conn.cursor() q = query.strip().upper() cur.execute(""" SELECT FIRST 50 IDPAC, JMENO, PRIJMENI, RODCIS, POJ FROM KAR WHERE UPPER(PRIJMENI) LIKE ? OR UPPER(JMENO) LIKE ? OR RODCIS LIKE ? ORDER BY PRIJMENI, JMENO """, [f'%{q}%', f'%{q}%', f'%{q}%']) rows = cur.fetchall() return [ {'idpac': r[0], 'jmeno': r[1], 'prijmeni': r[2], 'rc': r[3], 'pojistovna': r[4]} for r in rows ] except Exception: log(f"search_patients chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_timeline(idpac: int, datum_od: Optional[str] = None, datum_do: Optional[str] = None) -> dict: """Chronologický přehled všech záznamů pacienta z DOCLIST. datum_od / datum_do ve formátu YYYY-MM-DD (volitelné). Vrátí: pacient (jmeno, prijmeni, rc) + seznam událostí (datum, tabulka, popis). """ try: cur = conn.cursor() # Info o pacientovi cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} # Timeline z DOCLIST sql = "SELECT DATUM, TABULKA, TYP, POPIS FROM DOCLIST WHERE IDPAC = ?" params = [idpac] if datum_od: sql += " AND DATUM >= ?" params.append(datum_od) if datum_do: sql += " AND DATUM <= ?" params.append(datum_do) sql += " ORDER BY DATUM, ID" cur.execute(sql, params) import datetime, decimal def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() return v events = [ {'datum': cv(r[0]), 'tabulka': r[1], 'typ': r[2], 'popis': r[3] or ''} for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet': len(events), 'events': events } except Exception: log(f"get_patient_timeline chyba: {traceback.format_exc()}") raise @mcp.tool() def parse_histdoc_data(idhistdoc: int) -> dict: """Načte a dekóduje DATA blob z HISTDOC pro daný záznam. Vrátí strukturovaný dict: Kod, Nazev, Pocet, Cena, Poj, Dgn, DatPlat, Stav, Doklad… """ try: cur = conn.cursor() cur.execute(""" SELECT h.TYP, h.DATUM, CAST(h.DATA AS VARCHAR(8000)), e.ID_DOKLADU, e.ID_ZP, e.ODESLANO, e.VYDANO, e.CHYBA FROM HISTDOC h LEFT JOIN HISTDOC_EPOUKAZ e ON e.IDHISTDOC = h.ID WHERE h.ID = ? """, [idhistdoc]) row = cur.fetchone() if not row: return {'error': f'HISTDOC ID={idhistdoc} nenalezen'} import datetime def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() return v typ, datum, data_text, id_dokladu, id_zp, odeslano, vydano, chyba = row parsed = _parse_histdoc_blob(data_text or '') return { 'id': idhistdoc, 'typ': typ, 'datum': cv(datum), 'data': parsed, 'epoukaz': { 'id_dokladu': id_dokladu, 'id_zp': id_zp, 'odeslano': cv(odeslano), 'vydano': cv(vydano), 'chyba': chyba == 'T' if chyba else False, } } except Exception: log(f"parse_histdoc_data chyba: {traceback.format_exc()}") raise @mcp.tool() def get_table_info(table_name: str) -> dict: """Vrátí rozšířené info o tabulce: sloupce s datovými typy, nullable, PK + počet záznamů.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(f.RDB$FIELD_NAME) AS SLOUPEC, CASE ft.RDB$FIELD_TYPE WHEN 7 THEN 'SMALLINT' WHEN 8 THEN 'INTEGER' WHEN 10 THEN 'FLOAT' WHEN 12 THEN 'DATE' WHEN 13 THEN 'TIME' WHEN 14 THEN 'CHAR(' || ft.RDB$FIELD_LENGTH || ')' WHEN 16 THEN 'BIGINT' WHEN 27 THEN 'DOUBLE' WHEN 35 THEN 'TIMESTAMP' WHEN 37 THEN 'VARCHAR(' || ft.RDB$FIELD_LENGTH || ')' WHEN 261 THEN 'BLOB' ELSE 'TYP(' || ft.RDB$FIELD_TYPE || ')' END AS TYP, CASE COALESCE(f.RDB$NULL_FLAG, 0) WHEN 1 THEN 'NOT NULL' ELSE 'NULL' END AS NULLABLE, CASE WHEN pk.RDB$FIELD_NAME IS NOT NULL THEN 'PK' ELSE '' END AS PK FROM RDB$RELATION_FIELDS f JOIN RDB$FIELDS ft ON ft.RDB$FIELD_NAME = f.RDB$FIELD_SOURCE LEFT JOIN ( SELECT i.RDB$RELATION_NAME, s.RDB$FIELD_NAME FROM RDB$INDICES i JOIN RDB$INDEX_SEGMENTS s ON s.RDB$INDEX_NAME = i.RDB$INDEX_NAME WHERE i.RDB$RELATION_NAME = ? AND EXISTS ( SELECT 1 FROM RDB$RELATION_CONSTRAINTS c WHERE c.RDB$INDEX_NAME = i.RDB$INDEX_NAME AND c.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY' ) ) pk ON pk.RDB$FIELD_NAME = f.RDB$FIELD_NAME WHERE TRIM(f.RDB$RELATION_NAME) = ? ORDER BY f.RDB$FIELD_POSITION """, [table_name.upper(), table_name.upper()]) columns = [ {'sloupec': r[0], 'typ': r[1], 'nullable': r[2], 'pk': r[3]} for r in cur.fetchall() ] # Počet záznamů try: cur.execute(f'SELECT COUNT(*) FROM {table_name.upper()}') count = cur.fetchone()[0] except Exception: count = None return { 'tabulka': table_name.upper(), 'pocet_zaznamu': count, 'sloupce': columns } except Exception: log(f"get_table_info chyba: {traceback.format_exc()}") raise @mcp.tool() def safe_query(sql: str, params: Optional[list] = None) -> dict: """Bezpečný SELECT s ochranou před timeoutem na velkých tabulkách. Automaticky varuje pokud dotaz míří na LOG, ZURNAL, LABVD, DOCLIST, PZT, LEKY bez WHERE klauzule. Výsledek omezen na 500 řádků. """ try: sql_upper = sql.strip().upper() if not sql_upper.startswith('SELECT'): return {'error': 'safe_query podporuje pouze SELECT dotazy'} # Varování pro velké tabulky bez WHERE warnings = [] for tbl in LARGE_TABLES: if tbl in sql_upper and 'WHERE' not in sql_upper: warnings.append(f'Tabulka {tbl} má miliony záznamů — přidej WHERE filtr!') # Přidej FIRST limit pokud chybí if 'FIRST ' not in sql_upper: sql = sql.strip() if sql.upper().startswith('SELECT'): sql = 'SELECT FIRST 500 ' + sql[6:].lstrip() cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) rows = rows_to_json(cur.fetchall(), cur.description or []) result = {'rowcount': len(rows), 'rows': rows} if warnings: result['warnings'] = warnings return result except Exception: log(f"safe_query chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_prescriptions(idpac: int, months: int = 6) -> dict: """Vrátí seznam předepsaných léků pacienta z receptů za posledních N měsíců (výchozí 6). Výsledek: pacient (jmeno, prijmeni, rc) + seznam receptů (datum, lek, dsig). """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT DATUM, LEK, DSIG FROM RECEPT WHERE IDPAC = ? AND DATUM >= DATEADD(-? MONTH TO CURRENT_DATE) ORDER BY DATUM DESC, ID DESC """, [idpac, months]) import datetime recepty = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'lek': r[1], 'dsig': r[2] or '', } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'obdobi_mesice': months, 'pocet': len(recepty), 'recepty': recepty, } except Exception: log(f"get_patient_prescriptions chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_dxa_files(idpac: int) -> dict: """Zjistí, zda má pacient fyzicky uložený DXA/denzitometrický nález v tabulce FILES. Hledá v FILENAME výrazy: DXA, DENZIT, OSTEOPOR (bez rozlišení velikosti písmen). Vrátí: pacient + seznam nalezených souborů (id, filename, datum, poznamka). """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT ID, FILENAME, DATUM, POZNAMKA FROM FILES WHERE IDPAC = ? AND UPPER(FILENAME) LIKE '%DXA%' ORDER BY DATUM DESC, ID DESC """, [idpac]) import datetime soubory = [ { 'id': r[0], 'filename': r[1], 'datum': r[2].isoformat() if isinstance(r[2], datetime.date) else r[2], 'poznamka': r[3] or '', } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'ma_dxa': len(soubory) > 0, 'pocet': len(soubory), 'soubory': soubory, } except Exception: log(f"get_patient_dxa_files chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_vaccinations(idpac: int) -> dict: """Vrátí všechna očkování pacienta z tabulky OCKZAZ. Výsledek: pacient + seznam očkování (datum, latka, zkratka, nazev, davka, por, priste, poznamka). Řazení: od nejnovějšího. Zrušené záznamy (ZRUSENO='T') jsou označeny. """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT DATUM, LATKA, ZKRATKA, NAZEV, DAVKA, POR, PRISTE, POZNAMKA, ZRUSENO FROM OCKZAZ WHERE IDPAC = ? ORDER BY DATUM DESC, ID DESC """, [idpac]) import datetime ockovani = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'latka': r[1] or '', 'zkratka': r[2] or '', 'nazev': r[3] or '', 'davka': r[4] or '', 'poradi': r[5], 'priste': r[6].isoformat() if isinstance(r[6], datetime.date) else r[6], 'poznamka': r[7] or '', 'zruseno': r[8] == 'T', } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet': len(ockovani), 'ockovani': ockovani, } except Exception: log(f"get_patient_vaccinations chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_psa_lab(idpac: int) -> dict: """Vrátí laboratorní výsledky PSA pacienta: datum, hodnota, jednotka, referenční meze (dolní/horní). Hledá všechny metody jejichž název obsahuje PSA (Total PSA, Free PSA, PSA celkový atd.). Výsledek seřazen od nejnovějšího. """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT h.DATUM, m.NAZEV, d.VYSL, j.JEDN, s.NORMDOL, s.NORMHOR FROM LABVH h JOIN LABVD d ON d.IDVH = h.IDVH JOIN LABMETOD m ON m.IDMETOD = d.IDMETOD LEFT JOIN LABJEDN j ON j.IDJEDN = d.IDJEDN LEFT JOIN LABSKALY s ON s.IDSKALY = d.IDSKALY AND s.TYP = '4' WHERE h.IDPACIENT = ? AND UPPER(m.NAZEV) LIKE '%PSA%' ORDER BY h.DATUM DESC, h.IDVH DESC """, [idpac]) import datetime vysledky = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'metoda': r[1], 'hodnota': r[2], 'jednotka': r[3] or '', 'ref_dolni': r[4], 'ref_horni': r[5], } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet': len(vysledky), 'vysledky': vysledky, } except Exception: log(f"get_patient_psa_lab chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_sick_leaves(idpac: int) -> dict: """Vrátí přehled pracovních neschopností pacienta z tabulky NES. Výsledek: pacient + seznam neschopností (od, do, diagnóza, stav, zaměstnavatel, počet dní, ECN číslo). Stornované záznamy jsou označeny. Řazení od nejnovějšího. """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT ZACNES, KONNES, DIAGNO, PRACNE, PRICINA, PODNIK, PROFES, STORNO, ECN, DUVOD_UKONCENI FROM NES WHERE IDPAC = ? ORDER BY ZACNES DESC, ID DESC """, [idpac]) import datetime def fmt_date(v): return v.isoformat() if isinstance(v, datetime.date) else v def pocet_dni(zacnes, konnes, pracne): if not zacnes: return None konec = konnes if konnes else datetime.date.today() if isinstance(zacnes, datetime.date) and isinstance(konec, datetime.date): return (konec - zacnes).days + 1 return None neschopenky = [ { 'od': fmt_date(r[0]), 'do': fmt_date(r[1]), 'diagno': (r[2] or '').strip(), 'stav': 'aktivní' if r[3] == 'A' else 'ukončená', 'pricina': r[4] or '', 'zamestnavatel': r[5] or '', 'profes': r[6] or '', 'storno': r[7] == 'T', 'ecn': r[8] or '', 'duvod_ukonceni': r[9] or '', 'pocet_dni': pocet_dni(r[0], r[1], r[3]), } for r in cur.fetchall() ] aktivni = sum(1 for n in neschopenky if n['stav'] == 'aktivní' and not n['storno']) return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet_celkem': len(neschopenky), 'pocet_aktivnich': aktivni, 'neschopenky': neschopenky, } except Exception: log(f"get_patient_sick_leaves chyba: {traceback.format_exc()}") raise PSA_KODY = ('01131', '01132', '01133', '81227', '81530', '81718', '81800', '93225') @mcp.tool() def get_psa_records(idpac: Optional[int] = None, rok: Optional[int] = None) -> dict: """Vrátí přehled vykázaných PSA výkonů z DOKLADD. Sledované kódy: 01131–01133, 81227, 81530, 81718, 81800, 93225. idpac: filtr konkrétního pacienta (None = všichni). rok: filtr roku (None = vše). Výsledek: počet + seznam (datum, kód, název, příjmení, jméno, rc). """ try: cur = conn.cursor() kody_placeholder = ','.join(['?' for _ in PSA_KODY]) params = list(PSA_KODY) sql = f""" SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS FROM DOKLADD d JOIN KAR k ON k.RODCIS = d.RODCIS JOIN VYKONY v ON v.KOD = d.KOD AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE) WHERE d.KOD IN ({kody_placeholder}) """ if idpac: sql += " AND k.IDPAC = ?" params.append(idpac) if rok: sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?" params.append(rok) sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI" cur.execute(sql, params) import datetime zaznamy = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'kod': r[1], 'naz': r[2], 'prijmeni': r[3], 'jmeno': r[4], 'rc': r[5].strip() if r[5] else '', } for r in cur.fetchall() ] return { 'idpac': idpac, 'rok': rok, 'pocet': len(zaznamy), 'zaznamy': zaznamy, } except Exception: log(f"get_psa_records chyba: {traceback.format_exc()}") raise DXA_KODY = ('10035', '11320', '11321', '11322', '11323', '11324', '11325', '11326') @mcp.tool() def get_dxa_records(rok: Optional[int] = None) -> dict: """Vrátí přehled vykázaných DXA výkonů (osteoporóza, denzitometrie). Sledované kódy: 11320, 11321, 11322–11326, 10035. rok: filtr roku (např. 2025), None = vše. Výsledek: celkový počet + seznam záznamů (datum, kód, název kódu, příjmení, jméno, rc). """ try: cur = conn.cursor() kody_placeholder = ','.join(['?' for _ in DXA_KODY]) params = list(DXA_KODY) sql = f""" SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS FROM DOKLADD d JOIN KAR k ON k.RODCIS = d.RODCIS JOIN VYKONY v ON v.KOD = d.KOD AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE) WHERE d.KOD IN ({kody_placeholder}) """ if rok: sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?" params.append(rok) sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI" cur.execute(sql, params) import datetime zaznamy = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'kod': r[1], 'naz': r[2], 'prijmeni': r[3], 'jmeno': r[4], 'rc': r[5].strip() if r[5] else '', } for r in cur.fetchall() ] return { 'rok': rok, 'pocet': len(zaznamy), 'zaznamy': zaznamy, } except Exception: log(f"get_dxa_records chyba: {traceback.format_exc()}") raise if __name__ == '__main__': log("MCP Firebird server spuštěn (FastMCP)") mcp.run()