Files
janssen/Outlook/TRASH/mcp_owa_v1.1.py
T
2026-06-04 11:40:45 +02:00

372 lines
12 KiB
Python

"""
=======================================================================
Název: mcp_owa_v1.1.py
Verze: 1.1
Datum: 2026-06-04
Popis: MCP server pro práci s otevřeným OWA oknem (Playwright).
Drží persistent session a vystavuje tooly:
- vyhledání emailu v MongoDB OperativniEmailyJNJ.messages
- otevření emailu v OWA UI přes search
- Forward → vepsání úvodního textu na začátek body
- zavření původního čtecího panelu
Odeslání forwardu dělá uživatel sám.
Profil sdílí s import_emails_to_mongo_v1.0.py (outlook_profile/).
Změny v1.1:
- sync_playwright v dedikovaném worker threadu (Windows + async_playwright
v persistent contextu padá: Chrome se zavře hned po startu).
- FastMCP tooly synchronní, dispatch na worker přes queue + Future.
Spuštění: python mcp_owa_v1.1.py
=======================================================================
"""
import queue
import re
import sys
import threading
from concurrent.futures import Future
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from mcp.server.fastmcp import FastMCP
from playwright.sync_api import sync_playwright, Page, BrowserContext
from pymongo import MongoClient, DESCENDING
# ── Configuration ──────────────────────────────────────────────────────
# Directory that contains this script; relative paths below hang off it.
BASE_DIR = Path(__file__).resolve().parent
# Browser profile directory handed to Playwright as `user_data_dir`.
PROFILE_DIR = BASE_DIR / "outlook_profile"
# Page the browser navigates to when a session is started.
START_URL = "https://outlook.cloud.microsoft/mail/"
# MongoDB connection string plus the database / collection holding the emails.
MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "OperativniEmailyJNJ"
COL_NAME = "messages"
# CSS selector list matching an element whose placeholder or aria-label contains
# "Search" (English) or "Hledat" (Czech); used both as the "page is ready"
# signal and to locate the search box.
SEARCH_READY = (
    '[placeholder*="Search"], [aria-label*="Search"], '
    '[placeholder*="Hledat"], [aria-label*="Hledat"]'
)
def log(msg: str):
    """Write *msg* as one line to stderr and flush immediately."""
    stream = sys.stderr
    print(msg, file=stream, flush=True)
# ── Worker thread (drží Playwright) ────────────────────────────────────
class PWWorker:
    """Run every Playwright call on one dedicated background thread.

    The sync Playwright API is started inside the worker thread, and work is
    handed over through a queue as ``(fn, args, kwargs, future)`` tuples;
    ``fn`` is invoked as ``fn(worker, *args, **kwargs)``.  Putting ``None`` on
    the queue stops the loop and leaves the ``sync_playwright`` context.

    Attributes:
        q: Work queue consumed by the worker thread.
        ready: Set once Playwright has started, or has failed to start.
        startup_error: Exception raised while starting Playwright, else None.
        pw: Object yielded by ``sync_playwright()``; None until started.
        context, main_page, draft_page: Session state written by the worker
            functions; all None until a session exists.
    """

    def __init__(self):
        """Create the queue and state, start the thread, wait up to 10 s for it."""
        self.q: queue.Queue = queue.Queue()
        self.ready = threading.Event()
        self.startup_error: Optional[BaseException] = None
        self.pw = None
        self.context: Optional[BrowserContext] = None
        self.main_page: Optional[Page] = None
        self.draft_page: Optional[Page] = None
        self.thread = threading.Thread(target=self._run, daemon=True, name="pw-worker")
        self.thread.start()
        # A timeout is reported instead of being ignored; construction still
        # succeeds so the non-Playwright parts of the server stay usable.
        if not self.ready.wait(timeout=10):
            log("pw-worker: sync_playwright not ready after 10 s; queued calls will wait for it")

    def _run(self):
        """Thread body: start Playwright, then serve queued calls until stopped."""
        try:
            with sync_playwright() as p:
                self.pw = p
                self.ready.set()
                log("pw-worker: sync_playwright ready")
                while True:
                    item = self.q.get()
                    if item is None:
                        break
                    fn, args, kwargs, fut = item
                    try:
                        fut.set_result(fn(self, *args, **kwargs))
                    except Exception as e:
                        # Hand the failure to the caller; keep serving.
                        fut.set_exception(e)
        except Exception as e:
            if not self.ready.is_set():
                # Failure happened before Playwright came up: remember it and
                # release anyone blocked on `ready`.
                self.startup_error = e
                self.ready.set()
            log(f"pw-worker: stopped by error: {e!r}")
        finally:
            self._fail_pending()

    def _fail_pending(self):
        """Fail every call still queued so its caller does not block forever."""
        while True:
            try:
                item = self.q.get_nowait()
            except queue.Empty:
                return
            if item is not None:
                item[3].set_exception(
                    RuntimeError("pw-worker stopped before running this call")
                )

    def call(self, fn, *args, **kwargs):
        """Run ``fn(self, *args, **kwargs)`` on the worker thread and return its result.

        Blocks until the call has finished.  Exceptions raised by ``fn`` are
        re-raised here.

        Raises:
            RuntimeError: The worker thread is not running (it failed to start
                or has already stopped), so the call could never complete.
        """
        if not self.thread.is_alive():
            raise RuntimeError("pw-worker thread is not running") from self.startup_error
        fut: Future = Future()
        self.q.put((fn, args, kwargs, fut))
        return fut.result()
# Module-level singletons, created at import time.
# Constructing PWWorker starts its background thread immediately.
WORKER = PWWorker()
# MongoDB client and the collection that the find_* tools query.
_mongo = MongoClient(MONGO_URI)
_col = _mongo[DB_NAME][COL_NAME]
# FastMCP server instance (named "owa") on which the tools below are registered.
mcp = FastMCP("owa")
# ── Helpers ────────────────────────────────────────────────────────────
def _doc_summary(doc: dict) -> dict:
return {
"message_id": doc.get("message_id"),
"subject": doc.get("subject"),
"from": doc.get("from"),
"to": doc.get("to", [])[:5],
"date": doc.get("date").isoformat() if doc.get("date") else None,
"folder": doc.get("folder"),
"has_attachments": doc.get("has_attachments", False),
}
def _wait_ready(page: Page):
    """Block until *page* has loaded its DOM and the search box selector matches.

    The selector wait uses a 30 s timeout.
    """
    selector_timeout_ms = 30_000
    page.wait_for_load_state("domcontentloaded")
    page.wait_for_selector(SEARCH_READY, timeout=selector_timeout_ms)
# ── Worker functions (běží na pw threadu, dostávají worker jako 1. arg) ─
def _start(w: PWWorker) -> dict:
    """Launch the persistent-profile browser and open the OWA start page.

    Returns:
        ``{"status": "already_running", "url": ...}`` when a context is already
        stored on the worker, ``{"status": "error", "error": ...}`` when the
        profile directory does not exist, otherwise
        ``{"status": "started", "url": ...}``.
    """
    if w.context is not None:
        current_url = w.main_page.url if w.main_page else None
        return {"status": "already_running", "url": current_url}

    if not PROFILE_DIR.exists():
        message = f"Profil nenalezen: {PROFILE_DIR}. Spusť outlook_login_v1.0.py."
        return {"status": "error", "error": message}

    chromium_flags = [
        "--disable-blink-features=AutomationControlled",
        "--start-maximized",
    ]
    context = w.pw.chromium.launch_persistent_context(
        user_data_dir=str(PROFILE_DIR),
        headless=False,
        no_viewport=True,
        accept_downloads=True,
        args=chromium_flags,
    )
    w.context = context

    # Reuse the tab the context opened with, or create one if there is none.
    existing_tabs = context.pages
    page = existing_tabs[0] if existing_tabs else context.new_page()
    w.main_page = page

    page.goto(START_URL)
    _wait_ready(page)
    return {"status": "started", "url": page.url}
def _stop(w: PWWorker) -> dict:
    """Close the browser context and clear the session state on the worker.

    The stored context and page references are reset even if closing raises.
    """
    context = w.context
    if context is None:
        return {"status": "not_running"}
    try:
        context.close()
    finally:
        w.context = w.main_page = w.draft_page = None
    return {"status": "stopped"}
def _status(w: PWWorker) -> dict:
    """Report whether a session exists, its URL, tab count and draft state."""
    context = w.context
    if context is None:
        return {"running": False}

    report = {"running": True}
    report["url"] = w.main_page.url if w.main_page else None
    report["tabs"] = len(context.pages)
    # A draft counts as open only while its page object is still alive.
    draft = w.draft_page
    report["draft_open"] = draft is not None and not draft.is_closed()
    return report
def _open_by_subject(w: PWWorker, subject: str) -> dict:
    """Search the OWA UI for *subject* and click the first result.

    Returns:
        ``{"status": "not_running"}`` without a session,
        ``{"status": "no_results"}`` when no result becomes visible within
        10 s, otherwise ``{"status": "opened", "count_visible": <int>}``.
    """
    if w.context is None:
        return {"status": "not_running"}

    page = w.main_page
    search_box = page.locator(SEARCH_READY).first
    search_box.click()
    search_box.fill("")  # clear any previous query
    search_box.type(subject, delay=20)
    page.keyboard.press("Enter")
    page.wait_for_timeout(2_000)

    results = page.locator('div[role="option"]')
    first_hit = results.first
    try:
        first_hit.wait_for(state="visible", timeout=10_000)
    except Exception:
        return {"status": "no_results"}
    else:
        first_hit.click()
        page.wait_for_timeout(800)
        return {"status": "opened", "count_visible": results.count()}
def _forward(w: PWWorker, body_prefix: str = "", subject_prefix: str = "") -> dict:
    """Start a forward of the current email and optionally pre-fill the draft.

    Sends Ctrl+Shift+F to the main page, waits for the message-body editor and
    records the page as the draft page.  Sending is left to the user.

    Args:
        body_prefix: Text typed at the very start of the body, followed by Enter.
        subject_prefix: Text prepended to the draft's current subject.

    Returns:
        ``{"status": "not_running"}`` without a session,
        ``{"status": "forward_failed", "hint": ...}`` when the body editor does
        not appear within 10 s, otherwise ``{"status": "forward_ready"}``.  When
        ``subject_prefix`` was given, the result also carries
        ``"subject_prefixed"`` (bool) telling whether the subject was changed.
    """
    if w.context is None:
        return {"status": "not_running"}
    page = w.main_page
    page.keyboard.press("Control+Shift+F")
    page.wait_for_timeout(1_500)
    body = page.locator(
        'div[aria-label*="Message body" i][contenteditable="true"], '
        'div[aria-label*="Tělo zprávy" i][contenteditable="true"]'
    ).first
    try:
        body.wait_for(state="visible", timeout=10_000)
    except Exception as e:
        log(f"forward: body editor did not appear: {e!r}")
        return {"status": "forward_failed", "hint": "Body editor forwardu nenalezen."}
    w.draft_page = page  # inline composer lives in the same tab
    result = {"status": "forward_ready"}
    if subject_prefix:
        subj = page.locator(
            'input[aria-label*="subject" i], input[aria-label*="předmět" i]'
        ).first
        try:
            subj.wait_for(state="visible", timeout=5_000)
            current = subj.input_value()
            subj.fill(f"{subject_prefix}{current}")
        except Exception as e:
            # Still best-effort: the draft stays usable, but the failure is
            # logged and reported instead of being silently dropped.
            log(f"forward: subject prefix not applied: {e!r}")
            result["subject_prefixed"] = False
        else:
            result["subject_prefixed"] = True
    if body_prefix:
        body.click()
        page.keyboard.press("Control+Home")  # caret to the start of the body
        page.keyboard.type(body_prefix, delay=10)
        page.keyboard.press("Enter")
    return result
def _write_at_top(w: PWWorker, text: str) -> dict:
    """Type *text* at the start of the open draft's message body.

    Returns:
        ``{"status": "no_draft"}`` when no live draft page is recorded,
        otherwise ``{"status": "written", "chars": len(text)}``.
    """
    draft = w.draft_page
    if draft is None or draft.is_closed():
        return {"status": "no_draft"}

    editor_selector = (
        'div[aria-label*="Message body" i][contenteditable="true"], '
        'div[aria-label*="Tělo zprávy" i][contenteditable="true"]'
    )
    editor = draft.locator(editor_selector).first
    editor.click()

    keyboard = draft.keyboard
    keyboard.press("Control+Home")  # caret to the start of the body
    keyboard.type(text, delay=10)
    return {"status": "written", "chars": len(text)}
def _set_recipients(w: PWWorker, to: list[str], cc: Optional[list[str]] = None) -> dict:
    """Type recipient addresses into the To (and optionally Cc) field of the draft.

    Addresses are typed joined by ``"; "`` with a trailing ``";"``.

    Args:
        to: Addresses for the To field; an empty list leaves the field untouched.
        cc: Addresses for the Cc field, or None / empty to skip it.

    Returns:
        ``{"status": "no_draft"}`` when no live draft page is recorded,
        otherwise ``{"status": "filled", "to": to, "cc": cc or [],
        "cc_filled": <bool>}`` where ``cc_filled`` tells whether the Cc
        addresses were actually typed.
    """
    if w.draft_page is None or w.draft_page.is_closed():
        return {"status": "no_draft"}
    page = w.draft_page
    if to:
        # Skipped for an empty list so a lone ";" is not typed into the field.
        to_field = page.locator(
            'div[aria-label*="To" i][contenteditable="true"], '
            'div[aria-label*="Komu" i][contenteditable="true"]'
        ).first
        to_field.click()
        page.keyboard.type("; ".join(to) + ";", delay=15)
        page.wait_for_timeout(500)
    cc_filled = False
    if cc:
        cc_field = page.locator(
            'div[aria-label*="Cc" i][contenteditable="true"], '
            'div[aria-label*="Kopie" i][contenteditable="true"]'
        ).first
        if cc_field.count():
            cc_field.click()
            page.keyboard.type("; ".join(cc) + ";", delay=15)
            cc_filled = True
        else:
            log("set_recipients: Cc field not found; Cc addresses were not entered")
    return {"status": "filled", "to": to, "cc": cc or [], "cc_filled": cc_filled}
def _close_pane(w: PWWorker) -> dict:
    """Send Escape to the main page, then pause 300 ms."""
    if w.context is None:
        return {"status": "not_running"}
    page = w.main_page
    page.keyboard.press("Escape")
    page.wait_for_timeout(300)
    return {"status": "closed"}
def _screenshot(w: PWWorker, path: str) -> dict:
    """Save a viewport screenshot of the main page.

    *path* is joined onto the script directory and resolved; the resolved
    location is returned as ``{"status": "ok", "path": ...}``.
    """
    # NOTE(review): an absolute or "../" path is not confined to BASE_DIR —
    # confirm whether that is intended.
    if w.context is None:
        return {"status": "not_running"}
    target = BASE_DIR.joinpath(path).resolve()
    w.main_page.screenshot(path=str(target), full_page=False)
    return {"status": "ok", "path": str(target)}
# ── MCP tooly (synchronní; dispatch na worker) ─────────────────────────
@mcp.tool()
def start_owa() -> dict:
    """Launch Playwright with the persistent profile and open OWA. The window stays open."""
    outcome = WORKER.call(_start)
    return outcome
@mcp.tool()
def stop_owa() -> dict:
    """Close the Playwright context (and with it the OWA window)."""
    outcome = WORKER.call(_stop)
    return outcome
@mcp.tool()
def status() -> dict:
    """Return the session state: running or not, URL, number of tabs, draft open?"""
    snapshot = WORKER.call(_status)
    return snapshot
@mcp.tool()
def find_emails(
    query: Optional[str] = None,
    from_email: Optional[str] = None,
    folder: Optional[str] = None,
    since_iso: Optional[str] = None,
    limit: int = 10,
) -> list[dict]:
    """Search the MongoDB message collection, newest first.

    Args:
        query: Literal substring of the subject (case-insensitive).
        from_email: Literal substring of the sender address (case-insensitive).
        folder: Exact folder name.
        since_iso: ISO date/datetime; only emails from that moment on are
            returned.  A value without a timezone is taken as UTC.  An
            unparseable value is logged and ignored.
        limit: Maximum number of results, clamped to 1..50.

    Returns:
        List of message summaries.
    """
    flt: dict = {}
    if query:
        # Escaped so that characters such as "[", "(" or "+" match literally
        # instead of being interpreted as regex syntax.
        flt["subject"] = {"$regex": re.escape(query), "$options": "i"}
    if from_email:
        flt["from.email"] = {"$regex": re.escape(from_email), "$options": "i"}
    if folder:
        flt["folder"] = folder
    if since_iso:
        try:
            dt = datetime.fromisoformat(since_iso.replace("Z", "+00:00"))
        except ValueError:
            log(f"find_emails: ignoring unparseable since_iso={since_iso!r}")
        else:
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            flt["date"] = {"$gte": dt}
    cur = _col.find(flt).sort("date", DESCENDING).limit(max(1, min(limit, 50)))
    return [_doc_summary(d) for d in cur]
@mcp.tool()
def find_last_email(from_email: Optional[str] = None, folder: Optional[str] = None) -> Optional[dict]:
    """Return the newest email, optionally filtered by sender and/or folder.

    Args:
        from_email: Literal substring of the sender address (case-insensitive).
        folder: Exact folder name.

    Returns:
        Summary of the most recent matching message, or None when nothing matches.
    """
    flt: dict = {}
    if from_email:
        # Escaped so "." and "+" in an address match literally, not as regex syntax.
        flt["from.email"] = {"$regex": re.escape(from_email), "$options": "i"}
    if folder:
        flt["folder"] = folder
    docs = list(_col.find(flt).sort("date", DESCENDING).limit(1))
    return _doc_summary(docs[0]) if docs else None
@mcp.tool()
def open_email_by_subject(subject: str) -> dict:
    """Search OWA by subject and open the first result in the reading pane."""
    outcome = WORKER.call(_open_by_subject, subject)
    return outcome
@mcp.tool()
def forward_current(body_prefix: str = "", subject_prefix: str = "") -> dict:
    """Trigger Forward (Ctrl+Shift+F) on the current email.

    If `body_prefix` is given it is typed at the start of the body; if
    `subject_prefix` is given it is prepended to the draft's subject.
    """
    outcome = WORKER.call(_forward, body_prefix, subject_prefix)
    return outcome
@mcp.tool()
def write_at_top(text: str) -> dict:
    """Type text at the start of the open draft's body (before existing content / signature)."""
    outcome = WORKER.call(_write_at_top, text)
    return outcome
@mcp.tool()
def set_recipients(to: list[str], cc: Optional[list[str]] = None) -> dict:
    """Fill in To / Cc in the open draft."""
    outcome = WORKER.call(_set_recipients, to, cc)
    return outcome
@mcp.tool()
def close_reading_pane() -> dict:
    """Close the email open in the reading pane (Escape). Meant to leave a forward draft open."""
    outcome = WORKER.call(_close_pane)
    return outcome
@mcp.tool()
def screenshot(path: str = "owa_screenshot.png") -> dict:
    """Save a screenshot of the main OWA page for orientation; `path` is relative to the script directory."""
    outcome = WORKER.call(_screenshot, path)
    return outcome
# Script entry point: announce startup on stderr, then hand control to the
# FastMCP server loop.
if __name__ == "__main__":
    log("mcp_owa v1.1 starting (stdio)…")
    mcp.run()