165 lines
4.7 KiB
Python
165 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
mcp_jnjemails.py | v1.0 | 2026-06-01
|
|
MCP server pro dotazování SQLite DB jnjemails (EmailsImport pipeline).
|
|
Automaticky načte nejnovější jnjemails_*.db z \\tower\JNJEMAILS\db\.
|
|
"""
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
# Directory (UNC network path) that get_latest_db() searches for
# jnjemails_*.db files.
DB_DIR = Path(r"\\tower\JNJEMAILS\db")
|
|
|
|
|
|
def log(msg: str):
    """Write *msg* plus a newline to standard error and flush immediately."""
    # Diagnostics are sent to stderr rather than stdout.
    # NOTE(review): presumably because stdout carries the MCP protocol
    # stream when the server runs over stdio — confirm.
    stream = sys.stderr
    stream.write(f"{msg}\n")
    stream.flush()
|
|
|
|
|
|
def get_latest_db() -> Path:
    """Return the jnjemails_*.db file in DB_DIR with the greatest file name.

    "Latest" is decided purely by comparing file names as strings.
    NOTE(review): presumably the names embed a sortable timestamp — confirm.

    Raises:
        FileNotFoundError: if no file in DB_DIR matches the pattern.
    """
    candidates = list(DB_DIR.glob("jnjemails_*.db"))
    if candidates:
        return max(candidates, key=lambda candidate: candidate.name)
    raise FileNotFoundError(f"Žádný jnjemails_*.db soubor v {DB_DIR}")
|
|
|
|
|
|
def query(sql: str, params=()) -> list[dict]:
    """Run one SQL statement against the newest database and return its rows.

    A fresh connection to the file chosen by get_latest_db() is opened for
    every call and closed again before returning, even when the statement
    fails.

    Args:
        sql: The SQL text to execute (a single statement).
        params: Values bound to the ``?`` placeholders in *sql*.

    Returns:
        One dict per result row, keyed by column name.

    Raises:
        FileNotFoundError: propagated from get_latest_db() when no database
            file exists.
        sqlite3.Error: if the statement cannot be prepared or executed.
    """
    conn = sqlite3.connect(get_latest_db())
    try:
        # This helper is only meant for reading, and it also backs the
        # free-form `sql` tool. Switching the connection to query-only mode
        # makes SQLite itself refuse anything that would modify the database
        # file, instead of relying solely on callers passing SELECTs.
        conn.execute("PRAGMA query_only = ON")
        conn.row_factory = sqlite3.Row
        return [dict(row) for row in conn.execute(sql, params)]
    finally:
        conn.close()
|
|
|
|
|
|
# MCP server object named "jnjemails". The tool functions below are decorated
# with mcp.tool(), and the entry point at the bottom of the file calls mcp.run().
# NOTE(review): FastMCP presumably publishes each tool's docstring to clients
# as the tool description — keep those docstrings short and client-readable.
mcp = FastMCP("jnjemails")
|
|
|
|
|
|
@mcp.tool()
def db_info() -> dict:
    """Information about the current DB: path, file size, file name."""
    newest = get_latest_db()
    # Size is reported in whole KiB (integer division of the byte count).
    kib = newest.stat().st_size // 1024
    info = {"path": str(newest), "file": newest.name}
    info["size_kb"] = kib
    return info
|
|
|
|
|
|
@mcp.tool()
def last_run() -> list[dict]:
    """Complete log of the most recent run (runs + log tables)."""
    # "Most recent" is the run with the highest runs.id. Every log row of that
    # run is returned together with the run's summary columns, ordered by the
    # log row's created_at. The query starts from the log table, so a run
    # without any log rows yields an empty list.
    statement = """
        SELECT r.script, r.version, r.started_at, r.finished_at,
               r.transferred, r.skipped, r.errors,
               l.level, l.event, l.subject, l.folder, l.detail, l.created_at
        FROM log l
        JOIN runs r ON r.id = l.run_id
        WHERE l.run_id = (SELECT MAX(id) FROM runs)
        ORDER BY l.created_at
    """
    return query(statement)
|
|
|
|
|
|
@mcp.tool()
def list_runs(limit: int = 20) -> list[dict]:
    """Overview of all script runs (newest first)."""
    # Ordered by started_at descending; *limit* is bound to the LIMIT
    # placeholder and caps the number of rows returned.
    statement = """
        SELECT id, script, version, started_at, finished_at,
               transferred, skipped, errors
        FROM runs
        ORDER BY started_at DESC
        LIMIT ?
    """
    bindings = (limit,)
    return query(statement, bindings)
|
|
|
|
|
|
@mcp.tool()
def errors_last_run() -> list[dict]:
    """Errors from the most recent run."""
    # Selects the log rows whose level is exactly 'ERROR' for the run with the
    # highest runs.id, ordered by created_at.
    statement = """
        SELECT l.event, l.subject, l.folder, l.detail, l.created_at
        FROM log l
        WHERE l.run_id = (SELECT MAX(id) FROM runs)
          AND l.level = 'ERROR'
        ORDER BY l.created_at
    """
    return query(statement)
|
|
|
|
|
|
@mcp.tool()
def run_log(run_id: int) -> list[dict]:
    """Log of one specific run, selected by its ID (as shown by list_runs)."""
    # All log rows with the given run_id, ordered by created_at. An unknown
    # run_id simply produces an empty list.
    statement = """
        SELECT level, event, subject, folder, graph_id, detail, created_at
        FROM log
        WHERE run_id = ?
        ORDER BY created_at
    """
    bindings = (run_id,)
    return query(statement, bindings)
|
|
|
|
|
|
@mcp.tool()
def search_messages(subject: str = "", sender: str = "", folder: str = "", limit: int = 50) -> list[dict]:
    """Search the transferred emails. All parameters are optional (LIKE, case-insensitive)."""
    # Each non-empty argument becomes a "<column> LIKE '%value%'" condition;
    # the conditions are combined with AND. Only fixed column names are
    # interpolated into the SQL text — the user-supplied values are always
    # passed as bound parameters.
    # NOTE(review): SQLite's built-in LIKE is case-insensitive only for ASCII
    # characters by default, and % / _ inside the values act as wildcards.
    filters = (
        ("subject", subject),
        ("sender", sender),
        ("jnj_folder", folder),
    )
    active = [(column, text) for column, text in filters if text]

    clauses = [f"{column} LIKE ?" for column, _ in active]
    bindings = [f"%{text}%" for _, text in active]

    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    # The LIMIT placeholder comes last in the statement, so its value is
    # appended after the filter values.
    bindings.append(limit)

    statement = f"""
        SELECT id, message_id, subject, sender, received_at,
               jnj_folder, graph_id, is_read, source, uploaded_at
        FROM messages
        {where}
        ORDER BY received_at DESC
        LIMIT ?
    """
    return query(statement, bindings)
|
|
|
|
|
|
@mcp.tool()
def messages_stats() -> dict:
    """Statistics: total number of emails, counts by source, counts by folder (top 10)."""
    # Four independent queries are issued; each goes through query() and
    # therefore uses its own connection.
    total_rows = query("SELECT COUNT(*) as cnt FROM messages")
    per_source = query("SELECT source, COUNT(*) as cnt FROM messages GROUP BY source ORDER BY cnt DESC")
    # Rows with a NULL jnj_folder are excluded from the folder ranking.
    per_folder = query("""
        SELECT jnj_folder, COUNT(*) as cnt FROM messages
        WHERE jnj_folder IS NOT NULL
        GROUP BY jnj_folder
        ORDER BY cnt DESC
        LIMIT 10
    """)
    missing_rows = query("SELECT COUNT(*) as cnt FROM messages WHERE graph_id IS NULL")

    stats = {}
    stats["total"] = total_rows[0]["cnt"]
    stats["no_graph_id"] = missing_rows[0]["cnt"]
    stats["by_source"] = per_source
    stats["top_folders"] = per_folder
    return stats
|
|
|
|
|
|
@mcp.tool()
def sql(query_str: str) -> list[dict]:
    """Run an arbitrary SELECT query against the DB. Only SELECT is allowed (anything else is blocked)."""
    # The gate is a plain prefix test: after dropping leading whitespace and
    # upper-casing, the text must begin with "SELECT". Statements that start
    # differently (for example with WITH or a comment) are rejected too.
    normalized = query_str.lstrip().upper()
    if normalized.startswith("SELECT"):
        try:
            return query(query_str)
        except Exception as exc:
            # Failures are reported to the caller as data rather than raised,
            # using the same single-item error shape as the rejection below.
            return [{"error": str(exc)}]
    return [{"error": "Povoleny jsou pouze SELECT dotazy."}]
|
|
|
|
|
|
if __name__ == "__main__":
    # Resolve the database before serving: a missing database raises
    # FileNotFoundError here, and otherwise its file name is logged to stderr.
    log(f"jnjemails MCP server — DB: {get_latest_db().name}")
    mcp.run()
|