#!/usr/bin/env python3 """ MCP server pro Firebird/Medicus — používá oficiální MCP SDK (FastMCP) Spustit: python mcp_firebird.py """ import sys import os import traceback from pathlib import Path from typing import Optional from mcp.server.fastmcp import FastMCP sys.path.insert(0, str(Path(__file__).resolve().parent)) from Knihovny.medicus_db import get_medicus_connection # Všechny logy MUSÍ jít na stderr — stdout je rezervován pro JSON-RPC def log(msg: str): print(msg, file=sys.stderr, flush=True) # Připojení k Firebirdu try: conn = get_medicus_connection() log("✓ Připojeno k Firebirdu (Medicus)") except Exception as e: log(f"✗ Chyba připojení k Firebirdu: {e}") sys.exit(1) def rows_to_json(rows, description): """Převede fdb rows na JSON-serializovatelný formát""" import datetime import decimal def convert(val): if isinstance(val, (datetime.date, datetime.datetime)): return val.isoformat() if isinstance(val, decimal.Decimal): return float(val) if isinstance(val, bytes): return val.decode('win1250', errors='replace') return val cols = [d[0].strip() for d in description] return [dict(zip(cols, [convert(v) for v in row])) for row in rows] # MCP server mcp = FastMCP("medicus-firebird") @mcp.tool() def execute_query(sql: str, params: Optional[list] = None) -> dict: """Spusť SQL dotaz na Medicus databázi. Pro SELECT vrátí columns + rows. Pro INSERT/UPDATE/DELETE vrátí rowcount. """ try: cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) if sql.strip().upper().startswith('SELECT'): rows = rows_to_json(cur.fetchall(), cur.description or []) return { 'rowcount': len(rows), 'rows': rows } else: conn.commit() return { 'rowcount': cur.rowcount, 'message': f'Dotaz proveden: {cur.rowcount} řádků ovlivněno' } except Exception as e: log(f"execute_query chyba: {traceback.format_exc()}") raise @mcp.tool() def list_tables() -> list[str]: """Vrátí seznam všech uživatelských tabulek v Medicus databázi.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(RDB$RELATION_NAME) FROM RDB$RELATIONS WHERE RDB$SYSTEM_FLAG = 0 ORDER BY RDB$RELATION_NAME """) return [row[0] for row in cur.fetchall()] except Exception as e: log(f"list_tables chyba: {traceback.format_exc()}") raise @mcp.tool() def get_table_columns(table_name: str) -> list[str]: """Vrátí seznam sloupců dané tabulky.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(RDB$FIELD_NAME) FROM RDB$RELATION_FIELDS WHERE TRIM(RDB$RELATION_NAME) = ? ORDER BY RDB$FIELD_POSITION """, [table_name.upper()]) return [row[0] for row in cur.fetchall()] except Exception as e: log(f"get_table_columns chyba: {traceback.format_exc()}") raise @mcp.tool() def get_schema() -> dict: """Vrátí kompletní schéma DB — všechny tabulky a jejich sloupce.""" try: cur = conn.cursor() cur.execute(""" SELECT r.RDB$RELATION_NAME, f.RDB$FIELD_NAME FROM RDB$RELATIONS r LEFT JOIN RDB$RELATION_FIELDS f ON r.RDB$RELATION_NAME = f.RDB$RELATION_NAME WHERE r.RDB$SYSTEM_FLAG = 0 ORDER BY r.RDB$RELATION_NAME, f.RDB$FIELD_POSITION """) schema = {} for tbl_name, col_name in cur.fetchall(): tbl = tbl_name.strip() if tbl_name else 'unknown' col = col_name.strip() if col_name else '' if tbl not in schema: schema[tbl] = [] if col: schema[tbl].append(col) return { 'table_count': len(schema), 'tables': list(schema.keys()), 'schema': schema } except Exception as e: log(f"get_schema chyba: {traceback.format_exc()}") raise # ── Velké tabulky kde je nutný WHERE filtr ────────────────────────────────── LARGE_TABLES = {'LOG', 'ZURNAL', 'LABVD', 'DOCLIST', 'PZT', 'LEKY', 'DEKLINK'} def _parse_histdoc_blob(data_text: str) -> dict: """Interní parser key=value blobu z HISTDOC.DATA.""" result = {} if not data_text: return result for line in data_text.replace('\r\n', '\n').replace('\r', '\n').split('\n'): line = line.strip() if '=' not in line: continue key, _, value = line.partition('=') key = key.strip() value = value.strip() if value.startswith('C:'): try: result[key] = float(value[2:].replace(',', '.')) except Exception: result[key] = value[2:] elif value.startswith('I:'): try: result[key] = int(value[2:]) except Exception: result[key] = value[2:] elif value.startswith('D:'): result[key] = value[2:] elif value in ('$:~\b', '$:~\x08', '$:'): result[key] = '' else: result[key] = value return result @mcp.tool() def get_patient(idpac: int) -> dict: """Vrátí základní info o pacientovi z tabulky KAR podle IDPAC. Výsledek: jmeno, prijmeni, rc, datnar, pojistovna, vyrazen. """ try: cur = conn.cursor() cur.execute(""" SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN FROM KAR WHERE IDPAC = ? """, [idpac]) row = cur.fetchone() if not row: return {'error': f'Pacient IDPAC={idpac} nenalezen'} import datetime, decimal def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() if isinstance(v, decimal.Decimal): return float(v) return v cols = ['idpac', 'jmeno', 'prijmeni', 'rc', 'datnar', 'pojistovna', 'vyrazen'] return dict(zip(cols, [cv(v) for v in row])) except Exception: log(f"get_patient chyba: {traceback.format_exc()}") raise def _strip_diacritics(s: str) -> str: """Bez diakritiky, velkými písmeny, sjednocené mezery.""" import re import unicodedata s = unicodedata.normalize('NFKD', s or '') s = ''.join(c for c in s if not unicodedata.combining(c)) return re.sub(r'\s+', ' ', s).strip().upper() @mcp.tool() def search_patients(query: str, datum_narozeni: Optional[str] = None) -> list: """Vyhledá pacienty podle jména/příjmení (bez ohledu na diakritiku a pořadí slov, např. "Mateju Petr" najde "Petr Matějů") nebo rodného čísla (částečná shoda, jen číslice). datum_narozeni: volitelný filtr YYYY-MM-DD. Vrátí max. 50 výsledků: idpac, jmeno, prijmeni, rc, datnar, pojistovna, vyrazen. """ try: import datetime import re cur = conn.cursor() cur.execute(""" SELECT IDPAC, JMENO, PRIJMENI, RODCIS, DATNAR, POJ, VYRAZEN FROM KAR WHERE PRIJMENI IS NOT NULL AND PRIJMENI <> '' """) q_digits = re.sub(r'\D', '', query) q_tokens = _strip_diacritics(query).split() results = [] for r in cur.fetchall(): idpac, jmeno, prijmeni, rc, datnar, poj, vyrazen = r datnar_iso = datnar.isoformat() if isinstance(datnar, datetime.date) else datnar if datum_narozeni and str(datnar_iso or '')[:10] != datum_narozeni: continue if q_digits and len(q_digits) >= 4: if q_digits not in (rc or ''): continue elif q_tokens: name_norm = _strip_diacritics(f"{jmeno or ''} {prijmeni or ''}") if not all(t in name_norm for t in q_tokens): continue elif not datum_narozeni: continue # prázdný dotaz bez filtru data — nevracet celou kartotéku results.append({ 'idpac': idpac, 'jmeno': jmeno, 'prijmeni': prijmeni, 'rc': rc, 'datnar': datnar_iso, 'pojistovna': poj, 'vyrazen': vyrazen == 'A', }) results.sort(key=lambda p: (p['prijmeni'] or '', p['jmeno'] or '')) return results[:50] except Exception: log(f"search_patients chyba: {traceback.format_exc()}") raise @mcp.tool() def search_patient_by_contact(kontakt: str) -> list: """Vyhledá pacienty podle kontaktu (e-mail nebo telefon) v tabulce KARKONTAKT. Částečná shoda bez ohledu na velikost písmen; u telefonů se porovnávají jen číslice (ignoruje mezery a +420). Vrátí max. 50 výsledků: pacient (idpac, jmeno, prijmeni, rc, datnar, pojistovna, vyrazen) + kontakt (typ, popis, vztah). """ try: import datetime import re cur = conn.cursor() cur.execute(""" SELECT kk.IDPAC, kk.KONTAKT, kk.TYP, kk.POPIS, kk.VZTAH, k.JMENO, k.PRIJMENI, k.RODCIS, k.DATNAR, k.POJ, k.VYRAZEN FROM KARKONTAKT kk JOIN KAR k ON k.IDPAC = kk.IDPAC WHERE kk.KONTAKT IS NOT NULL AND kk.KONTAKT <> '' """) q = kontakt.strip().lower() q_digits = re.sub(r'\D', '', kontakt) if q_digits.startswith('420'): q_digits = q_digits[3:] results = [] for r in cur.fetchall(): (idpac, kont, typ, popis, vztah, jmeno, prijmeni, rc, datnar, poj, vyrazen) = r kont = (kont or '').strip() hit = q and q in kont.lower() if not hit and len(q_digits) >= 6: kont_digits = re.sub(r'\D', '', kont) if kont_digits.startswith('420'): kont_digits = kont_digits[3:] hit = q_digits in kont_digits if not hit: continue results.append({ 'idpac': idpac, 'jmeno': jmeno, 'prijmeni': prijmeni, 'rc': rc, 'datnar': datnar.isoformat() if isinstance(datnar, datetime.date) else datnar, 'pojistovna': poj, 'vyrazen': vyrazen == 'A', 'kontakt': kont, 'typ': typ, 'popis': popis or '', 'vztah': vztah or '', }) return results[:50] except Exception: log(f"search_patient_by_contact chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_timeline(idpac: int, datum_od: Optional[str] = None, datum_do: Optional[str] = None) -> dict: """Chronologický přehled všech záznamů pacienta z DOCLIST. datum_od / datum_do ve formátu YYYY-MM-DD (volitelné). Vrátí: pacient (jmeno, prijmeni, rc) + seznam událostí (datum, tabulka, popis). """ try: cur = conn.cursor() # Info o pacientovi cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} # Timeline z DOCLIST sql = "SELECT DATUM, TABULKA, TYP, POPIS FROM DOCLIST WHERE IDPAC = ?" params = [idpac] if datum_od: sql += " AND DATUM >= ?" params.append(datum_od) if datum_do: sql += " AND DATUM <= ?" params.append(datum_do) sql += " ORDER BY DATUM, ID" cur.execute(sql, params) import datetime, decimal def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() return v events = [ {'datum': cv(r[0]), 'tabulka': r[1], 'typ': r[2], 'popis': r[3] or ''} for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet': len(events), 'events': events } except Exception: log(f"get_patient_timeline chyba: {traceback.format_exc()}") raise @mcp.tool() def parse_histdoc_data(idhistdoc: int) -> dict: """Načte a dekóduje DATA blob z HISTDOC pro daný záznam. Vrátí strukturovaný dict: Kod, Nazev, Pocet, Cena, Poj, Dgn, DatPlat, Stav, Doklad… """ try: cur = conn.cursor() cur.execute(""" SELECT h.TYP, h.DATUM, CAST(h.DATA AS VARCHAR(8000)), e.ID_DOKLADU, e.ID_ZP, e.ODESLANO, e.VYDANO, e.CHYBA FROM HISTDOC h LEFT JOIN HISTDOC_EPOUKAZ e ON e.IDHISTDOC = h.ID WHERE h.ID = ? """, [idhistdoc]) row = cur.fetchone() if not row: return {'error': f'HISTDOC ID={idhistdoc} nenalezen'} import datetime def cv(v): if isinstance(v, (datetime.date, datetime.datetime)): return v.isoformat() return v typ, datum, data_text, id_dokladu, id_zp, odeslano, vydano, chyba = row parsed = _parse_histdoc_blob(data_text or '') return { 'id': idhistdoc, 'typ': typ, 'datum': cv(datum), 'data': parsed, 'epoukaz': { 'id_dokladu': id_dokladu, 'id_zp': id_zp, 'odeslano': cv(odeslano), 'vydano': cv(vydano), 'chyba': chyba == 'T' if chyba else False, } } except Exception: log(f"parse_histdoc_data chyba: {traceback.format_exc()}") raise @mcp.tool() def get_table_info(table_name: str) -> dict: """Vrátí rozšířené info o tabulce: sloupce s datovými typy, nullable, PK + počet záznamů.""" try: cur = conn.cursor() cur.execute(""" SELECT TRIM(f.RDB$FIELD_NAME) AS SLOUPEC, CASE ft.RDB$FIELD_TYPE WHEN 7 THEN 'SMALLINT' WHEN 8 THEN 'INTEGER' WHEN 10 THEN 'FLOAT' WHEN 12 THEN 'DATE' WHEN 13 THEN 'TIME' WHEN 14 THEN 'CHAR(' || ft.RDB$FIELD_LENGTH || ')' WHEN 16 THEN 'BIGINT' WHEN 27 THEN 'DOUBLE' WHEN 35 THEN 'TIMESTAMP' WHEN 37 THEN 'VARCHAR(' || ft.RDB$FIELD_LENGTH || ')' WHEN 261 THEN 'BLOB' ELSE 'TYP(' || ft.RDB$FIELD_TYPE || ')' END AS TYP, CASE COALESCE(f.RDB$NULL_FLAG, 0) WHEN 1 THEN 'NOT NULL' ELSE 'NULL' END AS NULLABLE, CASE WHEN pk.RDB$FIELD_NAME IS NOT NULL THEN 'PK' ELSE '' END AS PK FROM RDB$RELATION_FIELDS f JOIN RDB$FIELDS ft ON ft.RDB$FIELD_NAME = f.RDB$FIELD_SOURCE LEFT JOIN ( SELECT i.RDB$RELATION_NAME, s.RDB$FIELD_NAME FROM RDB$INDICES i JOIN RDB$INDEX_SEGMENTS s ON s.RDB$INDEX_NAME = i.RDB$INDEX_NAME WHERE i.RDB$RELATION_NAME = ? AND EXISTS ( SELECT 1 FROM RDB$RELATION_CONSTRAINTS c WHERE c.RDB$INDEX_NAME = i.RDB$INDEX_NAME AND c.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY' ) ) pk ON pk.RDB$FIELD_NAME = f.RDB$FIELD_NAME WHERE TRIM(f.RDB$RELATION_NAME) = ? ORDER BY f.RDB$FIELD_POSITION """, [table_name.upper(), table_name.upper()]) columns = [ {'sloupec': r[0], 'typ': r[1], 'nullable': r[2], 'pk': r[3]} for r in cur.fetchall() ] # Počet záznamů try: cur.execute(f'SELECT COUNT(*) FROM {table_name.upper()}') count = cur.fetchone()[0] except Exception: count = None return { 'tabulka': table_name.upper(), 'pocet_zaznamu': count, 'sloupce': columns } except Exception: log(f"get_table_info chyba: {traceback.format_exc()}") raise @mcp.tool() def get_columns_overview(table_name: str, sample_rows: int = 1000) -> dict: """Přehled obsahu sloupců tabulky pro pochopení sémantiky (např. co znamená KARKONTAKT.TYP=3 nebo RECEPT.STORNO='T'). Ze vzorku posledních N řádků (výchozí 1000) vrátí pro každý sloupec: počet vyplněných, počet distinct hodnot a top 5 nejčastějších hodnot s četností. Hodnoty zkráceny na 80 znaků. """ try: from collections import Counter cur = conn.cursor() cur.execute(f'SELECT FIRST {int(sample_rows)} * FROM {table_name.upper()}') rows = rows_to_json(cur.fetchall(), cur.description or []) if not rows: return {'tabulka': table_name.upper(), 'sample': 0, 'sloupce': {}} overview = {} for col in rows[0].keys(): values = [r[col] for r in rows if r[col] is not None and r[col] != ''] counter = Counter( str(v)[:80] for v in values ) overview[col] = { 'vyplneno': len(values), 'distinct': len(counter), 'top': [ {'hodnota': v, 'pocet': n} for v, n in counter.most_common(5) ], } result = { 'tabulka': table_name.upper(), 'sample': len(rows), 'sloupce': overview, } if table_name.upper() in LARGE_TABLES: result['warning'] = ( f'Tabulka {table_name.upper()} je velká — přehled je jen ' f'ze vzorku prvních {len(rows)} řádků.' ) return result except Exception: log(f"get_columns_overview chyba: {traceback.format_exc()}") raise @mcp.tool() def safe_query(sql: str, params: Optional[list] = None) -> dict: """Bezpečný SELECT s ochranou před timeoutem na velkých tabulkách. Automaticky varuje pokud dotaz míří na LOG, ZURNAL, LABVD, DOCLIST, PZT, LEKY bez WHERE klauzule. Výsledek omezen na 500 řádků. """ try: sql_upper = sql.strip().upper() if not sql_upper.startswith('SELECT'): return {'error': 'safe_query podporuje pouze SELECT dotazy'} # Varování pro velké tabulky bez WHERE warnings = [] for tbl in LARGE_TABLES: if tbl in sql_upper and 'WHERE' not in sql_upper: warnings.append(f'Tabulka {tbl} má miliony záznamů — přidej WHERE filtr!') # Přidej FIRST limit pokud chybí if 'FIRST ' not in sql_upper: sql = sql.strip() if sql.upper().startswith('SELECT'): sql = 'SELECT FIRST 500 ' + sql[6:].lstrip() cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) rows = rows_to_json(cur.fetchall(), cur.description or []) result = {'rowcount': len(rows), 'rows': rows} if warnings: result['warnings'] = warnings return result except Exception: log(f"safe_query chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_prescriptions(idpac: int, months: int = 6) -> dict: """Vrátí seznam předepsaných léků pacienta z receptů za posledních N měsíců (výchozí 6). Výsledek: pacient (jmeno, prijmeni, rc) + seznam receptů (datum, lek, dsig). """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT DATUM, LEK, DSIG, TEXT FROM RECEPT WHERE IDPAC = ? AND DATUM >= DATEADD(? MONTH TO CURRENT_DATE) ORDER BY DATUM DESC, ID DESC """, [idpac, -months]) import datetime def parse_baleni(text): """Z TEXT sloupce vytáhne druhý řádek s info o balení (dávka, forma, počet).""" if not text: return '' parts = text.replace('\r\n', '\n').split('\n') return parts[1].strip() if len(parts) > 1 else '' recepty = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'lek': r[1], 'baleni': parse_baleni(r[3]), 'dsig': r[2] or '', } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'obdobi_mesice': months, 'pocet': len(recepty), 'recepty': recepty, } except Exception: log(f"get_patient_prescriptions chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_dxa_files(idpac: int) -> dict: """Zjistí, zda má pacient fyzicky uložený DXA/denzitometrický nález v tabulce FILES. Hledá v FILENAME výrazy: DXA, DENZIT, OSTEOPOR (bez rozlišení velikosti písmen). Vrátí: pacient + seznam nalezených souborů (id, filename, datum, poznamka). """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT ID, FILENAME, DATUM, POZNAMKA FROM FILES WHERE IDPAC = ? AND UPPER(FILENAME) LIKE '%DXA%' ORDER BY DATUM DESC, ID DESC """, [idpac]) import datetime soubory = [ { 'id': r[0], 'filename': r[1], 'datum': r[2].isoformat() if isinstance(r[2], datetime.date) else r[2], 'poznamka': r[3] or '', } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'ma_dxa': len(soubory) > 0, 'pocet': len(soubory), 'soubory': soubory, } except Exception: log(f"get_patient_dxa_files chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_vaccinations(idpac: int) -> dict: """Vrátí všechna očkování pacienta z tabulky OCKZAZ. Výsledek: pacient + seznam očkování (datum, latka, zkratka, nazev, davka, por, priste, poznamka). Řazení: od nejnovějšího. Zrušené záznamy (ZRUSENO='T') jsou označeny. """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT DATUM, LATKA, ZKRATKA, NAZEV, DAVKA, POR, PRISTE, POZNAMKA, ZRUSENO FROM OCKZAZ WHERE IDPAC = ? ORDER BY DATUM DESC, ID DESC """, [idpac]) import datetime ockovani = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'latka': r[1] or '', 'zkratka': r[2] or '', 'nazev': r[3] or '', 'davka': r[4] or '', 'poradi': r[5], 'priste': r[6].isoformat() if isinstance(r[6], datetime.date) else r[6], 'poznamka': r[7] or '', 'zruseno': r[8] == 'T', } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet': len(ockovani), 'ockovani': ockovani, } except Exception: log(f"get_patient_vaccinations chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_psa_lab(idpac: int) -> dict: """Vrátí laboratorní výsledky PSA pacienta: datum, hodnota, jednotka, referenční meze (dolní/horní). Hledá všechny metody jejichž název obsahuje PSA (Total PSA, Free PSA, PSA celkový atd.). Výsledek seřazen od nejnovějšího. """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT h.DATUM, m.NAZEV, d.VYSL, j.JEDN, s.NORMDOL, s.NORMHOR FROM LABVH h JOIN LABVD d ON d.IDVH = h.IDVH JOIN LABMETOD m ON m.IDMETOD = d.IDMETOD LEFT JOIN LABJEDN j ON j.IDJEDN = d.IDJEDN LEFT JOIN LABSKALY s ON s.IDSKALY = d.IDSKALY AND s.TYP = '4' WHERE h.IDPACIENT = ? AND UPPER(m.NAZEV) LIKE '%PSA%' ORDER BY h.DATUM DESC, h.IDVH DESC """, [idpac]) import datetime vysledky = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'metoda': r[1], 'hodnota': r[2], 'jednotka': r[3] or '', 'ref_dolni': r[4], 'ref_horni': r[5], } for r in cur.fetchall() ] return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet': len(vysledky), 'vysledky': vysledky, } except Exception: log(f"get_patient_psa_lab chyba: {traceback.format_exc()}") raise @mcp.tool() def get_patient_sick_leaves(idpac: int) -> dict: """Vrátí přehled pracovních neschopností pacienta z tabulky NES. Výsledek: pacient + seznam neschopností (od, do, diagnóza, stav, zaměstnavatel, počet dní, ECN číslo). Stornované záznamy jsou označeny. Řazení od nejnovějšího. """ try: cur = conn.cursor() cur.execute("SELECT JMENO, PRIJMENI, RODCIS FROM KAR WHERE IDPAC = ?", [idpac]) pac = cur.fetchone() if not pac: return {'error': f'Pacient IDPAC={idpac} nenalezen'} cur.execute(""" SELECT ZACNES, KONNES, DIAGNO, PRACNE, PRICINA, PODNIK, PROFES, STORNO, ECN, DUVOD_UKONCENI FROM NES WHERE IDPAC = ? ORDER BY ZACNES DESC, ID DESC """, [idpac]) import datetime def fmt_date(v): return v.isoformat() if isinstance(v, datetime.date) else v def pocet_dni(zacnes, konnes, pracne): if not zacnes: return None konec = konnes if konnes else datetime.date.today() if isinstance(zacnes, datetime.date) and isinstance(konec, datetime.date): return (konec - zacnes).days + 1 return None neschopenky = [ { 'od': fmt_date(r[0]), 'do': fmt_date(r[1]), 'diagno': (r[2] or '').strip(), 'stav': 'aktivní' if r[3] == 'A' else 'ukončená', 'pricina': r[4] or '', 'zamestnavatel': r[5] or '', 'profes': r[6] or '', 'storno': r[7] == 'T', 'ecn': r[8] or '', 'duvod_ukonceni': r[9] or '', 'pocet_dni': pocet_dni(r[0], r[1], r[3]), } for r in cur.fetchall() ] aktivni = sum(1 for n in neschopenky if n['stav'] == 'aktivní' and not n['storno']) return { 'pacient': {'jmeno': pac[0], 'prijmeni': pac[1], 'rc': pac[2]}, 'pocet_celkem': len(neschopenky), 'pocet_aktivnich': aktivni, 'neschopenky': neschopenky, } except Exception: log(f"get_patient_sick_leaves chyba: {traceback.format_exc()}") raise PSA_KODY = ('01131', '01132', '01133', '81227', '81530', '81718', '81800', '93225') @mcp.tool() def get_psa_records(idpac: Optional[int] = None, rok: Optional[int] = None) -> dict: """Vrátí přehled vykázaných PSA výkonů z DOKLADD. Sledované kódy: 01131–01133, 81227, 81530, 81718, 81800, 93225. idpac: filtr konkrétního pacienta (None = všichni). rok: filtr roku (None = vše). Výsledek: počet + seznam (datum, kód, název, příjmení, jméno, rc). """ try: cur = conn.cursor() kody_placeholder = ','.join(['?' for _ in PSA_KODY]) params = list(PSA_KODY) sql = f""" SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS FROM DOKLADD d JOIN KAR k ON k.RODCIS = d.RODCIS JOIN VYKONY v ON v.KOD = d.KOD AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE) WHERE d.KOD IN ({kody_placeholder}) """ if idpac: sql += " AND k.IDPAC = ?" params.append(idpac) if rok: sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?" params.append(rok) sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI" cur.execute(sql, params) import datetime zaznamy = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'kod': r[1], 'naz': r[2], 'prijmeni': r[3], 'jmeno': r[4], 'rc': r[5].strip() if r[5] else '', } for r in cur.fetchall() ] return { 'idpac': idpac, 'rok': rok, 'pocet': len(zaznamy), 'zaznamy': zaznamy, } except Exception: log(f"get_psa_records chyba: {traceback.format_exc()}") raise DXA_KODY = ('10035', '11320', '11321', '11322', '11323', '11324', '11325', '11326') @mcp.tool() def get_dxa_records(rok: Optional[int] = None) -> dict: """Vrátí přehled vykázaných DXA výkonů (osteoporóza, denzitometrie). Sledované kódy: 11320, 11321, 11322–11326, 10035. rok: filtr roku (např. 2025), None = vše. Výsledek: celkový počet + seznam záznamů (datum, kód, název kódu, příjmení, jméno, rc). """ try: cur = conn.cursor() kody_placeholder = ','.join(['?' for _ in DXA_KODY]) params = list(DXA_KODY) sql = f""" SELECT d.DATOSE, d.KOD, v.NAZ, k.PRIJMENI, k.JMENO, d.RODCIS FROM DOKLADD d JOIN KAR k ON k.RODCIS = d.RODCIS JOIN VYKONY v ON v.KOD = d.KOD AND d.DATOSE BETWEEN v.PLATIOD AND COALESCE(v.PLATIDO, CURRENT_DATE) WHERE d.KOD IN ({kody_placeholder}) """ if rok: sql += " AND EXTRACT(YEAR FROM d.DATOSE) = ?" params.append(rok) sql += " ORDER BY d.DATOSE DESC, k.PRIJMENI" cur.execute(sql, params) import datetime zaznamy = [ { 'datum': r[0].isoformat() if isinstance(r[0], datetime.date) else r[0], 'kod': r[1], 'naz': r[2], 'prijmeni': r[3], 'jmeno': r[4], 'rc': r[5].strip() if r[5] else '', } for r in cur.fetchall() ] return { 'rok': rok, 'pocet': len(zaznamy), 'zaznamy': zaznamy, } except Exception: log(f"get_dxa_records chyba: {traceback.format_exc()}") raise if __name__ == '__main__': log("MCP Firebird server spuštěn (FastMCP)") mcp.run()