Files
janssen/Knowledgebase/server.py
T
administrator 586c2c4484 Fix vector search parameter ordering for pgvector
%s placeholders in SQL are positional — SELECT score param must come
before WHERE conditions in the params list, not after.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 06:41:30 +02:00

835 lines
32 KiB
Python

#!/usr/bin/env python3
"""
Knowledgebase MCP server
========================
Persistentní paměť pro Claude konverzace a znalosti.
Vyhledávání:
- Full-text (tsvector, vždy dostupné)
- Sémantické (Voyage AI embeddingy + kosinová vzdálenost pgvector `<=>` přímo v SQL)
- Hybridní kombinace obou
Env proměnné:
PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB
VOYAGE_API_KEY — pro vektorové embeddingy (volitelné)
"""
import json
import os
import sys
import traceback
from datetime import datetime
from typing import Any, Optional
import psycopg
from psycopg.rows import dict_row
from mcp.server.fastmcp import FastMCP
# ─── Config ──────────────────────────────────────────────────────────────────
PG_HOST = os.getenv("PG_HOST", "192.168.1.76")
PG_PORT = int(os.getenv("PG_PORT", "5432"))
PG_USER = os.getenv("PG_USER", "vladimir.buzalka")
# No default password: credentials must never be committed to source control.
# Set PG_PASSWORD in the environment. If it is unset, the value is None, which
# psycopg omits from the connection string, so libpq's normal password lookup
# (e.g. ~/.pgpass) applies.
PG_PASSWORD = os.getenv("PG_PASSWORD")
PG_DB = os.getenv("PG_DB", "knowledgebase")
VOYAGE_API_KEY = os.getenv("VOYAGE_API_KEY", "")  # empty → embeddings disabled
EMBED_MODEL = "voyage-3-lite"  # 512-dim, fast & cheap
# Expected embedding size for EMBED_MODEL; presumably also the kb_memories
# vector column dimension — TODO confirm against the schema.
EMBED_DIM = 512
# ─── Logging ─────────────────────────────────────────────────────────────────
def log(msg: str):
    """Emit one "[KB]"-prefixed diagnostic line on stderr and flush it at once.

    stderr is used because stdout carries the MCP stdio transport
    (see mcp.run(transport="stdio") at the bottom of the file).
    """
    stream = sys.stderr
    stream.write(f"[KB] {msg}\n")
    stream.flush()
# ─── DB connection ───────────────────────────────────────────────────────────
_conn: Optional[psycopg.Connection] = None


def get_conn() -> psycopg.Connection:
    """
    Return the shared module-level connection, (re)connecting when needed.

    The connection is opened in autocommit mode, and this matters. With
    autocommit=False, the "SELECT 1" health check below would open an implicit
    transaction. psycopg's ``conn.transaction()`` entered inside an already-open
    transaction only creates a SAVEPOINT and never COMMITs, so the tools' writes
    would be lost when the connection closed. In autocommit mode, each
    ``with conn.transaction():`` block issues a real BEGIN/COMMIT. Read-only
    queries also no longer leave the session "idle in transaction".

    A connection that fails the health check is closed before reconnecting so
    it is not leaked.
    """
    global _conn
    if _conn is not None and not _conn.closed:
        try:
            _conn.execute("SELECT 1")
            return _conn
        except Exception as e:
            log(f"Connection check failed, reconnecting: {e}")
            try:
                _conn.close()
            except Exception:
                pass  # best effort: the old connection is already unusable
    _conn = psycopg.connect(
        host=PG_HOST, port=PG_PORT,
        user=PG_USER, password=PG_PASSWORD,
        dbname=PG_DB,
        row_factory=dict_row,
        autocommit=True,
    )
    # Register pgvector once per connection so psycopg can serialise np.array
    # parameters as `vector`.
    try:
        from pgvector.psycopg import register_vector
        register_vector(_conn)
    except Exception as e:
        log(f"pgvector register warning: {e}")
    log(f"Connected to {PG_DB}@{PG_HOST}")
    return _conn
# ─── Embeddings ──────────────────────────────────────────────────────────────
_voyage_client = None  # lazily created voyageai.Client, reused across calls


def get_embedding(text: str) -> Optional[list[float]]:
    """
    Return an EMBED_DIM-dimensional embedding of *text* via Voyage AI, or None.

    EMBED_MODEL (voyage-3-lite) produces 512-dim vectors. Only the first 8000
    characters of *text* are sent.

    Returns None (and never raises) when VOYAGE_API_KEY is not set, when *text*
    is empty or whitespace-only (nothing to embed, so no API call is made), when
    the API call fails, or when the returned vector does not have EMBED_DIM
    entries. Such a vector could not be stored in the DB vector column anyway.
    """
    global _voyage_client
    if not VOYAGE_API_KEY or not (text and text.strip()):
        return None
    try:
        if _voyage_client is None:
            import voyageai
            _voyage_client = voyageai.Client(api_key=VOYAGE_API_KEY)
        result = _voyage_client.embed([text[:8000]], model=EMBED_MODEL)
        embedding = result.embeddings[0]
    except Exception as e:
        log(f"Embedding error: {e}")
        return None
    if len(embedding) != EMBED_DIM:
        log(f"Embedding error: expected {EMBED_DIM} dims, got {len(embedding)}")
        return None
    return embedding
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _row_to_dict(row: dict) -> dict:
    """
    Return a JSON-friendly copy of a DB row.

    datetime values become ISO-8601 strings, and an "embedding" column is
    replaced by None so raw vectors are never returned. All other values are
    copied unchanged.
    """
    out = {}
    for key, value in row.items():
        if key == "embedding":
            # Match on the key alone: with pgvector's loaders the value may be a
            # numpy array or a string rather than a list, so a type check on
            # list would let the raw vector through.
            out[key] = None
        elif isinstance(value, datetime):
            out[key] = value.isoformat()
        else:
            out[key] = value
    return out
def _fmt_memories(rows: list[dict]) -> str:
    """
    Render memory rows as readable text.

    Each row becomes one text block:
    - a header with the id, the upper-cased mem_type, the project and an
      optional score;
    - then title, tags, date and the content indented by 4 spaces.

    Blocks are separated by a blank line. An empty list gives "No results.".
    NULL (None) columns render as empty text ("?" for mem_type) instead of
    raising.
    """
    if not rows:
        return "No results."
    parts = []
    for r in rows:
        score = r.get("score")
        score_txt = f" score={score:.3f}" if score is not None else ""
        mem_type = (r.get("mem_type") or "?").upper()
        parts.append(
            f"[{r['id']}] {mem_type} | {r.get('project') or ''}"
            f"{score_txt}\n"
            f" Title: {r.get('title') or ''}\n"
            f" Tags: {', '.join(r.get('tags') or [])}\n"
            f" Date: {r.get('created_at') or ''}\n"
            f" Content:\n{_indent(r.get('content') or '', 4)}"
        )
    return "\n\n".join(parts)
def _indent(text: str, n: int) -> str:
    """Prefix each line of *text* with *n* spaces.

    Lines are split with str.splitlines() and rejoined with "\\n", so a trailing
    newline is dropped and "\\r\\n" endings are normalised.
    """
    prefix = " " * n
    indented = []
    for line in text.splitlines():
        indented.append(prefix + line)
    return "\n".join(indented)
# ─── MCP server ──────────────────────────────────────────────────────────────
# FastMCP server named "knowledgebase". Every function decorated with
# @mcp.tool() below is registered on it. It is started over the stdio
# transport in the __main__ block at the end of the file.
mcp = FastMCP("knowledgebase")
# ──────────────────────────────────────────────────────────────────────────────
# STORE MEMORY
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def store_memory(
    content: str,
    mem_type: str = "fact",
    title: Optional[str] = None,
    summary: Optional[str] = None,
    tags: Optional[list[str]] = None,
    project: Optional[str] = None,
    source: Optional[str] = None,
    session_id: Optional[str] = None,
    importance: float = 0.5,
    meta: Optional[dict] = None,
) -> str:
    """
    Store a single memory record (fact, decision, preference, summary, …).

    mem_type:   fact | decision | preference | summary | document | email | project | person | other
    importance: 0.0 (trivial) … 1.0 (critical), default 0.5
    tags:       list of keywords used for filtering
    session_id: set when the memory comes from a specific conversation

    The text "title summary content" is embedded via Voyage AI when available;
    otherwise the row is stored without an embedding.

    Example:
        store_memory("Vlado prefers concise answers without a trailing summary",
                     mem_type="preference", tags=["komunikace", "styl"])
    """
    embedding = get_embedding(f"{title or ''} {summary or ''} {content}")
    conn = get_conn()
    try:
        # Parameters shared by both INSERT variants, in column order.
        common = (
            mem_type, title, content, summary,
            tags or [], project, source, session_id,
            importance,
        )
        with conn.transaction():
            if not embedding:
                cursor = conn.execute(
                    """
                    INSERT INTO kb_memories
                    (mem_type, title, content, summary, tags, project,
                    source, session_id, importance, meta)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                    RETURNING id, created_at
                    """,
                    (*common, json.dumps(meta or {})),
                )
            else:
                import numpy as np
                cursor = conn.execute(
                    """
                    INSERT INTO kb_memories
                    (mem_type, title, content, summary, tags, project,
                    source, session_id, importance, embedding, meta)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                    RETURNING id, created_at
                    """,
                    (*common, np.array(embedding), json.dumps(meta or {})),
                )
            inserted = cursor.fetchone()
        return f"Stored memory id={inserted['id']} at {inserted['created_at']}"
    except Exception as e:
        conn.rollback()
        log(traceback.format_exc())
        return f"Error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# STORE CONVERSATION
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def store_conversation(
    messages: list[dict],
    session_id: Optional[str] = None,
    title: Optional[str] = None,
    summary: Optional[str] = None,
    project: Optional[str] = None,
    tags: Optional[list[str]] = None,
    key_memories: Optional[list[dict]] = None,
) -> str:
    """
    Store a whole conversation (list of messages) as a session, and save
    key_memories as standalone memory records.

    messages:     [{"role": "user"|"assistant", "content": "..."}]. Content that is
                  not a string (e.g. a list of content blocks) is stored as JSON
                  text; missing or None content is stored as "".
    session_id:   unique session ID (generated from the UTC timestamp if omitted).
                  Re-using an existing ID overwrites that session's metadata and
                  replaces its messages.
    summary:      conversation summary (recommended); also stored as a 'summary'
                  memory linked to the session
    key_memories: key facts/decisions from the conversation to store separately
                  [{"content": "...", "mem_type": "fact", "title": "...", "tags": [...], "importance": 0.7}]

    Example:
        store_conversation(
            messages=[...],
            session_id="2026-06-06-knowledgebase",
            title="Návrh Knowledgebase systému",
            summary="Vlado požadoval paměťový MCP server, rozhodli jsme se pro PG+pgvector+Voyage",
            project="knowledgebase",
            key_memories=[
                {"content": "Rozhodnutí: PostgreSQL + pgvector + Voyage AI embeddings",
                 "mem_type": "decision", "importance": 0.9, "tags": ["architektura"]},
            ]
        )
    """
    import uuid
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # produces the same strftime output.
    sid = session_id or (
        f"session-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"
        f"-{uuid.uuid4().hex[:6]}"
    )
    conn = get_conn()
    try:
        message_rows = [
            (sid, msg.get("role", "unknown"), _message_text(msg.get("content", "")), seq)
            for seq, msg in enumerate(messages)
        ]

        # Build every memory record, including its Voyage embedding (a network
        # call), BEFORE opening the DB transaction. Slow API calls then don't
        # keep a transaction, and the locks taken by the DELETE below, open.
        memory_records: list[dict] = []
        if summary:
            memory_records.append({
                "mem_type": "summary",
                "title": title or f"Session {sid}",
                "content": summary,
                "session_id": sid,
                "project": project,
                "tags": tags or [],
                "importance": 0.6,
                "embedding": get_embedding(f"{title or ''} {summary}"),
            })
        stored_km = 0
        for km in key_memories or []:
            if not km.get("content"):
                continue
            memory_records.append({
                "mem_type": km.get("mem_type", "fact"),
                "title": km.get("title"),
                "content": km["content"],
                "summary": km.get("summary"),
                "session_id": sid,
                "project": km.get("project", project),
                "tags": km.get("tags", tags or []),
                "importance": km.get("importance", 0.5),
                "source": km.get("source"),
                "embedding": get_embedding(
                    f"{km.get('title','') or ''} {km.get('content','')}"
                ),
                "meta": km.get("meta", {}),
            })
            stored_km += 1

        with conn.transaction():
            # Upsert the session row.
            conn.execute(
                """
                INSERT INTO kb_sessions (id, title, summary, project, tags, msg_count, ended_at)
                VALUES (%s, %s, %s, %s, %s, %s, NOW())
                ON CONFLICT (id) DO UPDATE SET
                title = EXCLUDED.title,
                summary = EXCLUDED.summary,
                project = EXCLUDED.project,
                tags = EXCLUDED.tags,
                msg_count = EXCLUDED.msg_count,
                ended_at = NOW()
                """,
                (sid, title, summary, project, tags or [], len(messages)),
            )
            # Replace the session's messages wholesale, so re-storing is idempotent.
            conn.execute("DELETE FROM kb_messages WHERE session_id = %s", (sid,))
            if message_rows:
                with conn.cursor() as cur:
                    cur.executemany(
                        """
                        INSERT INTO kb_messages (session_id, role, content, seq)
                        VALUES (%s, %s, %s, %s)
                        """,
                        message_rows,
                    )
            for record in memory_records:
                _insert_memory_in_tx(conn, record)
        return (
            f"Stored session '{sid}' with {len(messages)} messages"
            f" + {stored_km} key memories."
        )
    except Exception as e:
        conn.rollback()
        log(traceback.format_exc())
        return f"Error: {e}"


def _message_text(content: Any) -> str:
    """Return message content as text: str unchanged, None as "", anything else as JSON."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return json.dumps(content, ensure_ascii=False)
def _insert_memory_in_tx(conn, data: dict) -> None:
    """
    Insert one kb_memories row through *conn*; the caller owns the transaction.

    data keys:
    - content (required);
    - mem_type (default "fact"), title, summary, tags (default []), project,
      source, session_id, importance (default 0.5);
    - embedding: a sequence of floats, stored only when non-empty;
    - meta (default {}): stored as JSON text.

    A key that is present but None gets the same default as a missing key, so
    e.g. {"meta": None} stores '{}' rather than JSON null.
    """
    def field(key: str, default: Any = None) -> Any:
        value = data.get(key)
        return default if value is None else value

    columns = ["mem_type", "title", "content", "summary", "tags", "project",
               "source", "session_id", "importance"]
    values: list[Any] = [
        field("mem_type", "fact"),
        data.get("title"),
        data["content"],
        data.get("summary"),
        field("tags", []),
        data.get("project"),
        data.get("source"),
        data.get("session_id"),
        field("importance", 0.5),
    ]
    embedding = data.get("embedding")
    if embedding is not None and len(embedding) > 0:
        import numpy as np
        columns.append("embedding")
        values.append(np.array(embedding))
    columns.append("meta")
    values.append(json.dumps(field("meta", {})))

    # Column names come only from the fixed list above (never from *data*), so
    # building the statement text is injection-safe; values stay parameterised.
    placeholders = ",".join(["%s"] * len(columns))
    conn.execute(
        f"INSERT INTO kb_memories ({', '.join(columns)}) VALUES ({placeholders})",
        tuple(values),
    )
# ──────────────────────────────────────────────────────────────────────────────
# SEARCH
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def search(
    query: str,
    types: Optional[list[str]] = None,
    project: Optional[str] = None,
    tags: Optional[list[str]] = None,
    limit: int = 10,
    min_importance: float = 0.0,
    include_sessions: bool = False,
) -> str:
    """
    Hybrid memory search.
    Combines full-text search (always) with semantic vector search (when
    embeddings are available).

    query: natural language or keywords
    types: ['fact','decision','preference','summary','document','email','project','person']
    project: filter by project
    tags: must contain at least one of these tags
    limit: max number of results (default 10)
    min_importance: minimum importance 0..1
    include_sessions: also include session summaries (summary memories linked
        to a stored session); excluded by default

    Both searches apply the same filters. Rows found by full-text search keep
    their ts_rank score. Rows found only by vector search get cosine similarity
    as their score.

    Example:
        search("PostgreSQL architektura", project="knowledgebase", types=["decision"])
    """
    conn = get_conn()
    try:
        filters, filter_params = _search_filters(
            types, project, tags, min_importance, include_sessions
        )

        # ── Full-text search ──
        # Placeholder order in the SQL: SELECT ts_rank %s, WHERE fts %s,
        # filter params, LIMIT %s.
        fts_where = " AND ".join(["fts @@ plainto_tsquery('simple', %s)", *filters])
        fts_rows = conn.execute(
            f"""
            SELECT id, mem_type, title, content, summary, tags,
            project, source, session_id, importance, created_at,
            ts_rank(fts, plainto_tsquery('simple', %s)) AS score
            FROM kb_memories
            WHERE {fts_where}
            ORDER BY score DESC, importance DESC
            LIMIT %s
            """,
            [query, query, *filter_params, limit],
        ).fetchall()
        merged = {r["id"]: _row_to_dict(r) for r in fts_rows}

        # ── Vector search (native pgvector, <=> cosine distance) ──
        query_emb = get_embedding(query)
        if query_emb:
            try:
                import numpy as np
                qv = np.array(query_emb)
                vec_where = " AND ".join(["embedding IS NOT NULL", *filters])
                # Placeholder order in the SQL: SELECT score %s, WHERE filter
                # params, ORDER BY %s, LIMIT %s.
                vec_rows = conn.execute(
                    f"""
                    SELECT id, mem_type, title, content, summary, tags,
                    project, source, session_id, importance, created_at,
                    1 - (embedding <=> %s) AS score
                    FROM kb_memories
                    WHERE {vec_where}
                    ORDER BY embedding <=> %s
                    LIMIT %s
                    """,
                    [qv, *filter_params, qv, limit],
                ).fetchall()
                for r in vec_rows:
                    merged.setdefault(r["id"], _row_to_dict(r))
            except Exception as e:
                log(f"Vector search error: {e}")

        # NOTE(review): ts_rank and cosine similarity are on different scales,
        # so this merged ordering is only approximate.
        ranked = sorted(
            merged.values(),
            key=lambda r: (r.get("score") or 0, r.get("importance") or 0),
            reverse=True,
        )
        return _fmt_memories(ranked[:limit])
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"


def _search_filters(
    types: Optional[list[str]],
    project: Optional[str],
    tags: Optional[list[str]],
    min_importance: float,
    include_sessions: bool,
) -> tuple[list[str], list[Any]]:
    """Build the AND-joined WHERE fragments (and their params) shared by both searches."""
    conditions = ["deleted = FALSE"]
    params: list[Any] = []
    if types:
        conditions.append("mem_type = ANY(%s)")
        params.append(types)
    if project:
        conditions.append("project = %s")
        params.append(project)
    if tags:
        conditions.append("tags && %s")
        params.append(tags)
    if min_importance > 0:
        conditions.append("importance >= %s")
        params.append(min_importance)
    if not include_sessions:
        # Exclude session summaries, i.e. 'summary' memories carrying a
        # session_id (as written by store_conversation). The parentheses are
        # essential: the fragments are AND-joined, and a bare OR would let
        # matching rows bypass every other filter.
        conditions.append("(mem_type <> 'summary' OR session_id IS NULL)")
    return conditions, params
# ──────────────────────────────────────────────────────────────────────────────
# GET CONTEXT (context for the current conversation)
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def get_context(
    topic: str,
    project: Optional[str] = None,
    limit: int = 8,
    include_preferences: bool = True,
) -> str:
    """
    Return the memories most relevant to the given topic/context.

    Relevant memories come from search() with min_importance=0.3, optionally
    restricted to *project*. When include_preferences is True, up to 5
    'preference' memories are appended, highest importance first, then newest.
    These are not filtered by project.

    Use at the start of a conversation:
        get_context("IWRS pacienti, notifikace, MongoDB")
    """
    conn = get_conn()
    relevant = search(
        query=topic,
        project=project,
        limit=limit,
        min_importance=0.3,
    )
    sections = ["=== RELEVANTNÍ PAMĚTI ===\n" + relevant]

    if include_preferences:
        try:
            preference_rows = conn.execute(
                """
                SELECT id, mem_type, title, content, tags, importance, created_at
                FROM kb_memories
                WHERE mem_type = 'preference'
                AND deleted = FALSE
                ORDER BY importance DESC, created_at DESC
                LIMIT 5
                """,
            ).fetchall()
            if preference_rows:
                rendered = _fmt_memories([_row_to_dict(row) for row in preference_rows])
                sections.append("=== PREFERENCE UŽIVATELE ===\n" + rendered)
        except Exception as e:
            log(f"Preferences error: {e}")

    return "\n\n".join(sections)
# ──────────────────────────────────────────────────────────────────────────────
# GET RECENT
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def get_recent(
    limit: int = 10,
    mem_type: Optional[str] = None,
    project: Optional[str] = None,
) -> str:
    """
    Return the newest non-deleted memories (by created_at), optionally
    filtered by mem_type and/or project.

    Example:
        get_recent(limit=5, mem_type="decision")
    """
    conn = get_conn()
    clauses = ["deleted = FALSE"]
    args: list[Any] = []
    for clause, value in (("mem_type = %s", mem_type), ("project = %s", project)):
        if value:
            clauses.append(clause)
            args.append(value)
    where = " AND ".join(clauses)
    try:
        found = conn.execute(
            f"""
            SELECT id, mem_type, title, content, tags, project, importance, created_at
            FROM kb_memories
            WHERE {where}
            ORDER BY created_at DESC
            LIMIT %s
            """,
            [*args, limit],
        ).fetchall()
        return _fmt_memories(list(map(_row_to_dict, found)))
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# LIST SESSIONS
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def list_sessions(
    limit: int = 20,
    project: Optional[str] = None,
) -> str:
    """
    List stored conversation sessions, most recently ended first.

    Each entry shows id, title, project, message count and end time, followed
    by the first 120 characters of the summary.
    """
    conn = get_conn()
    where = "WHERE project = %s" if project else ""
    args: list[Any] = [project] if project else []
    try:
        sessions = conn.execute(
            f"""
            SELECT id, title, summary, project, tags, msg_count, started_at, ended_at
            FROM kb_sessions
            {where}
            ORDER BY ended_at DESC NULLS LAST
            LIMIT %s
            """,
            [*args, limit],
        ).fetchall()
        if not sessions:
            return "No sessions found."
        return "\n\n".join(
            f"[{s['id']}] {s.get('title') or ''} | {s.get('project') or ''}"
            f" | {s['msg_count']} msgs | {s.get('ended_at','')}\n"
            f" {(s.get('summary') or '')[:120]}"
            for s in sessions
        )
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# GET SESSION
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def get_session(
    session_id: str,
    include_messages: bool = True,
    messages_limit: int = 100,
) -> str:
    """
    Return the details of one session: metadata plus (optionally) its messages.

    include_messages: also list the stored messages, ordered by seq
    messages_limit:   maximum number of messages listed (default 100)
    Message bodies longer than 500 characters are cut and marked with "…".
    """
    conn = get_conn()
    try:
        sess = conn.execute(
            "SELECT * FROM kb_sessions WHERE id = %s", (session_id,)
        ).fetchone()
        if not sess:
            return f"Session '{session_id}' not found."
        out = [
            f"Session: {sess['id']}",
            f"Title: {sess.get('title') or ''}",
            f"Project: {sess.get('project') or ''}",
            f"Tags: {', '.join(sess.get('tags') or [])}",
            f"Date: {sess.get('ended_at') or sess.get('started_at')}",
            f"Summary:\n{_indent(sess.get('summary') or '', 2)}",
        ]
        if include_messages:
            msgs = conn.execute(
                """
                SELECT role, content, seq FROM kb_messages
                WHERE session_id = %s ORDER BY seq LIMIT %s
                """,
                (session_id, messages_limit),
            ).fetchall()
            out.append(f"\n--- Messages ({len(msgs)}) ---")
            for m in msgs:
                role = (m["role"] or "").upper()
                content = m["content"] or ""
                if len(content) > 500:
                    # Mark the cut so a truncated body isn't mistaken for the full text.
                    content = content[:500] + "…"
                out.append(f"\n[{role}]\n{content}")
        return "\n".join(out)
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# UPDATE MEMORY
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def update_memory(
    memory_id: int,
    content: Optional[str] = None,
    title: Optional[str] = None,
    summary: Optional[str] = None,
    tags: Optional[list[str]] = None,
    importance: Optional[float] = None,
    project: Optional[str] = None,
) -> str:
    """
    Update an existing memory record.
    Pass only the fields you want to change (None = leave unchanged).

    When content, title or summary changes, the embedding is recomputed from the
    resulting "title summary content". This is the same text store_memory
    embeds, so vector search does not keep matching against a stale vector. If
    no embedding can be produced (e.g. Voyage not configured), the stored one is
    left as is.

    Returns "Memory <id> not found." when no row has that id.
    """
    requested = {
        "content": content,
        "title": title,
        "summary": summary,
        "tags": tags,
        "importance": importance,
        "project": project,
    }
    changes: dict[str, Any] = {
        col: val for col, val in requested.items() if val is not None
    }
    if not changes:
        return "Nothing to update."

    conn = get_conn()
    try:
        current = conn.execute(
            "SELECT title, summary, content FROM kb_memories WHERE id = %s",
            (memory_id,),
        ).fetchone()
        if current is None:
            return f"Memory {memory_id} not found."

        text_fields = ("title", "summary", "content")
        if any(f in changes for f in text_fields):
            merged = {f: changes.get(f, current[f]) for f in text_fields}
            new_emb = get_embedding(
                f"{merged['title'] or ''} {merged['summary'] or ''} {merged['content'] or ''}"
            )
            if new_emb:
                import numpy as np
                changes["embedding"] = np.array(new_emb)

        # Column names come only from the fixed keys above, never from input.
        assignments = ", ".join(f"{col} = %s" for col in changes)
        with conn.transaction():
            cur = conn.execute(
                f"UPDATE kb_memories SET {assignments} WHERE id = %s",
                [*changes.values(), memory_id],
            )
        if cur.rowcount == 0:
            # Row vanished between the SELECT and the UPDATE.
            return f"Memory {memory_id} not found."
        return f"Memory {memory_id} updated."
    except Exception as e:
        conn.rollback()
        log(traceback.format_exc())
        return f"Error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# DELETE MEMORY
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def delete_memory(memory_id: int, hard: bool = False) -> str:
    """
    Soft-delete (default) or hard-delete a memory.

    Soft: the row stays in the DB with deleted = TRUE and is hidden from results.
    Hard: the row is physically removed.
    Returns "Memory <id> not found." when no row has that id.
    """
    conn = get_conn()
    try:
        with conn.transaction():
            if hard:
                cur = conn.execute(
                    "DELETE FROM kb_memories WHERE id = %s", (memory_id,)
                )
            else:
                cur = conn.execute(
                    "UPDATE kb_memories SET deleted = TRUE WHERE id = %s", (memory_id,)
                )
        if cur.rowcount == 0:
            return f"Memory {memory_id} not found."
        if hard:
            return f"Memory {memory_id} permanently deleted."
        return f"Memory {memory_id} soft-deleted."
    except Exception as e:
        conn.rollback()
        log(traceback.format_exc())
        return f"Error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# STATS
# ──────────────────────────────────────────────────────────────────────────────
@mcp.tool()
def stats() -> str:
    """
    Overview of the database contents.

    Shows the session count, the embedding count, memory counts by type and the
    top 10 projects. Soft-deleted memories are excluded from every memory count,
    including the embedding count.
    """
    conn = get_conn()
    try:
        type_counts = conn.execute(
            """
            SELECT mem_type, COUNT(*) AS cnt
            FROM kb_memories
            WHERE deleted = FALSE
            GROUP BY mem_type ORDER BY cnt DESC
            """
        ).fetchall()
        session_count = conn.execute(
            "SELECT COUNT(*) AS cnt FROM kb_sessions"
        ).fetchone()["cnt"]
        projects = conn.execute(
            """
            SELECT project, COUNT(*) AS cnt
            FROM kb_memories
            WHERE deleted = FALSE AND project IS NOT NULL
            GROUP BY project ORDER BY cnt DESC LIMIT 10
            """
        ).fetchall()
        # Count live rows only, consistent with the counts above; otherwise the
        # embedding total can exceed the number of memories shown.
        embed_count = conn.execute(
            "SELECT COUNT(*) AS cnt FROM kb_memories"
            " WHERE embedding IS NOT NULL AND deleted = FALSE"
        ).fetchone()["cnt"]
        lines = ["=== Knowledgebase Stats ==="]
        lines.append(f"\nSessions: {session_count}")
        lines.append(f"Embeddings: {embed_count} (Voyage AI {'active' if VOYAGE_API_KEY else 'not configured'})")
        lines.append("\nMemories by type:")
        for r in type_counts:
            # A NULL mem_type group would break the width format spec on None.
            lines.append(f" {(r['mem_type'] or '?'):15} {r['cnt']:>5}")
        if projects:
            lines.append("\nTop projects:")
            for r in projects:
                lines.append(f" {r['project']:25} {r['cnt']:>5}")
        return "\n".join(lines)
    except Exception as e:
        log(traceback.format_exc())
        return f"Error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # stdout is the MCP stdio channel, so startup diagnostics go to stderr via log().
    embeddings_status = 'Voyage AI' if VOYAGE_API_KEY else 'disabled (set VOYAGE_API_KEY)'
    for startup_line in (
        "Starting Knowledgebase MCP server...",
        f" DB: {PG_DB}@{PG_HOST}:{PG_PORT}",
        f" Embeddings: {embeddings_status}",
    ):
        log(startup_line)
    mcp.run(transport="stdio")