Files

165 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
mcp_jnjemails.py | v1.0 | 2026-06-01
MCP server pro dotazování SQLite DB jnjemails (EmailsImport pipeline).
Automaticky načte nejnovější jnjemails_*.db z \\tower\JNJEMAILS\db\.
"""
import sqlite3
import sys
from pathlib import Path
from mcp.server.fastmcp import FastMCP
# Directory (a UNC share) that get_latest_db() searches for jnjemails_*.db files.
DB_DIR = Path(r"\\tower\JNJEMAILS\db")
def log(msg: str) -> None:
    """Write *msg* as a single line to standard error and flush immediately.

    NOTE(review): stderr is presumably used so that stdout stays free for the
    MCP transport -- confirm against how the server is launched.

    Args:
        msg: Text to emit; a newline is appended by ``print``.
    """
    stream = sys.stderr
    print(msg, file=stream, flush=True)
def get_latest_db() -> Path:
    """Return the database file in DB_DIR whose name sorts last.

    Candidates are the files matching ``jnjemails_*.db`` directly inside
    DB_DIR. Selection is by lexicographic order of the file name, not by
    modification time -- this presumably relies on a sortable date/time stamp
    in the name (TODO confirm against the pipeline that writes the files).

    Returns:
        Path of the matching file with the greatest name.

    Raises:
        FileNotFoundError: If no file in DB_DIR matches the pattern.
    """
    candidates = sorted(DB_DIR.glob("jnjemails_*.db"), key=lambda p: p.name)
    if candidates:
        return candidates[-1]
    raise FileNotFoundError(f"Žádný jnjemails_*.db soubor v {DB_DIR}")
def query(sql: str, params=()) -> list[dict]:
    """Run one SQL statement against the newest database and return all rows.

    A new connection is opened (and closed) on every call, so each call
    re-resolves the database file through get_latest_db().

    The connection is switched to SQLite's ``query_only`` mode before the
    statement runs. This module only reads data, and some callers pass SQL
    text they did not write themselves; with ``query_only`` enabled SQLite
    itself rejects any statement that would modify the database, independent
    of whatever checks the caller performed on the text.

    Args:
        sql: SQL text passed to ``sqlite3.Connection.execute``.
        params: Values bound to the ``?`` placeholders in *sql*.

    Returns:
        One dict per result row, keyed by column name.

    Raises:
        FileNotFoundError: Propagated from get_latest_db() when no database
            file exists.
        sqlite3.Error: If SQLite rejects the statement, including attempts
            to write while ``query_only`` is on.
    """
    db_path = get_latest_db()
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        # Enforce read-only behaviour inside SQLite for this connection.
        conn.execute("PRAGMA query_only = ON")
        cur = conn.execute(sql, params)
        return [dict(row) for row in cur.fetchall()]
    finally:
        conn.close()
# FastMCP server instance named "jnjemails"; the @mcp.tool() decorators in this file register on it.
mcp = FastMCP("jnjemails")
@mcp.tool()
def db_info() -> dict:
    """Information about the current DB: path, size, file name.

    Returns:
        A dict with ``path`` (full path as a string), ``file`` (file name
        only) and ``size_kb`` (file size in bytes floor-divided by 1024) of
        the file chosen by get_latest_db().
    """
    latest = get_latest_db()
    size_bytes = latest.stat().st_size
    return {
        "path": str(latest),
        "file": latest.name,
        "size_kb": size_bytes // 1024,
    }
@mcp.tool()
def last_run() -> list[dict]:
    """Complete log of the last run (runs + log tables).

    "Last run" means the row of ``runs`` with the highest ``id``.

    Returns:
        One dict per ``log`` row of that run, ordered by ``created_at``; each
        dict also carries the joined ``runs`` columns (script, version,
        started_at, finished_at, transferred, skipped, errors).
    """
    statement = (
        "\n"
        "SELECT r.script, r.version, r.started_at, r.finished_at,\n"
        "r.transferred, r.skipped, r.errors,\n"
        "l.level, l.event, l.subject, l.folder, l.detail, l.created_at\n"
        "FROM log l\n"
        "JOIN runs r ON r.id = l.run_id\n"
        "WHERE l.run_id = (SELECT MAX(id) FROM runs)\n"
        "ORDER BY l.created_at\n"
    )
    return query(statement)
@mcp.tool()
def list_runs(limit: int = 20) -> list[dict]:
    """Overview of all script runs (newest first).

    Args:
        limit: Value bound to the SQL ``LIMIT`` clause (default 20).

    Returns:
        One dict per ``runs`` row with id, script, version, started_at,
        finished_at, transferred, skipped and errors, ordered by
        ``started_at`` descending.
    """
    statement = (
        "\n"
        "SELECT id, script, version, started_at, finished_at,\n"
        "transferred, skipped, errors\n"
        "FROM runs\n"
        "ORDER BY started_at DESC\n"
        "LIMIT ?\n"
    )
    bindings = (limit,)
    return query(statement, bindings)
@mcp.tool()
def errors_last_run() -> list[dict]:
    """Errors from the last run.

    "Last run" means the row of ``runs`` with the highest ``id``; only
    ``log`` rows whose ``level`` equals ``'ERROR'`` are returned.

    Returns:
        One dict per matching ``log`` row (event, subject, folder, detail,
        created_at), ordered by ``created_at``.
    """
    statement = (
        "\n"
        "SELECT l.event, l.subject, l.folder, l.detail, l.created_at\n"
        "FROM log l\n"
        "WHERE l.run_id = (SELECT MAX(id) FROM runs)\n"
        "AND l.level = 'ERROR'\n"
        "ORDER BY l.created_at\n"
    )
    return query(statement)
@mcp.tool()
def run_log(run_id: int) -> list[dict]:
    """Log of a specific run by ID (from list_runs).

    Args:
        run_id: Value matched against ``log.run_id``.

    Returns:
        One dict per matching ``log`` row (level, event, subject, folder,
        graph_id, detail, created_at), ordered by ``created_at``.
    """
    statement = (
        "\n"
        "SELECT level, event, subject, folder, graph_id, detail, created_at\n"
        "FROM log\n"
        "WHERE run_id = ?\n"
        "ORDER BY created_at\n"
    )
    bindings = (run_id,)
    return query(statement, bindings)
@mcp.tool()
def search_messages(subject: str = "", sender: str = "", folder: str = "", limit: int = 50) -> list[dict]:
    """Search transferred emails. All parameters are optional (LIKE, case-insensitive).

    Every non-empty text argument becomes a ``column LIKE '%value%'``
    condition; the conditions are combined with ``AND``. The values are bound
    as parameters, so ``%`` and ``_`` inside them keep their LIKE wildcard
    meaning. Note that SQLite's built-in LIKE is case-insensitive for ASCII
    characters only.

    Args:
        subject: Substring matched against ``subject``.
        sender: Substring matched against ``sender``.
        folder: Substring matched against ``jnj_folder``.
        limit: Value bound to the SQL ``LIMIT`` clause (default 50).

    Returns:
        One dict per matching ``messages`` row, ordered by ``received_at``
        descending.
    """
    # (column, user value) pairs in the order the conditions are emitted.
    requested = (
        ("subject", subject),
        ("sender", sender),
        ("jnj_folder", folder),
    )
    active = [(column, value) for column, value in requested if value]
    clauses = [f"{column} LIKE ?" for column, _ in active]
    bindings = [f"%{value}%" for _, value in active]

    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    # The LIMIT placeholder comes last in the statement, so bind it last.
    bindings.append(limit)

    statement = (
        "\n"
        "SELECT id, message_id, subject, sender, received_at,\n"
        "jnj_folder, graph_id, is_read, source, uploaded_at\n"
        "FROM messages\n"
        f"{where}\n"
        "ORDER BY received_at DESC\n"
        "LIMIT ?\n"
    )
    return query(statement, bindings)
@mcp.tool()
def messages_stats() -> dict:
    """Statistics: total number of emails, by source, by folder (top 10).

    Each figure is produced by its own call to query(), i.e. on its own
    connection.

    Returns:
        A dict with ``total`` (row count of ``messages``), ``no_graph_id``
        (rows whose ``graph_id`` is NULL), ``by_source`` (count per
        ``source``, largest first) and ``top_folders`` (the ten largest
        non-NULL ``jnj_folder`` groups).
    """
    total_rows = query("SELECT COUNT(*) as cnt FROM messages")
    per_source = query("SELECT source, COUNT(*) as cnt FROM messages GROUP BY source ORDER BY cnt DESC")
    folder_statement = (
        "\n"
        "SELECT jnj_folder, COUNT(*) as cnt FROM messages\n"
        "WHERE jnj_folder IS NOT NULL\n"
        "GROUP BY jnj_folder\n"
        "ORDER BY cnt DESC\n"
        "LIMIT 10\n"
    )
    per_folder = query(folder_statement)
    missing_graph_rows = query("SELECT COUNT(*) as cnt FROM messages WHERE graph_id IS NULL")

    stats = {}
    stats["total"] = total_rows[0]["cnt"]
    stats["no_graph_id"] = missing_graph_rows[0]["cnt"]
    stats["by_source"] = per_source
    stats["top_folders"] = per_folder
    return stats
@mcp.tool()
def sql(query_str: str) -> list[dict]:
    """Arbitrary SELECT query against the DB. Only SELECT (others are blocked).

    The check is textual: after trimming surrounding whitespace and
    upper-casing, the text must start with ``SELECT``. Accepted text is
    passed unchanged to query().

    Args:
        query_str: SQL text to execute.

    Returns:
        The rows as a list of dicts on success. On rejection, or when
        query() raises, a one-element list ``[{"error": <message>}]`` is
        returned instead of raising.
    """
    normalized = query_str.strip().upper()
    if normalized.startswith("SELECT"):
        try:
            return query(query_str)
        except Exception as exc:
            # Tool boundary: report the failure to the caller as data.
            return [{"error": str(exc)}]
    return [{"error": "Povoleny jsou pouze SELECT dotazy."}]
if __name__ == "__main__":
    # Resolve the database before serving: get_latest_db() raises
    # FileNotFoundError when no jnjemails_*.db exists, which aborts startup.
    db_path = get_latest_db()
    banner = f"jnjemails MCP server — DB: {db_path.name}"
    log(banner)
    mcp.run()