Files
2026-06-16 19:38:48 +02:00

795 lines
32 KiB
Python

#!/usr/bin/env python3
"""
==============================================================================
MCP server: EMAILY (all mailboxes imported from Microsoft Graph)
Hybrid queries over:
- PostgreSQL 192.168.1.76 db=MongoEmaily table=emails
    (full-text tsvector over subject + sender + recipients +
     attachment names + body; GIN index, ts_headline, ts_rank)
- MongoDB 192.168.1.76 db=emaily collection=<mailbox>
    (original documents from parse_emails_graph_v1.3.py:
     headers, body_html, recipients[], attachments[], ...)
Source: U:\\janssen\\EmailsImport\\enrich_fulltext_emails_v1.0.py
Usage:
    python mcp_emaily.py (stdio MCP)
==============================================================================
"""
from __future__ import annotations

import os
import re
import sys
import traceback
from datetime import datetime, timezone, timedelta
from typing import Optional, Union

import psycopg
from mcp.server.fastmcp import FastMCP
from pymongo import MongoClient
# Connection settings. Each one can be overridden from the environment so that
# credentials do not have to live in source control; the fallbacks keep the
# current deployment working unchanged.
# NOTE(security): the fallback PG_DSN embeds a plaintext password. Rotate it and
# drop the fallback once EMAILY_PG_DSN (or libpq's PGPASSWORD / ~/.pgpass) is
# configured on every host that runs this server.
MONGO_URI = os.environ.get("EMAILY_MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = os.environ.get("EMAILY_MONGO_DB", "emaily")
PG_DSN = os.environ.get(
    "EMAILY_PG_DSN",
    "host=192.168.1.76 port=5432 dbname=MongoEmaily "
    "user=vladimir.buzalka password=Vlado7309208104++",
)
# read_email(): default and maximum number of body characters per call.
DEFAULT_BODY_CHARS = 8000
MAX_BODY_CHARS = 200_000
# Collections in the Mongo database that are bookkeeping, not mailboxes.
SKIP_COLLECTIONS = {"attachments_index", "sync_state"}
def log(msg: str) -> None:
    """Emit one diagnostic line on stderr and flush it immediately.

    stdout is left untouched because this server speaks MCP over stdio.
    """
    err = sys.stderr
    err.write(f"{msg}\n")
    err.flush()
# Fail fast: refuse to start the MCP server unless both backends answer.
# `mongo` is the shared client used by every tool below.
try:
    mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    mongo.admin.command("ping")
    log(f"Mongo OK ({MONGO_URI})")
except Exception as exc:
    log(f"Mongo connection failed: {exc}")
    sys.exit(1)
# Postgres: open and immediately close a throwaway connection; the tools
# open their own connections later through pg_conn().
try:
    with psycopg.connect(PG_DSN, connect_timeout=10) as _t:
        pass
    log("Postgres OK")
except Exception as exc:
    log(f"Postgres connection failed: {exc}")
    sys.exit(1)
def pg_conn():
    """Open a brand-new Postgres connection to the index database.

    Every call connects afresh (no pooling); callers wrap it in ``with``.
    """
    return psycopg.connect(conninfo=PG_DSN, connect_timeout=10)
def serialize(obj):
    """Recursively convert a value into JSON-friendly primitives.

    datetime -> ISO-8601 string; bytes -> UTF-8 text with invalid bytes
    replaced by U+FFFD; dicts and lists are rebuilt with converted contents.
    Every other value (tuples included) is returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: serialize(val) for key, val in obj.items()}
    if isinstance(obj, list):
        return list(map(serialize, obj))
    if isinstance(obj, bytes):
        return obj.decode("utf-8", errors="replace")
    if isinstance(obj, datetime):
        return obj.isoformat()
    return obj
def normalize_mailbox(mailbox: Optional[Union[str, list]]) -> Optional[list[str]]:
    """Turn the `mailbox` tool argument into a list of mailbox names, or None.

    None means "no mailbox filter" to every SQL/Mongo query that uses it.
    A single string becomes a one-element list; any iterable becomes a list.
    Names are stripped, and None/blank entries are dropped. If nothing is left
    (None, "", [], (), [""] ...), None is returned. Previously such inputs
    could become [] or [""], which matched zero rows instead of all mailboxes.
    """
    if not mailbox:
        return None
    items = [mailbox] if isinstance(mailbox, str) else list(mailbox)
    cleaned = []
    for item in items:
        if item is None:
            continue
        name = str(item).strip()
        if name:
            cleaned.append(name)
    return cleaned or None
def parse_since(s: Optional[str]) -> Optional[datetime]:
    """Parse a user-supplied date/datetime bound into a timezone-aware datetime.

    Accepts "YYYY-MM-DD" (midnight UTC) or an ISO-8601 datetime using a "T" or
    space separator, optionally ending in "Z" or an "+HH:MM" offset. A datetime
    without an offset is taken as UTC, the same as date-only input.

    Returns None for None, "" or whitespace-only input.
    Raises ValueError (quoting the input) for anything that cannot be parsed.
    """
    if not s:
        return None
    try:
        text = s.strip()
        if not text:
            return None
        if "T" in text or " " in text:
            # datetime.fromisoformat() before Python 3.11 rejects a trailing "Z".
            parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
        else:
            parsed = datetime.strptime(text, "%Y-%m-%d")
    except Exception as e:
        raise ValueError(f"Bad date {s!r}: {e}") from e
    # A naive value would be sent to Postgres as `timestamp` and then
    # reinterpreted in the session time zone by the ::timestamptz casts.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
# --- MCP --------------------------------------------------------------------
# The FastMCP application; each function decorated with @mcp.tool() below is
# registered as a tool of the "emaily" server.
mcp = FastMCP(name="emaily")
@mcp.tool()
def ping() -> dict:
    """Quick health check. Reports Mongo + Postgres connectivity, total mailboxes,
    PG indexed emails count, ok/error breakdown.
    """
    try:
        info = mongo.admin.command("buildInfo")
        db = mongo[MONGO_DB]
        mailboxes = [c for c in db.list_collection_names()
                     if c not in SKIP_COLLECTIONS]
        mongo_counts = {mb: db[mb].estimated_document_count() for mb in mailboxes}
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute("SELECT mailbox, ok, count(*) FROM emails "
                        "GROUP BY mailbox, ok ORDER BY mailbox, ok")
            rows = cur.fetchall()
        pg_summary: dict = {}
        for mb, ok, c in rows:
            # ok = FALSE and ok IS NULL are separate SQL groups that both map
            # to "error"; accumulate so one group no longer overwrites the other.
            bucket = pg_summary.setdefault(mb, {})
            key = "ok" if ok else "error"
            bucket[key] = bucket.get(key, 0) + c
        return {
            "status": "ok",
            "mongo_version": info.get("version"),
            "mailboxes": mailboxes,
            "mongo_email_count": mongo_counts,
            "pg_indexed_per_mailbox": pg_summary,
        }
    except Exception as e:
        log(traceback.format_exc())
        return {"status": "error", "error": str(e)}
@mcp.tool()
def list_mailboxes() -> dict:
    """Overview of all mailboxes — totals, indexed coverage, earliest/latest received_at,
    top senders by volume. Use to understand the corpus before searching.
    """
    out = {}
    try:
        mailboxes = [c for c in mongo[MONGO_DB].list_collection_names()
                     if c not in SKIP_COLLECTIONS]
        # One Postgres connection for the whole loop; previously a fresh
        # connection was opened and torn down for every mailbox.
        with pg_conn() as pg, pg.cursor() as cur:
            for mb in mailboxes:
                cur.execute("""
                    SELECT count(*) FILTER (WHERE ok) AS ok,
                           count(*) AS total,
                           min(received_at) AS first_at,
                           max(received_at) AS last_at,
                           count(*) FILTER (WHERE has_attachments) AS with_att
                    FROM emails WHERE mailbox = %s
                """, (mb,))
                ok, total, first_at, last_at, with_att = cur.fetchone()
                # Secondary sort on sender_email makes ties deterministic.
                cur.execute("""
                    SELECT sender_email, count(*) c FROM emails
                    WHERE mailbox = %s AND sender_email IS NOT NULL
                    GROUP BY sender_email ORDER BY c DESC, sender_email LIMIT 5
                """, (mb,))
                top_senders = [{"email": s, "count": c} for s, c in cur.fetchall()]
                out[mb] = {
                    "indexed_ok": ok,
                    "indexed_total": total,
                    "with_attachments": with_att,
                    "first_received": serialize(first_at),
                    "last_received": serialize(last_at),
                    "top_senders": top_senders,
                }
        return {"mailboxes": out}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def search(
    query: str,
    mailbox: Optional[Union[str, list]] = None,
    since: Optional[str] = None,
    until: Optional[str] = None,
    days: Optional[int] = None,
    inflect: bool = False,
    folder_contains: Optional[str] = None,
    sender_contains: Optional[str] = None,
    has_attachments: Optional[bool] = None,
    limit: int = 20,
) -> dict:
    """PRIMARY TOOL — fulltext search across all indexed emails.
    Index includes: subject, sender (email + name), recipients (to/cc),
    attachment filenames, AND full body text. Diacritics are stripped at index
    time, so search is already accent-insensitive (recept == řecept == RECEPT).
    query: websearch_to_tsquery syntax (only when inflect=False):
    invoice payment -> AND
    "lot expiration" -> phrase
    SAE OR "serious adverse" -> OR
    urgent -newsletter -> exclude
    mailbox: one mailbox string or list (e.g. "vbuzalka@its.jnj.com"). None = all.
    since/until: ISO date "YYYY-MM-DD" on received_at
    days: convenience window — only the last N days (overrides `since`). "za posledních X dní".
    inflect: Czech declension. The index uses no stemmer, so a plain search for
    "recept" misses "recepty/receptu/receptem/...". With inflect=True each word
    in `query` is prefix-matched (recept -> recept:*) and AND-ed, catching the
    other grammatical cases. Set this for Czech-word searches. Trade-off: a
    prefix also matches unrelated longer words (recept:* also hits "receptor").
    In this mode the query is treated as plain words (operators/quotes ignored).
    folder_contains: substring match against folder_path (case-insensitive)
    sender_contains: substring match against sender_email OR sender_name (case-insensitive)
    has_attachments: True / False / None (any)
    limit: max 100
    Returns ranked results with `snippet` showing matches highlighted as <<...>>.
    Use `read_email` to fetch full body of any hit.
    """
    try:
        mboxes = normalize_mailbox(mailbox)
        since_dt = parse_since(since)
        until_dt = parse_since(until)
        if days and days > 0:
            since_dt = datetime.now(timezone.utc) - timedelta(days=days)
        limit = min(max(1, limit), 100)
        # An empty filter string means "no filter". Before, "" reached SQL as a
        # non-NULL value while its LIKE pattern was NULL, and `ILIKE NULL`
        # silently dropped every row.
        folder = folder_contains or None
        sender = sender_contains or None
        # Build the tsquery. inflect=True → prefix-match each word (Czech cases)
        # via to_tsquery; otherwise use websearch_to_tsquery for full operator support.
        tsq_func = "websearch_to_tsquery"
        tsq_text = query
        if inflect:
            tokens = re.findall(r"\w+", query, flags=re.UNICODE)
            if tokens:
                tsq_func = "to_tsquery"
                tsq_text = " & ".join(f"{t}:*" for t in tokens)
        # tsq_func is one of two fixed identifiers chosen above, never user text.
        sql = f"""
            WITH q AS (
                SELECT {tsq_func}('soubory'::regconfig, %(query)s) AS tsq
            )
            SELECT
                e.id, e.mailbox, e.message_id, e.conversation_id, e.folder_path,
                e.subject, e.sender_email, e.sender_name,
                e.to_addrs, e.cc_addrs,
                e.received_at, e.sent_at, e.is_read,
                e.has_attachments, e.attachment_count, e.attachments_summary,
                e.body_length, e.body_source,
                ts_rank(e.tsv, q.tsq) AS rank,
                ts_headline('soubory'::regconfig,
                            left(coalesce(e.body, e.subject), 200000),
                            q.tsq,
                            'MaxFragments=3, MinWords=4, MaxWords=18, '
                            'StartSel=<<, StopSel=>>, FragmentDelimiter= ... ') AS snippet
            FROM emails e, q
            WHERE e.tsv @@ q.tsq
              AND e.ok = TRUE
              AND (%(mboxes)s::text[] IS NULL OR e.mailbox = ANY(%(mboxes)s::text[]))
              AND (%(since)s::timestamptz IS NULL OR e.received_at >= %(since)s::timestamptz)
              AND (%(until)s::timestamptz IS NULL OR e.received_at < %(until)s::timestamptz)
              AND (%(folder)s::text IS NULL OR e.folder_path ILIKE %(folder_like)s)
              AND (%(sender)s::text IS NULL
                   OR e.sender_email ILIKE %(sender_like)s
                   OR e.sender_name ILIKE %(sender_like)s)
              AND (%(has_att)s::boolean IS NULL OR e.has_attachments = %(has_att)s::boolean)
            ORDER BY rank DESC, e.received_at DESC NULLS LAST
            LIMIT %(limit)s
        """
        params = {
            "query": tsq_text, "mboxes": mboxes,
            "since": since_dt, "until": until_dt,
            "folder": folder,
            "folder_like": f"%{folder}%" if folder else None,
            "sender": sender,
            "sender_like": f"%{sender}%" if sender else None,
            "has_att": has_attachments,
            "limit": limit,
        }
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(sql, params)
            cols = [c.name for c in cur.description]
            rows = [dict(zip(cols, r)) for r in cur.fetchall()]
        results = []
        for r in rows:
            results.append({
                "mailbox": r["mailbox"],
                "message_id": r["message_id"],
                "conversation_id": r["conversation_id"],
                "folder": r["folder_path"],
                "subject": r["subject"],
                "from": (f"{r['sender_name']} <{r['sender_email']}>"
                         if r["sender_name"] else r["sender_email"]),
                "to": r["to_addrs"],
                "cc": r["cc_addrs"],
                "received_at": serialize(r["received_at"]),
                "is_read": r["is_read"],
                "has_attachments": r["has_attachments"],
                "attachment_count": r["attachment_count"],
                "attachments": r["attachments_summary"],
                "body_length": r["body_length"],
                "body_source": r["body_source"],
                "rank": round(float(r["rank"]), 5),
                "snippet": (r["snippet"] or "").strip(),
            })
        return {
            "query": query,
            # `days` overrides `since`, so also report the bound actually applied.
            "filters": {"mailbox": mboxes, "since": since, "until": until,
                        "days": days,
                        "effective_since": serialize(since_dt),
                        "inflect": inflect,
                        "folder_contains": folder_contains,
                        "sender_contains": sender_contains,
                        "has_attachments": has_attachments,
                        "limit": limit},
            "count": len(results),
            "results": results,
            "tip": "Use read_email(mailbox=..., message_id=...) for full body or thread.",
        }
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e), "query": query}
@mcp.tool()
def read_email(
    message_id: Optional[str] = None,
    mailbox: Optional[str] = None,
    offset: int = 0,
    length: int = DEFAULT_BODY_CHARS,
    around_match: Optional[str] = None,
    include_html: bool = False,
) -> dict:
    """Read one email — full plain text body + metadata.
    Identify by `message_id` (Internet Message-ID, the _id in Mongo).
    `mailbox` narrows the lookup if the same Message-ID appears in multiple mailboxes
    (e.g. you got copies in both work and personal accounts). Without it, a
    successfully indexed copy is preferred.
    offset, length: slice the body. length max 200000; a negative offset counts as 0.
    around_match: case-insensitive substring; returns up to 3 non-overlapping windows
    (about 400 chars before to 600 chars after each match) instead of a flat slice.
    include_html: also return raw body_html from Mongo (typically large — only if you
    really need the original markup).
    """
    if not message_id:
        return {"error": "Provide message_id."}
    try:
        length = min(max(1, length), MAX_BODY_CHARS)
        # A negative offset would make the slice count from the end of the body
        # and produce a bogus has_more / next_offset.
        offset = max(0, offset)
        sql = """
            SELECT id, mailbox, message_id, graph_id, conversation_id, folder_path,
                   subject, sender_email, sender_name,
                   to_addrs, cc_addrs, bcc_addrs,
                   sent_at, received_at, modified_at, is_read, is_draft,
                   has_attachments, attachment_count, attachments_summary,
                   body, body_length, body_source,
                   extractor_version, extracted_at, ok, error
            FROM emails WHERE message_id = %s
        """
        params = [message_id]
        if mailbox:
            sql += " AND mailbox = %s"
            params.append(mailbox)
        # The same Message-ID can exist in several mailboxes and as a failed
        # (ok = FALSE) extraction; prefer an ok row and make the choice
        # deterministic instead of relying on an unordered LIMIT 1.
        sql += " ORDER BY ok DESC NULLS LAST, mailbox LIMIT 1"
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(sql, params)
            row = cur.fetchone()
            cols = [c.name for c in cur.description]
        if not row:
            return {"error": "Email not found.",
                    "message_id": message_id, "mailbox": mailbox}
        rec = dict(zip(cols, row))
        body = rec.get("body") or ""
        if around_match and body:
            windows = []
            covered_until = 0
            # Search `body` itself with re.IGNORECASE instead of body.lower():
            # lower() can change the string length ("İ".lower() has two code
            # points), which would shift every offset after that character.
            for m in re.finditer(re.escape(around_match), body, re.IGNORECASE):
                pos = m.start()
                if pos < covered_until:
                    continue  # already visible in the previous window
                lo = max(0, pos - 400)
                hi = min(len(body), pos + 600)
                windows.append({"offset": lo, "text": body[lo:hi]})
                covered_until = hi
                if len(windows) == 3:
                    break
            body_out = None
            slice_info = {"mode": "around_match", "match": around_match,
                          "windows_found": len(windows), "windows": windows}
        else:
            end = offset + length
            body_out = body[offset:end]
            has_more = end < len(body)
            slice_info = {
                "mode": "slice", "offset": offset,
                "length_returned": len(body_out),
                "has_more": has_more,
                "next_offset": end if has_more else None,
            }
        out = {
            "mailbox": rec["mailbox"],
            "message_id": rec["message_id"],
            "conversation_id": rec["conversation_id"],
            "folder": rec["folder_path"],
            "subject": rec["subject"],
            "from": (f"{rec['sender_name']} <{rec['sender_email']}>"
                     if rec["sender_name"] else rec["sender_email"]),
            "to": rec["to_addrs"],
            "cc": rec["cc_addrs"],
            "bcc": rec["bcc_addrs"],
            "received_at": serialize(rec["received_at"]),
            "sent_at": serialize(rec["sent_at"]),
            "is_read": rec["is_read"],
            "is_draft": rec["is_draft"],
            "has_attachments": rec["has_attachments"],
            "attachment_count": rec["attachment_count"],
            "attachments": rec["attachments_summary"],
            "body_length": rec["body_length"],
            "body_source": rec["body_source"],
            "extractor_version": rec["extractor_version"],
            "ok": rec["ok"],
            "error": rec["error"],
        }
        if body_out is not None:
            out["body"] = body_out
        out["slice"] = slice_info
        if include_html:
            mdoc = mongo[MONGO_DB][rec["mailbox"]].find_one(
                {"_id": rec["message_id"]}, {"body_html": 1, "attachments": 1})
            if mdoc:
                # Raw Mongo values may hold datetimes/bytes; convert them the
                # same way as the rest of the response.
                out["body_html"] = serialize(mdoc.get("body_html"))
                out["attachments_detail"] = serialize(mdoc.get("attachments"))
        return out
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def by_sender(
    sender: str,
    mailbox: Optional[Union[str, list]] = None,
    since: Optional[str] = None,
    has_attachments: Optional[bool] = None,
    limit: int = 30,
) -> dict:
    """List emails from a specific sender (substring match on sender_email or sender_name,
    case-insensitive). Use for "what did X send me" or "all newsletters from Y".
    Returned sorted by received_at DESC.
    """
    # Newest first; only successfully extracted rows.
    query_sql = """
        SELECT mailbox, message_id, subject, sender_email, sender_name,
               to_addrs, folder_path, received_at, has_attachments, attachment_count,
               attachments_summary, body_length
        FROM emails
        WHERE ok = TRUE
          AND (sender_email ILIKE %(s)s OR sender_name ILIKE %(s)s)
          AND (%(mboxes)s::text[] IS NULL OR mailbox = ANY(%(mboxes)s::text[]))
          AND (%(since)s::timestamptz IS NULL OR received_at >= %(since)s::timestamptz)
          AND (%(has_att)s::boolean IS NULL OR has_attachments = %(has_att)s::boolean)
        ORDER BY received_at DESC NULLS LAST
        LIMIT %(limit)s
    """
    try:
        bind = {
            "s": f"%{sender}%",
            "mboxes": normalize_mailbox(mailbox),
            "since": parse_since(since),
            "has_att": has_attachments,
            "limit": min(max(1, limit), 200),
        }
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(query_sql, bind)
            header = [d.name for d in cur.description]
            fetched = cur.fetchall()
        found = []
        for values in fetched:
            item = dict(zip(header, values))
            item["received_at"] = serialize(item["received_at"])
            found.append(item)
        return {"sender_match": sender, "count": len(found), "results": found}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def recent_emails(
    mailbox: Optional[Union[str, list]] = None,
    days: int = 7,
    folder_contains: Optional[str] = None,
    has_attachments: Optional[bool] = None,
    limit: int = 30,
) -> dict:
    """List recent emails (by received_at). Use for "what came in today/this week".
    days=0 to ignore time window (just top-N newest).
    """
    try:
        mboxes = normalize_mailbox(mailbox)
        limit = min(max(1, limit), 200)
        since_dt = None
        if days and days > 0:
            since_dt = datetime.now(timezone.utc) - timedelta(days=days)
        # "" means "no folder filter". Before, "" was sent as a non-NULL value
        # with a NULL LIKE pattern, and `ILIKE NULL` filtered out every row.
        folder = folder_contains or None
        sql = """
            SELECT mailbox, message_id, subject, sender_email, sender_name,
                   folder_path, received_at, has_attachments, attachment_count,
                   attachments_summary, body_length, is_read
            FROM emails
            WHERE ok = TRUE
              AND (%(mboxes)s::text[] IS NULL OR mailbox = ANY(%(mboxes)s::text[]))
              AND (%(since)s::timestamptz IS NULL OR received_at >= %(since)s::timestamptz)
              AND (%(folder)s::text IS NULL OR folder_path ILIKE %(folder_like)s)
              AND (%(has_att)s::boolean IS NULL OR has_attachments = %(has_att)s::boolean)
            ORDER BY received_at DESC NULLS LAST
            LIMIT %(limit)s
        """
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(sql, {
                "mboxes": mboxes, "since": since_dt,
                "folder": folder,
                "folder_like": f"%{folder}%" if folder else None,
                "has_att": has_attachments, "limit": limit,
            })
            cols = [c.name for c in cur.description]
            rows = [dict(zip(cols, r)) for r in cur.fetchall()]
        for r in rows:
            r["received_at"] = serialize(r["received_at"])
        return {"days": days, "count": len(rows), "results": rows}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def conversation_thread(conversation_id: str, limit: int = 50) -> dict:
    """Return all emails in one Outlook conversation thread (conversation_id from Graph).
    Ordered chronologically. Use to see the full back-and-forth on a topic.
    """
    # Oldest message first so the thread reads top-to-bottom.
    thread_sql = """
        SELECT mailbox, message_id, subject, sender_email, sender_name,
               to_addrs, received_at, folder_path, body_length, has_attachments,
               attachments_summary
        FROM emails
        WHERE conversation_id = %s AND ok = TRUE
        ORDER BY received_at ASC NULLS LAST
        LIMIT %s
    """
    try:
        capped = min(max(1, limit), 200)
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(thread_sql, (conversation_id, capped))
            names = [col.name for col in cur.description]
            thread = [dict(zip(names, rec)) for rec in cur.fetchall()]
        for msg in thread:
            msg["received_at"] = serialize(msg["received_at"])
        return {"conversation_id": conversation_id, "count": len(thread), "thread": thread}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def find_attachment(
    name_contains: str,
    mailbox: Optional[Union[str, list]] = None,
    since: Optional[str] = None,
    limit: int = 30,
) -> dict:
    """Find emails whose attachment filename contains the substring (case-insensitive).
    Use for "find emails with that protocol PDF" or "any invoice attachments".
    Returns emails ordered by received_at DESC.
    """
    # Matches against the denormalized attachments_summary text column.
    attach_sql = """
        SELECT mailbox, message_id, subject, sender_email, sender_name,
               received_at, attachment_count, attachments_summary, folder_path
        FROM emails
        WHERE ok = TRUE
          AND has_attachments = TRUE
          AND attachments_summary ILIKE %(s)s
          AND (%(mboxes)s::text[] IS NULL OR mailbox = ANY(%(mboxes)s::text[]))
          AND (%(since)s::timestamptz IS NULL OR received_at >= %(since)s::timestamptz)
        ORDER BY received_at DESC NULLS LAST
        LIMIT %(limit)s
    """
    try:
        bind = {
            "s": f"%{name_contains}%",
            "mboxes": normalize_mailbox(mailbox),
            "since": parse_since(since),
            "limit": min(max(1, limit), 200),
        }
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(attach_sql, bind)
            header = [d.name for d in cur.description]
            found = [dict(zip(header, values)) for values in cur.fetchall()]
        for hit in found:
            hit["received_at"] = serialize(hit["received_at"])
        return {"name_match": name_contains, "count": len(found), "results": found}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def top_senders(
    mailbox: Optional[Union[str, list]] = None,
    since: Optional[str] = None,
    days: Optional[int] = None,
    folder_contains: Optional[str] = None,
    limit: int = 20,
) -> dict:
    """Unique senders grouped by sender_email, counted, sorted by count DESC.
    Use for "who emails me most" / "top senders this month".
    mailbox: one mailbox string or list. None = all.
    since: ISO date "YYYY-MM-DD" lower bound on received_at.
    days: convenience window — count only the last N days (overrides `since`
    when both given). Use for "za posledních X dní".
    folder_contains: substring match against folder_path (case-insensitive).
    Pass "Inbox" to count ONLY received/incoming mail and exclude the
    mailbox owner's own Sent Items, Drafts, etc. Default None = all folders.
    """
    try:
        mboxes = normalize_mailbox(mailbox)
        since_dt = parse_since(since)
        if days and days > 0:
            since_dt = datetime.now(timezone.utc) - timedelta(days=days)
        limit = min(max(1, limit), 100)
        # "" means "no folder filter". Before, "" was sent as a non-NULL value
        # with a NULL LIKE pattern, and `ILIKE NULL` filtered out every row.
        folder = folder_contains or None
        # Ties on count are broken by most recent activity, then address,
        # so repeated calls return the same order.
        sql = """
            SELECT sender_email, count(*) AS c, max(received_at) AS last_at
            FROM emails
            WHERE ok = TRUE AND sender_email IS NOT NULL
              AND (%(mboxes)s::text[] IS NULL OR mailbox = ANY(%(mboxes)s::text[]))
              AND (%(since)s::timestamptz IS NULL OR received_at >= %(since)s::timestamptz)
              AND (%(folder)s::text IS NULL OR folder_path ILIKE %(folder_like)s)
            GROUP BY sender_email
            ORDER BY c DESC, last_at DESC NULLS LAST, sender_email
            LIMIT %(limit)s
        """
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(sql, {
                "mboxes": mboxes, "since": since_dt,
                "folder": folder,
                "folder_like": f"%{folder}%" if folder else None,
                "limit": limit,
            })
            rows = [{"sender_email": s, "count": c, "last_at": serialize(t)}
                    for s, c, t in cur.fetchall()]
        return {"count": len(rows), "results": rows}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def pipeline_status(mailbox: Optional[Union[str, list]] = None) -> dict:
    """End-to-end status of the email-ingest pipeline per mailbox.
    Reports, for each mailbox, where it stands in the 5-step pipeline:
    1. parse_emails_graph -> mongo_total
    2. (refetch text bodies) -> body_text_missing (legacy v1.3 emails)
    3. download_attachments -> attach_done / attach_pending
    attach_missing (404 — marked, won't retry)
    attach_reference (OneDrive/SharePoint link, no content)
    4. unwrap_smime -> smime_p7m_total / smime_unwrapped / smime_pending
    smime_p7s_count (informational; not unwrapped by design)
    5. enrich_fulltext -> pg_indexed
    Plus:
    - permanently_deleted (marked by delta sync)
    Use this instead of running multiple Mongo count queries by hand. Returns
    one row per mailbox; if `mailbox` is given, returns just those rows.
    """
    try:
        mbs = normalize_mailbox(mailbox)
        all_mb = [c for c in mongo[MONGO_DB].list_collection_names()
                  if c not in SKIP_COLLECTIONS]
        targets = [m for m in all_mb if (mbs is None or m in mbs)]
        # PG counts in one pass
        pg_counts: dict[str, int] = {}
        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute("SELECT mailbox, count(*) FROM emails "
                        "WHERE ok = true GROUP BY mailbox")
            for mb, c in cur.fetchall():
                pg_counts[mb] = c
        # A non-inline attachment still waiting for download: no file_hash yet
        # and not already classified as missing (404) or a reference link.
        pending_att = {
            "is_inline": False,
            "file_hash": {"$exists": False},
            "attachment_missing": {"$ne": True},
            "attachment_reference": {"$ne": True},
        }
        p7m_name = {"$regex": r"^smime\.p7m$", "$options": "i"}
        p7s_name = {"$regex": r"^smime\.p7s$", "$options": "i"}
        out = {}
        for mb in targets:
            col = mongo[MONGO_DB][mb]
            mongo_total = col.estimated_document_count()
            with_att = col.count_documents({"has_attachments": True})
            attach_pending = col.count_documents({
                "has_attachments": True,
                "attachments": {"$elemMatch": pending_att},
            })
            attach_missing = col.count_documents({
                "attachments.attachment_missing": True,
            })
            attach_reference = col.count_documents({
                "attachments.attachment_reference": True,
            })
            # Counted directly, not as with_att - pending - missing - reference.
            # One email can fall in several buckets (e.g. one 404'd file plus one
            # still-pending file), and the missing/reference counts are not
            # restricted to has_attachments=True. The old subtraction could
            # therefore double-count and even go negative.
            attach_done = col.count_documents({
                "has_attachments": True,
                "attachments": {"$not": {"$elemMatch": pending_att}},
                "attachments.attachment_missing": {"$ne": True},
                "attachments.attachment_reference": {"$ne": True},
            })
            smime_p7m_total = col.count_documents({"attachments.filename": p7m_name})
            smime_unwrapped = col.count_documents({
                "attachments.filename": p7m_name,
                "smime_unwrapped": True,
            })
            smime_p7s_count = col.count_documents({"attachments.filename": p7s_name})
            body_text_missing = col.count_documents({
                "body_html": {"$in": [None, ""]},
                "body_text": {"$exists": False},
                "graph_id": {"$exists": True},
            })
            permanently_deleted = col.count_documents({"permanently_deleted": True})
            out[mb] = {
                "mongo_total": mongo_total,
                "with_attachments": with_att,
                "attach_done": attach_done,
                "attach_pending": attach_pending,
                "attach_missing": attach_missing,
                "attach_reference": attach_reference,
                "smime_p7m_total": smime_p7m_total,
                "smime_unwrapped": smime_unwrapped,
                "smime_pending": smime_p7m_total - smime_unwrapped,
                "smime_p7s_count": smime_p7s_count,
                "body_text_missing": body_text_missing,
                "pg_indexed": pg_counts.get(mb, 0),
                "permanently_deleted": permanently_deleted,
            }
        return {"mailboxes": out}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
@mcp.tool()
def sync_state_overview(mailbox: Optional[Union[str, list]] = None) -> dict:
    """Delta-sync state across mailboxes (collection `emaily.sync_state`).
    For each (mailbox, folder) pair shows: deltaLink present?, last_run_at,
    cumulative new/sync/removed/run_count. Use to confirm a mailbox is
    incrementally synced and to spot folders that haven't run in a while.
    """
    try:
        db = mongo[MONGO_DB]
        wanted = normalize_mailbox(mailbox)
        selector: dict = {"mailbox": {"$in": wanted}} if wanted else {}
        projection = {
            "mailbox": 1, "folder_path": 1, "folder_id": 1,
            "delta_link": 1, "last_run_at": 1,
            "cumulative_new": 1, "cumulative_sync": 1,
            "cumulative_removed": 1, "run_count": 1,
        }
        docs = db["sync_state"].find(selector, projection).sort(
            [("mailbox", 1), ("folder_path", 1)])
        grouped: dict[str, list] = {}
        for doc in docs:
            grouped.setdefault(doc["mailbox"], []).append({
                "folder_path": doc.get("folder_path"),
                "folder_id": doc.get("folder_id"),
                "has_delta_link": bool(doc.get("delta_link")),
                "last_run_at": serialize(doc.get("last_run_at")),
                "cumulative_new": doc.get("cumulative_new", 0),
                "cumulative_sync": doc.get("cumulative_sync", 0),
                "cumulative_removed": doc.get("cumulative_removed", 0),
                "run_count": doc.get("run_count", 0),
            })
        # Mailbox collections that have no sync_state entry at all.
        known = {name for name in db.list_collection_names()
                 if name not in SKIP_COLLECTIONS}
        never = [name for name in sorted(known - set(grouped))
                 if not wanted or name in wanted]
        return {
            "mailboxes": grouped,
            "never_delta_synced": never,
        }
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}
if __name__ == "__main__":
    log("MCP emaily server started (FastMCP)")
    # stdio transport (FastMCP's default): protocol frames travel over stdout,
    # which is why every diagnostic in this module goes through log() to stderr.
    mcp.run(transport="stdio")