#!/usr/bin/env python3
"""
Knowledgebase MCP server
========================
Persistent memory for Claude conversations and knowledge.

Search:
  - Full-text (tsvector, always available)
  - Semantic (Voyage AI embeddings ranked by pgvector cosine distance)
  - Hybrid combination of both

Environment variables:
  PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB
  VOYAGE_API_KEY  - enables vector embeddings (optional)
"""

import json
import os
import sys
import traceback
import uuid
from datetime import datetime, timezone
from typing import Any, Optional

import psycopg
from psycopg.rows import dict_row
from mcp.server.fastmcp import FastMCP

# ─── Config ──────────────────────────────────────────────────────────────────

# NOTE(review): the fallback values below put a concrete host, user name and
# password into source control. They are kept unchanged so existing
# deployments keep working, but the credentials should be rotated and then
# supplied through the environment only.
PG_HOST = os.getenv("PG_HOST", "192.168.1.76")
PG_PORT = int(os.getenv("PG_PORT", "5432"))
PG_USER = os.getenv("PG_USER", "vladimir.buzalka")
PG_PASSWORD = os.getenv("PG_PASSWORD", "Vlado7309208104++")
PG_DB = os.getenv("PG_DB", "knowledgebase")

VOYAGE_API_KEY = os.getenv("VOYAGE_API_KEY", "")
# NOTE(review): the vector size produced by this model must match the
# dimension of the kb_memories.embedding column - confirm against the schema.
EMBED_MODEL = "voyage-3-lite"

# Column list shared by the full-text and the vector query in search().
_MEMORY_COLUMNS = (
    "id, mem_type, title, content, summary, tags, project, source, "
    "session_id, importance, created_at"
)


# ─── Logging ─────────────────────────────────────────────────────────────────

def log(msg: str):
    """Write a diagnostic line to stderr (stdout carries the MCP stdio protocol)."""
    print(f"[KB] {msg}", file=sys.stderr, flush=True)


# ─── DB connection ───────────────────────────────────────────────────────────

_conn: Optional[psycopg.Connection] = None


def get_conn() -> psycopg.Connection:
    """Return the shared database connection, (re)connecting when needed.

    The connection runs in autocommit mode. With autocommit disabled, the
    ``SELECT 1`` health check below would open an implicit transaction, every
    later ``conn.transaction()`` block would degrade to a SAVEPOINT inside it,
    and the writes would never be committed. In autocommit mode a
    ``conn.transaction()`` block is a real BEGIN/COMMIT, and a failed
    statement cannot leave the session in an aborted transaction.

    Raises:
        psycopg.Error: if a new connection cannot be established.
    """
    global _conn
    if _conn is not None and not _conn.closed:
        try:
            _conn.execute("SELECT 1")
            return _conn
        except psycopg.Error as e:
            log(f"Connection check failed, reconnecting: {e}")
            try:
                # Release the broken connection instead of leaking it.
                _conn.close()
            except psycopg.Error:
                pass
    _conn = psycopg.connect(
        host=PG_HOST,
        port=PG_PORT,
        user=PG_USER,
        password=PG_PASSWORD,
        dbname=PG_DB,
        row_factory=dict_row,
        autocommit=True,
    )
    log(f"Connected to {PG_DB}@{PG_HOST}")
    return _conn


# ─── Embeddings ──────────────────────────────────────────────────────────────

_voyage_client = None
# Connection on which the pgvector adapters were registered most recently.
_vector_ready_conn = None


def get_embedding(text: str) -> Optional[list[float]]:
    """Return the Voyage AI embedding of *text*, or None if unavailable.

    None is returned when VOYAGE_API_KEY is not set or when the API call
    fails; the failure is logged and never raised, so callers can fall back
    to full-text-only behaviour. Only the first 8000 characters are embedded.
    """
    global _voyage_client
    if not VOYAGE_API_KEY:
        return None
    try:
        if _voyage_client is None:
            import voyageai  # optional dependency, imported on first use
            _voyage_client = voyageai.Client(api_key=VOYAGE_API_KEY)
        result = _voyage_client.embed([text[:8000]], model=EMBED_MODEL)
        return result.embeddings[0]
    except Exception as e:
        log(f"Embedding error: {e}")
        return None


def _to_vector(conn, embedding):
    """Return *embedding* as a numpy array bindable to a pgvector column.

    The pgvector adapters are registered once per connection object; the
    imports stay local so the server still starts without pgvector/numpy
    when embeddings are disabled.
    """
    global _vector_ready_conn
    from pgvector.psycopg import register_vector
    import numpy as np
    if _vector_ready_conn is not conn:
        register_vector(conn)
        _vector_ready_conn = conn
    return np.array(embedding)


# ─── Helpers ─────────────────────────────────────────────────────────────────

def _row_to_dict(row: dict) -> dict:
    """Copy a DB row, converting datetimes to ISO strings and hiding vectors."""
    out = {}
    for key, value in row.items():
        if isinstance(value, datetime):
            out[key] = value.isoformat()
        elif key == "embedding":
            out[key] = None  # never return raw vectors
        else:
            out[key] = value
    return out


def _indent(text: str, n: int) -> str:
    """Prefix every line of *text* with *n* spaces."""
    pad = " " * n
    return "\n".join(pad + line for line in text.splitlines())


def _fmt_memories(rows: list[dict]) -> str:
    """Render memory rows as a human-readable text listing.

    Returns "No results." for an empty list. Missing or NULL fields are
    rendered as placeholders instead of raising.
    """
    if not rows:
        return "No results."
    parts = []
    for r in rows:
        score = f" score={r['score']:.3f}" if r.get("score") is not None else ""
        parts.append(
            f"[{r['id']}] {(r.get('mem_type') or '?').upper()} | {r.get('project') or '—'}"
            f"{score}\n"
            f" Title: {r.get('title') or '—'}\n"
            f" Tags: {', '.join(r.get('tags') or []) or '—'}\n"
            f" Date: {r.get('created_at', '')}\n"
            f" Content:\n{_indent(r.get('content') or '', 4)}"
        )
    return "\n\n".join(parts)


def _insert_memory_in_tx(conn, data: dict):
    """Insert one kb_memories row using *conn* and return its id/created_at.

    Intended to be called inside a ``conn.transaction()`` block owned by the
    caller. *data* must contain ``content``; every other key is optional. The
    ``embedding`` column is only part of the INSERT when a non-empty embedding
    is supplied.

    Returns:
        The row produced by ``RETURNING id, created_at``.
    """
    importance = data.get("importance")
    columns = [
        "mem_type", "title", "content", "summary", "tags",
        "project", "source", "session_id", "importance",
    ]
    values: list[Any] = [
        data.get("mem_type") or "fact",
        data.get("title"),
        data["content"],
        data.get("summary"),
        data.get("tags") or [],
        data.get("project"),
        data.get("source"),
        data.get("session_id"),
        0.5 if importance is None else importance,
    ]
    embedding = data.get("embedding")
    if embedding:
        columns.append("embedding")
        values.append(_to_vector(conn, embedding))
    columns.append("meta")
    values.append(json.dumps(data.get("meta") or {}))

    # Column names come from the fixed list above, never from user input.
    placeholders = ",".join(["%s"] * len(columns))
    return conn.execute(
        f"""
        INSERT INTO kb_memories ({', '.join(columns)})
        VALUES ({placeholders})
        RETURNING id, created_at
        """,
        values,
    ).fetchone()


def _memory_filters(
    types: Optional[list[str]],
    project: Optional[str],
    tags: Optional[list[str]],
    min_importance: float,
    include_sessions: bool,
) -> tuple[list[str], list[Any]]:
    """Build the WHERE conditions (and their parameters) shared by both searches."""
    conditions = ["deleted = FALSE"]
    params: list[Any] = []
    if types:
        conditions.append("mem_type = ANY(%s)")
        params.append(types)
    if project:
        conditions.append("project = %s")
        params.append(project)
    if tags:
        conditions.append("tags && %s")
        params.append(tags)
    if min_importance > 0:
        conditions.append("importance >= %s")
        params.append(min_importance)
    if not include_sessions:
        # Parenthesised so the OR cannot escape the AND chain. Session
        # summaries are the 'summary' rows that carry a session_id.
        conditions.append("(mem_type != 'summary' OR session_id IS NULL)")
    return conditions, params


# ─── MCP server ──────────────────────────────────────────────────────────────

mcp = FastMCP("knowledgebase")


# ──────────────────────────────────────────────────────────────────────────────
# STORE MEMORY
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def store_memory(
    content: str,
    mem_type: str = "fact",
    title: Optional[str] = None,
    summary: Optional[str] = None,
    tags: Optional[list[str]] = None,
    project: Optional[str] = None,
    source: Optional[str] = None,
    session_id: Optional[str] = None,
    importance: float = 0.5,
    meta: Optional[dict] = None,
) -> str:
    """
    Store a single memory record (fact, decision, preference, summary, ...).

    mem_type:   fact | decision | preference | summary | document | email |
                project | person | other
    importance: 0.0 (trivial) ... 1.0 (critical), default 0.5
    tags:       list of keywords used for filtering
    session_id: set when the memory originates from a specific conversation

    Returns a confirmation containing the new id, or "Error: ..." on failure.

    Example:
        store_memory("Vlado prefers concise answers without a trailing summary",
                     mem_type="preference", tags=["communication", "style"])
    """
    # The embedding call hits the network, so it runs before the transaction.
    embedding = get_embedding(f"{title or ''} {summary or ''} {content}")
    try:
        conn = get_conn()
        with conn.transaction():
            row = _insert_memory_in_tx(conn, {
                "mem_type": mem_type,
                "title": title,
                "content": content,
                "summary": summary,
                "tags": tags,
                "project": project,
                "source": source,
                "session_id": session_id,
                "importance": importance,
                "embedding": embedding,
                "meta": meta,
            })
        return f"Stored memory id={row['id']} at {row['created_at']}"
    except Exception as e:
        # conn.transaction() has already rolled back at this point.
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# STORE CONVERSATION
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def store_conversation(
    messages: list[dict],
    session_id: Optional[str] = None,
    title: Optional[str] = None,
    summary: Optional[str] = None,
    project: Optional[str] = None,
    tags: Optional[list[str]] = None,
    key_memories: Optional[list[dict]] = None,
) -> str:
    """
    Store a whole conversation (list of messages) as a session and save the
    supplied key_memories as separate memory records.

    messages:     [{"role": "user"|"assistant", "content": "..."}]
    session_id:   unique session id (generated from a UTC timestamp if omitted)
    summary:      summary of the conversation (recommended); it is also stored
                  as a 'summary' memory linked to the session
    key_memories: key facts/decisions from the conversation to store separately
                  [{"content": "...", "mem_type": "fact", "title": "...",
                    "tags": [...], "importance": 0.7}]

    Storing an existing session_id replaces the session row and its messages.
    NOTE(review): the summary and key memories are inserted again on every
    call, so re-storing a session duplicates them - confirm this is intended.

    Example:
        store_conversation(
            messages=[...],
            session_id="2026-06-06-knowledgebase",
            title="Knowledgebase system design",
            summary="Vlado asked for a memory MCP server; we chose PG+pgvector+Voyage",
            project="knowledgebase",
            key_memories=[
                {"content": "Decision: PostgreSQL + pgvector + Voyage AI embeddings",
                 "mem_type": "decision", "importance": 0.9, "tags": ["architecture"]},
            ]
        )
    """
    messages = messages or []
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    sid = session_id or f"session-{timestamp}-{uuid.uuid4().hex[:6]}"

    # Build every memory record, including its embedding, up front so that no
    # network call is made while the database transaction is open.
    pending: list[dict] = []
    if summary:
        pending.append({
            "mem_type": "summary",
            "title": title or f"Session {sid}",
            "content": summary,
            "session_id": sid,
            "project": project,
            "tags": tags or [],
            "importance": 0.6,
            "embedding": get_embedding(f"{title or ''} {summary}"),
        })
    stored_km = 0
    for km in key_memories or []:
        if not km.get("content"):
            continue  # a key memory without content cannot be stored
        km_tags = km.get("tags")
        pending.append({
            "mem_type": km.get("mem_type", "fact"),
            "title": km.get("title"),
            "content": km["content"],
            "summary": km.get("summary"),
            "session_id": sid,
            "project": km.get("project", project),
            "tags": km_tags if km_tags is not None else (tags or []),
            "importance": km.get("importance", 0.5),
            "source": km.get("source"),
            "embedding": get_embedding(
                f"{km.get('title', '') or ''} {km.get('content', '')}"
            ),
            "meta": km.get("meta") or {},
        })
        stored_km += 1

    message_rows = [
        (sid, msg.get("role", "unknown"), msg.get("content", ""), seq)
        for seq, msg in enumerate(messages)
    ]

    try:
        conn = get_conn()
        with conn.transaction():
            # Upsert the session row.
            conn.execute(
                """
                INSERT INTO kb_sessions (id, title, summary, project, tags, msg_count, ended_at)
                VALUES (%s, %s, %s, %s, %s, %s, NOW())
                ON CONFLICT (id) DO UPDATE SET
                    title = EXCLUDED.title,
                    summary = EXCLUDED.summary,
                    project = EXCLUDED.project,
                    tags = EXCLUDED.tags,
                    msg_count = EXCLUDED.msg_count,
                    ended_at = NOW()
                """,
                (sid, title, summary, project, tags or [], len(messages)),
            )
            # Replace the stored messages of this session.
            conn.execute("DELETE FROM kb_messages WHERE session_id = %s", (sid,))
            if message_rows:
                with conn.cursor() as cur:
                    cur.executemany(
                        """
                        INSERT INTO kb_messages (session_id, role, content, seq)
                        VALUES (%s, %s, %s, %s)
                        """,
                        message_rows,
                    )
            # Session summary (if any) followed by the key memories.
            for record in pending:
                _insert_memory_in_tx(conn, record)
        return (
            f"Stored session '{sid}' with {len(messages)} messages"
            f" + {stored_km} key memories."
        )
    except Exception as e:
        # conn.transaction() has already rolled back at this point.
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# SEARCH
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def search(
    query: str,
    types: Optional[list[str]] = None,
    project: Optional[str] = None,
    tags: Optional[list[str]] = None,
    limit: int = 10,
    min_importance: float = 0.0,
    include_sessions: bool = False,
) -> str:
    """
    Hybrid search over stored memories.
    Combines full-text search (always) with semantic vector search (when
    embeddings are available).

    query:            natural language or keywords
    types:            ['fact','decision','preference','summary','document',
                       'email','project','person']
    project:          restrict to one project
    tags:             a memory must carry at least one of these tags
    limit:            maximum number of results (default 10)
    min_importance:   minimum importance, 0..1
    include_sessions: also return session summaries ('summary' memories that
                      belong to a stored session)

    Full-text hits are ranked by ts_rank, vector-only hits by cosine
    similarity; both are merged and sorted by score, then importance.

    Example:
        search("PostgreSQL architecture", project="knowledgebase", types=["decision"])
    """
    try:
        conn = get_conn()
        conditions, filter_params = _memory_filters(
            types, project, tags, min_importance, include_sessions
        )
        where = " AND ".join(conditions)

        # ── Full-text search ──
        # Parameter order follows placeholder order: rank, match, filters, limit.
        fts_rows = conn.execute(
            f"""
            SELECT {_MEMORY_COLUMNS},
                   ts_rank(fts, plainto_tsquery('simple', %s)) AS score
            FROM kb_memories
            WHERE fts @@ plainto_tsquery('simple', %s) AND {where}
            ORDER BY score DESC, importance DESC
            LIMIT %s
            """,
            [query, query, *filter_params, limit],
        ).fetchall()
        results = [_row_to_dict(r) for r in fts_rows]
        seen_ids = {r["id"] for r in fts_rows}

        # ── Vector search (native pgvector, <=> is cosine distance) ──
        query_emb = get_embedding(query)
        if query_emb:
            try:
                vec = _to_vector(conn, query_emb)
                # The vector is bound twice: once in SELECT (before the
                # filter placeholders) and once in ORDER BY (after them).
                vec_rows = conn.execute(
                    f"""
                    SELECT {_MEMORY_COLUMNS},
                           1 - (embedding <=> %s::vector) AS score
                    FROM kb_memories
                    WHERE embedding IS NOT NULL AND {where}
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s
                    """,
                    [vec, *filter_params, vec, limit],
                ).fetchall()
                for r in vec_rows:
                    if r["id"] not in seen_ids:
                        seen_ids.add(r["id"])
                        results.append(_row_to_dict(r))
            except Exception as e:
                # Best effort: fall back to the full-text results alone.
                log(f"Vector search error: {e}")

        results.sort(
            key=lambda x: (x.get("score") or 0, x.get("importance") or 0),
            reverse=True,
        )
        return _fmt_memories(results[:limit])
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# GET CONTEXT (context for the current conversation)
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def get_context(
    topic: str,
    project: Optional[str] = None,
    limit: int = 8,
    include_preferences: bool = True,
) -> str:
    """
    Return the most relevant memories for the given context/topic.
    Automatically appends the user's top preferences (if any exist).

    The main section is a search() for *topic* with min_importance=0.3; the
    preference section lists up to five 'preference' memories ordered by
    importance and recency.

    Use at the start of a conversation:
        get_context("IWRS patients, notifications, MongoDB")
    """
    # ── Main context ──
    main_result = search(
        query=topic,
        project=project,
        limit=limit,
        min_importance=0.3,
    )
    parts = ["=== RELEVANTNÍ PAMĚTI ===\n" + main_result]

    # ── User preferences ──
    if include_preferences:
        try:
            rows = get_conn().execute(
                """
                SELECT id, mem_type, title, content, tags, importance, created_at
                FROM kb_memories
                WHERE mem_type = 'preference' AND deleted = FALSE
                ORDER BY importance DESC, created_at DESC
                LIMIT 5
                """,
            ).fetchall()
            if rows:
                parts.append(
                    "=== PREFERENCE UŽIVATELE ===\n"
                    + _fmt_memories([_row_to_dict(r) for r in rows])
                )
        except Exception as e:
            # Preferences are optional extras; the main context is still returned.
            log(f"Preferences error: {e}")

    return "\n\n".join(parts)


# ──────────────────────────────────────────────────────────────────────────────
# GET RECENT
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def get_recent(
    limit: int = 10,
    mem_type: Optional[str] = None,
    project: Optional[str] = None,
) -> str:
    """
    Return the newest non-deleted memories, optionally filtered by type
    and/or project.

    Example:
        get_recent(limit=5, mem_type="decision")
    """
    conditions = ["deleted = FALSE"]
    params: list[Any] = []
    if mem_type:
        conditions.append("mem_type = %s")
        params.append(mem_type)
    if project:
        conditions.append("project = %s")
        params.append(project)
    where = " AND ".join(conditions)
    try:
        rows = get_conn().execute(
            f"""
            SELECT id, mem_type, title, content, tags, project, importance, created_at
            FROM kb_memories
            WHERE {where}
            ORDER BY created_at DESC
            LIMIT %s
            """,
            [*params, limit],
        ).fetchall()
        return _fmt_memories([_row_to_dict(r) for r in rows])
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# LIST SESSIONS
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def list_sessions(
    limit: int = 20,
    project: Optional[str] = None,
) -> str:
    """
    List stored conversation sessions, most recently ended first, optionally
    restricted to one project. Each summary is truncated to 120 characters.
    """
    where = ""
    params: list[Any] = []
    if project:
        where = "WHERE project = %s"
        params.append(project)
    try:
        rows = get_conn().execute(
            f"""
            SELECT id, title, summary, project, tags, msg_count, started_at, ended_at
            FROM kb_sessions
            {where}
            ORDER BY ended_at DESC NULLS LAST
            LIMIT %s
            """,
            [*params, limit],
        ).fetchall()
        if not rows:
            return "No sessions found."
        lines = [
            f"[{r['id']}] {r.get('title') or '—'} | {r.get('project') or '—'}"
            f" | {r['msg_count']} msgs | {r.get('ended_at', '')}\n"
            f" {(r.get('summary') or '')[:120]}"
            for r in rows
        ]
        return "\n\n".join(lines)
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# GET SESSION
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def get_session(
    session_id: str,
    include_messages: bool = True,
    messages_limit: int = 100,
) -> str:
    """
    Return the details of one session: metadata plus, optionally, its
    messages in sequence order (at most messages_limit, each truncated to
    500 characters).
    """
    try:
        conn = get_conn()
        sess = conn.execute(
            "SELECT * FROM kb_sessions WHERE id = %s", (session_id,)
        ).fetchone()
        if not sess:
            return f"Session '{session_id}' not found."

        out = [
            f"Session: {sess['id']}",
            f"Title: {sess.get('title') or '—'}",
            f"Project: {sess.get('project') or '—'}",
            f"Tags: {', '.join(sess.get('tags') or [])}",
            f"Date: {sess.get('ended_at') or sess.get('started_at')}",
            f"Summary:\n{_indent(sess.get('summary') or '—', 2)}",
        ]

        if include_messages:
            msgs = conn.execute(
                """
                SELECT role, content, seq FROM kb_messages
                WHERE session_id = %s
                ORDER BY seq
                LIMIT %s
                """,
                (session_id, messages_limit),
            ).fetchall()
            out.append(f"\n--- Messages ({len(msgs)}) ---")
            for m in msgs:
                role = (m["role"] or "unknown").upper()
                content = m["content"] or ""
                if len(content) > 500:
                    content = content[:500] + "…"
                out.append(f"\n[{role}]\n{content}")

        return "\n".join(out)
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# UPDATE MEMORY
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def update_memory(
    memory_id: int,
    content: Optional[str] = None,
    title: Optional[str] = None,
    summary: Optional[str] = None,
    tags: Optional[list[str]] = None,
    importance: Optional[float] = None,
    project: Optional[str] = None,
) -> str:
    """
    Update an existing memory record. Pass only the fields you want to change.

    When content, title or summary changes, the embedding is recomputed from
    the merged record (stored values plus the new ones). If no new embedding
    can be obtained, the stored one is left as it is.

    Returns "Memory <id> updated.", "Memory <id> not found.",
    "Nothing to update." or "Error: ...".
    """
    candidates = {
        "content": content,
        "title": title,
        "summary": summary,
        "tags": tags,
        "importance": importance,
        "project": project,
    }
    changes: dict[str, Any] = {
        column: value for column, value in candidates.items() if value is not None
    }
    if not changes:
        return "Nothing to update."

    try:
        conn = get_conn()
        current = conn.execute(
            "SELECT title, summary, content FROM kb_memories WHERE id = %s",
            (memory_id,),
        ).fetchone()
        if current is None:
            return f"Memory {memory_id} not found."

        if any(column in changes for column in ("content", "title", "summary")):
            merged = {**current, **changes}
            new_emb = get_embedding(
                f"{merged['title'] or ''} {merged['summary'] or ''} {merged['content'] or ''}"
            )
            if new_emb:
                changes["embedding"] = _to_vector(conn, new_emb)

        # Column names come from the fixed dict above, never from user input.
        assignments = ", ".join(f"{column} = %s" for column in changes)
        # A single statement is atomic in autocommit mode.
        cur = conn.execute(
            f"UPDATE kb_memories SET {assignments} WHERE id = %s",
            [*changes.values(), memory_id],
        )
        if cur.rowcount == 0:
            return f"Memory {memory_id} not found."
        return f"Memory {memory_id} updated."
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# DELETE MEMORY
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def delete_memory(memory_id: int, hard: bool = False) -> str:
    """
    Soft-delete (default) or hard-delete a memory.
    Soft: the row stays in the database and is only hidden from results.
    Hard: the row is physically removed.

    Returns a confirmation, "Memory <id> not found." or "Error: ...".
    """
    try:
        conn = get_conn()
        # A single statement is atomic in autocommit mode.
        if hard:
            cur = conn.execute(
                "DELETE FROM kb_memories WHERE id = %s", (memory_id,)
            )
            outcome = "permanently deleted"
        else:
            cur = conn.execute(
                "UPDATE kb_memories SET deleted = TRUE WHERE id = %s", (memory_id,)
            )
            outcome = "soft-deleted"
        if cur.rowcount == 0:
            return f"Memory {memory_id} not found."
        return f"Memory {memory_id} {outcome}."
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# STATS
# ──────────────────────────────────────────────────────────────────────────────

@mcp.tool()
def stats() -> str:
    """
    Overview of the database contents: memory counts per type, number of
    sessions, number of rows with an embedding, and the ten largest projects.
    """
    try:
        conn = get_conn()
        type_counts = conn.execute(
            """
            SELECT mem_type, COUNT(*) AS cnt
            FROM kb_memories WHERE deleted = FALSE
            GROUP BY mem_type ORDER BY cnt DESC
            """
        ).fetchall()

        session_count = conn.execute(
            "SELECT COUNT(*) AS cnt FROM kb_sessions"
        ).fetchone()["cnt"]

        projects = conn.execute(
            """
            SELECT project, COUNT(*) AS cnt
            FROM kb_memories WHERE deleted = FALSE AND project IS NOT NULL
            GROUP BY project ORDER BY cnt DESC LIMIT 10
            """
        ).fetchall()

        # Counts every row with an embedding, soft-deleted rows included.
        embed_count = conn.execute(
            "SELECT COUNT(*) AS cnt FROM kb_memories WHERE embedding IS NOT NULL"
        ).fetchone()["cnt"]

        lines = ["=== Knowledgebase Stats ==="]
        lines.append(f"\nSessions: {session_count}")
        lines.append(
            f"Embeddings: {embed_count} "
            f"(Voyage AI {'active' if VOYAGE_API_KEY else 'not configured'})"
        )
        lines.append("\nMemories by type:")
        for r in type_counts:
            # str() keeps the width format valid even for a NULL mem_type.
            lines.append(f" {str(r['mem_type']):15} {r['cnt']:>5}")
        if projects:
            lines.append("\nTop projects:")
            for r in projects:
                lines.append(f" {str(r['project']):25} {r['cnt']:>5}")
        return "\n".join(lines)
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


# ──────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    log("Starting Knowledgebase MCP server...")
    log(f" DB: {PG_DB}@{PG_HOST}:{PG_PORT}")
    log(f" Embeddings: {'Voyage AI' if VOYAGE_API_KEY else 'disabled (set VOYAGE_API_KEY)'}")
    mcp.run(transport="stdio")