#!/usr/bin/env python3
"""
==============================================================================
MCP server: SOUBORY (Dropbox studies 42847922MDD3003 + 77242113UCO3001)

Hybrid query over:
  - PostgreSQL 192.168.1.76  db=MongoSoubory  table=documents
    (fulltext tsvector index, ts_headline, ts_rank)
  - MongoDB    192.168.1.76  db=soubory
    collections=42847922MDD3003, 77242113UCO3001
    (metadata, content.* from enrich_files_v1.0)

Run:  python mcp_soubory.py   (stdio MCP)
Registered in U:\\janssen\\.mcp.json as "soubory".
==============================================================================
"""
from __future__ import annotations

import os
import re
import sys
import traceback
from datetime import datetime, timezone, timedelta
from typing import Optional, Union

import psycopg
from bson import ObjectId
from mcp.server.fastmcp import FastMCP
from pymongo import MongoClient

# --- configuration -----------------------------------------------------------
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "soubory"

# Short alias -> Mongo collection name (= value of the PG `study` column)
STUDY_MAP = {
    "MDD3003": "42847922MDD3003",
    "UCO3001": "77242113UCO3001",
}
STUDY_ALL = list(STUDY_MAP.values())

# NOTE(review): the fallback DSN embeds a plaintext password. Set the
# SOUBORY_PG_DSN environment variable instead and remove the literal from the
# source once every deployment provides it.
PG_DSN = os.environ.get(
    "SOUBORY_PG_DSN",
    "host=192.168.1.76 port=5432 dbname=MongoSoubory "
    "user=vladimir.buzalka password=Vlado7309208104++",
)

# How much of a document body is returned by default (keeps tool responses small)
DEFAULT_BODY_CHARS = 8000
MAX_BODY_CHARS = 200_000

# content.* keys copied verbatim (strings truncated) into the condensed metadata
_SHORT_META_KEYS = (
    "title", "subject", "author", "last_modified_by",
    "from", "to", "cc", "date",
    "pages", "slides", "total_sheets", "paragraphs", "words",
    "created", "modified", "encrypted",
)


def log(msg: str) -> None:
    """Write a diagnostic line to stderr.

    The server is run as a stdio MCP server (see module docstring), so all
    diagnostics go to stderr rather than stdout.
    """
    print(msg, file=sys.stderr, flush=True)


# --- client initialisation ---------------------------------------------------
# Both back ends are probed at import time; the process exits with status 1 if
# either one is unreachable.
try:
    mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    mongo.admin.command("ping")
    log(f"Mongo OK ({MONGO_URI})")
except Exception as e:
    log(f"Mongo connection failed: {e}")
    sys.exit(1)

try:
    _test = psycopg.connect(PG_DSN, connect_timeout=10)
    _test.close()
    log("Postgres OK")
except Exception as e:
    log(f"Postgres connection failed: {e}")
    sys.exit(1)


def pg_conn() -> psycopg.Connection:
    """Open a new PostgreSQL connection (callers use it as a context manager)."""
    return psycopg.connect(PG_DSN, connect_timeout=10)


def serialize(obj):
    """Recursively convert Mongo/PG values into JSON-friendly Python values.

    ObjectId -> str, datetime -> ISO-8601 string, bytes -> UTF-8 text (invalid
    bytes replaced); dicts and lists are converted element by element; any
    other value is returned unchanged.
    """
    if isinstance(obj, ObjectId):
        return str(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, bytes):
        return obj.decode("utf-8", errors="replace")
    if isinstance(obj, dict):
        return {k: serialize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [serialize(v) for v in obj]
    return obj


def resolve_studies(study: Optional[Union[str, list]]) -> Optional[list[str]]:
    """Map study aliases ('MDD3003' / 'UCO3001') to full collection names.

    Returns None for None / "" / [] (meaning "no study filter"); callers that
    need an explicit list fall back to STUDY_ALL.

    Raises:
        ValueError: if an entry is neither a known alias nor a full code.
    """
    if study is None or study == "" or study == []:
        return None
    if isinstance(study, str):
        study = [study]
    out = []
    for s in study:
        if s in STUDY_MAP:
            out.append(STUDY_MAP[s])
        elif s in STUDY_MAP.values():
            out.append(s)
        else:
            raise ValueError(f"Unknown study {s!r}. Use MDD3003 / UCO3001 or full code.")
    return out


def normalize_exts(ext: Optional[Union[str, list]]) -> Optional[list[str]]:
    """Normalise extension filter(s) to lowercase without leading dots.

    Returns None for None / "" / [] (meaning "no extension filter").
    """
    if ext is None or ext == "" or ext == []:
        return None
    if isinstance(ext, str):
        ext = [ext]
    return [e.lower().lstrip(".") for e in ext]


def parse_since(since: Optional[str]) -> Optional[datetime]:
    """Parse a 'YYYY-MM-DD' date or an ISO-8601 datetime into an aware datetime.

    A value without a UTC offset (date-only, or a naive ISO datetime) is
    interpreted as UTC so both input forms behave the same way.

    Returns None when `since` is empty.

    Raises:
        ValueError: if the string cannot be parsed.
    """
    if not since:
        return None
    try:
        if "T" in since:
            # A trailing "Z" is rewritten so older fromisoformat() accepts it.
            parsed = datetime.fromisoformat(since.replace("Z", "+00:00"))
        else:
            parsed = datetime.strptime(since, "%Y-%m-%d")
    except Exception as e:
        raise ValueError(f"Bad date {since!r}: {e}") from e
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def short_meta(content: dict) -> dict:
    """Condense a Mongo `content` sub-document for a tool response.

    Returns {"ok": False, "error": ...} when `content` is empty or flagged
    not-ok. Otherwise copies the non-empty well-known keys (strings cut to
    200 chars), sheet names, attachment count plus the first 10 attachments,
    and the first 400 chars of `text_head`.
    """
    if not content or not content.get("ok", True):
        return {"ok": False, "error": (content or {}).get("error")}

    out = {}
    for k in _SHORT_META_KEYS:
        v = content.get(k)
        if v in (None, "", []):
            continue
        if isinstance(v, str) and len(v) > 200:
            v = v[:200] + "..."
        out[k] = v

    if "sheets" in content:
        # The key may be present with a null value; treat that as no sheets.
        out["sheet_names"] = [s.get("name") for s in (content.get("sheets") or []) if s]

    if "attachments" in content:
        attachments = content.get("attachments") or []
        out["attachment_count"] = len(attachments)
        if attachments:
            out["attachments"] = attachments[:10]

    if "text_head" in content:
        head = content["text_head"] or ""  # tolerate an explicit null
        out["text_head"] = head[:400] + ("..." if len(head) > 400 else "")

    return out


# --- MCP ---------------------------------------------------------------------
mcp = FastMCP("soubory")


@mcp.tool()
def ping() -> dict:
    """Quick health check. Reports Mongo + Postgres connectivity, totals per study,
    and PG documents.ok count.

    Call this first when starting an investigation to confirm everything is up.
    """
    try:
        info = mongo.admin.command("buildInfo")
        study_counts = {}
        for code in STUDY_ALL:
            study_counts[code] = mongo[MONGO_DB][code].estimated_document_count()

        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(
                "SELECT study, ok, count(*) FROM documents "
                "GROUP BY study, ok ORDER BY study, ok"
            )
            rows = cur.fetchall()

        pg_summary = {}
        for s, ok, c in rows:
            pg_summary.setdefault(s, {})[("ok" if ok else "error")] = c

        return {
            "status": "ok",
            "mongo_version": info.get("version"),
            "mongo_files_per_study": study_counts,
            "pg_documents_per_study": pg_summary,
            "studies": STUDY_MAP,
        }
    except Exception as e:
        log(traceback.format_exc())
        return {"status": "error", "error": str(e)}


@mcp.tool()
def list_studies() -> dict:
    """Overview of both studies — total files, breakdown by extension, fulltext
    coverage, earliest/latest mtime.

    Use this to understand the corpus before searching.
    """
    out = {}
    try:
        for alias, code in STUDY_MAP.items():
            col = mongo[MONGO_DB][code]
            total = col.count_documents({})
            deleted = col.count_documents({"deleted_at": {"$exists": True}})
            ext_breakdown = list(col.aggregate([
                {"$match": {"deleted_at": {"$exists": False}}},
                {"$group": {"_id": "$ext", "count": {"$sum": 1}}},
                {"$sort": {"count": -1}},
            ]))
            mtime_minmax = list(col.aggregate([
                {"$match": {"deleted_at": {"$exists": False}}},
                {"$group": {"_id": None,
                            "min_mtime": {"$min": "$mtime"},
                            "max_mtime": {"$max": "$mtime"}}},
            ]))

            with pg_conn() as pg, pg.cursor() as cur:
                cur.execute(
                    "SELECT count(*) FILTER (WHERE ok), count(*) FROM documents WHERE study=%s",
                    (code,),
                )
                pg_ok, pg_total = cur.fetchone()

            out[alias] = {
                "code": code,
                "mongo_total": total,
                "mongo_active": total - deleted,
                "mongo_deleted": deleted,
                "by_ext": {r["_id"]: r["count"] for r in ext_breakdown},
                "fulltext_indexed": pg_ok,
                "fulltext_failed": pg_total - pg_ok,
                "oldest_mtime": serialize(mtime_minmax[0]["min_mtime"]) if mtime_minmax else None,
                "newest_mtime": serialize(mtime_minmax[0]["max_mtime"]) if mtime_minmax else None,
            }
        return {"studies": out}
    except Exception as e:
        # Report failures the same way every other tool in this module does.
        log(traceback.format_exc())
        return {"error": str(e)}


@mcp.tool()
def search(
    query: str,
    study: Optional[Union[str, list]] = None,
    ext: Optional[Union[str, list]] = None,
    since: Optional[str] = None,
    folder: Optional[str] = None,
    limit: int = 15,
    with_metadata: bool = True,
) -> dict:
    """PRIMARY TOOL — fulltext search across all parsed documents in both studies.

    query:  search expression in PostgreSQL websearch_to_tsquery syntax:
              adverse event       -> AND (both must appear)
              "adverse event"     -> exact phrase
              adverse OR serious  -> OR
              adverse -mild       -> exclude
    study:  "MDD3003", "UCO3001", or list. None = both.
    ext:    filter file types: ["pdf", "docx", "xlsx", "xlsm", "pptx", "eml",
            "msg", "txt", "csv"]
    since:  ISO date "YYYY-MM-DD" — only files modified on/after this date
    folder: substring match against any parent folder name (e.g. "CRF",
            "Training"). Applied AFTER the top-`limit` ranked hits are fetched,
            so fewer than `limit` results may come back; raise `limit` if needed.
    limit:  max results (default 15, max 100)
    with_metadata: if True, also fetch content.* metadata from Mongo
            (author, pages, sheets, EML headers)

    Returns ranked results with `snippet` showing matches highlighted with <<...>>.
    Use `read_document` to fetch full body of a specific hit.
    """
    try:
        studies = resolve_studies(study)
        exts = normalize_exts(ext)
        since_dt = parse_since(since)
        limit = min(max(1, limit), 100)

        # The two adjacent option literals of ts_headline must stay on separate
        # lines: SQL only concatenates adjacent string constants across a newline.
        sql = """
            WITH q AS (
                SELECT websearch_to_tsquery('soubory'::regconfig, %(query)s) AS tsq
            )
            SELECT d.id, d.mongo_id, d.study, d.path, d.rel_path, d.name, d.ext,
                   d.size_bytes, d.mtime, d.body_length,
                   ts_rank(d.tsv, q.tsq) AS rank,
                   ts_headline('soubory'::regconfig, left(d.body, 200000), q.tsq,
                               'MaxFragments=3, MinWords=4, MaxWords=18, '
                               'StartSel=<<, StopSel=>>, FragmentDelimiter= ... ') AS snippet
            FROM documents d, q
            WHERE d.tsv @@ q.tsq
              AND d.ok = TRUE
              AND (%(studies)s::text[] IS NULL OR d.study = ANY(%(studies)s::text[]))
              AND (%(exts)s::text[] IS NULL OR d.ext = ANY(%(exts)s::text[]))
              AND (%(since)s::timestamptz IS NULL OR d.mtime >= %(since)s::timestamptz)
            ORDER BY rank DESC, d.mtime DESC NULLS LAST
            LIMIT %(limit)s
        """
        params = {"query": query, "studies": studies, "exts": exts,
                  "since": since_dt, "limit": limit}

        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(sql, params)
            cols = [c.name for c in cur.description]
            rows = [dict(zip(cols, r)) for r in cur.fetchall()]

        # PG has no parent_folders column, so the folder filter (and the
        # optional metadata) comes from Mongo, looked up by path per study.
        meta_by_path: dict[str, dict] = {}
        if rows and (with_metadata or folder):
            by_study: dict[str, list[str]] = {}
            for r in rows:
                by_study.setdefault(r["study"], []).append(r["path"])
            for code, paths in by_study.items():
                proj = {"path": 1, "parent_folders": 1, "dates_in_name": 1}
                if with_metadata:
                    proj["content"] = 1
                for d in mongo[MONGO_DB][code].find({"path": {"$in": paths}}, proj):
                    meta_by_path[d["path"]] = d

        if folder:
            needle = folder.lower()
            kept = []
            for r in rows:
                folders = (meta_by_path.get(r["path"]) or {}).get("parent_folders") or []
                if any(needle in (f or "").lower() for f in folders):
                    kept.append(r)
            rows = kept

        results = []
        for r in rows:
            mongo_doc = meta_by_path.get(r["path"]) or {}
            results.append({
                "study": r["study"],
                "path": r["path"],
                "rel_path": r["rel_path"],
                "name": r["name"],
                "ext": r["ext"],
                "size_mb": round((r["size_bytes"] or 0) / 1024 / 1024, 2),
                "mtime": serialize(r["mtime"]),
                "body_length": r["body_length"],
                "rank": round(float(r["rank"]), 5),
                "snippet": (r["snippet"] or "").strip(),
                "mongo_id": r["mongo_id"],
                "dates_in_name": mongo_doc.get("dates_in_name"),
                "metadata": short_meta(mongo_doc.get("content") or {}) if with_metadata else None,
            })

        return {
            "query": query,
            "filters": {"study": studies, "ext": exts, "since": since,
                        "folder": folder, "limit": limit},
            "count": len(results),
            "results": results,
            "tip": "Use read_document(path=...) to fetch full body of any hit.",
        }
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e), "query": query}


@mcp.tool()
def read_document(
    path: Optional[str] = None,
    mongo_id: Optional[str] = None,
    offset: int = 0,
    length: int = DEFAULT_BODY_CHARS,
    around_match: Optional[str] = None,
) -> dict:
    """Read the full parsed text of one document (PG body column) + its Mongo metadata.

    Identify the document by EITHER `path` (absolute) OR `mongo_id`
    (`path` wins when both are given).

    offset, length: slice the body (default first 8000 chars). length capped
                    at 200000. A negative offset is treated as 0.
    around_match:   if given, return up to 3 windows of ~1000 chars centered on
                    the first matches of this substring (case-insensitive).
                    Useful to jump to a keyword in a long doc.

    Body is truncated to fit; check `body_length` vs returned length to know if
    more exists. Use offset to page further (offset=8000, then 16000, ...).
    """
    try:
        if not path and not mongo_id:
            return {"error": "Provide either path or mongo_id."}
        length = min(max(1, length), MAX_BODY_CHARS)
        # A negative offset would slice from the end of the body and make
        # has_more / next_offset meaningless.
        offset = max(0, offset)

        # Only the column name is chosen here; the value is always bound as a
        # query parameter.
        sql = """
            SELECT id, mongo_id, study, path, rel_path, name, ext, sha256,
                   size_bytes, mtime, body, body_length,
                   extractor_version, extracted_at, ok, error
            FROM documents
            WHERE """ + ("path = %s" if path else "mongo_id = %s") + " LIMIT 1"

        with pg_conn() as pg, pg.cursor() as cur:
            cur.execute(sql, (path or mongo_id,))
            row = cur.fetchone()
            cols = [c.name for c in cur.description]

        if not row:
            return {"error": "Document not found.", "path": path, "mongo_id": mongo_id}

        rec = dict(zip(cols, row))
        body = rec.get("body") or ""

        if around_match and body:
            needle = around_match.lower()
            hay = body.lower()
            windows = []
            start = 0
            while len(windows) < 3:
                pos = hay.find(needle, start)
                if pos < 0:
                    break
                lo = max(0, pos - 400)
                hi = min(len(body), pos + 600)
                windows.append({"offset": lo, "text": body[lo:hi]})
                start = pos + len(needle)
            body_out = None
            slice_info = {"mode": "around_match", "match": around_match,
                          "windows": windows, "windows_found": len(windows)}
        else:
            end = offset + length
            body_out = body[offset:end]
            has_more = end < len(body)
            slice_info = {
                "mode": "slice",
                "offset": offset,
                "length_returned": len(body_out),
                "has_more": has_more,
                "next_offset": end if has_more else None,
            }

        # Mongo metadata
        col_code = rec["study"]
        mdoc = mongo[MONGO_DB][col_code].find_one(
            {"path": rec["path"]},
            {"content": 1, "dates_in_name": 1, "parent_folders": 1, "tokens": 1},
        ) or {}

        out = {
            "study": rec["study"],
            "path": rec["path"],
            "rel_path": rec["rel_path"],
            "name": rec["name"],
            "ext": rec["ext"],
            "size_mb": round((rec["size_bytes"] or 0) / 1024 / 1024, 2),
            "mtime": serialize(rec["mtime"]),
            "sha256": rec["sha256"],
            "body_length": rec["body_length"],
            "extractor_version": rec["extractor_version"],
            "extracted_at": serialize(rec["extracted_at"]),
            "ok": rec["ok"],
            "error": rec["error"],
            "parent_folders": mdoc.get("parent_folders"),
            "dates_in_name": mdoc.get("dates_in_name"),
            "metadata": short_meta(mdoc.get("content") or {}),
        }
        if body_out is not None:
            out["body"] = body_out
        out["slice"] = slice_info
        return out
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}


@mcp.tool()
def get_metadata(path: str) -> dict:
    """Return raw Mongo document for one path (full content.*, parent_folders,
    dates_in_name, sha256, sizes, timestamps, tokens).

    Use when you need the full structured metadata — e.g. all sheet names of an
    XLSX, all attachments of an email, full author info.
    Does NOT return body text — use `read_document` for that.
    """
    try:
        # The study is not known up front, so each collection is tried in turn.
        for code in STUDY_ALL:
            d = mongo[MONGO_DB][code].find_one({"path": path})
            if d:
                return serialize(d)
        return {"error": "Not found in any study collection.", "path": path}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}


@mcp.tool()
def recent_files(
    study: Optional[Union[str, list]] = None,
    days: int = 7,
    ext: Optional[Union[str, list]] = None,
    limit: int = 30,
) -> dict:
    """List most recently modified files (no fulltext involved).

    Use for "what changed lately" or "what did I get this week" questions.

    days: window from now (default 7). Set to 0 for no time filter
          (just top-N newest).
    """
    try:
        studies = resolve_studies(study) or STUDY_ALL
        exts = normalize_exts(ext)
        limit = min(max(1, limit), 200)

        q: dict = {"deleted_at": {"$exists": False}}
        if exts:
            q["ext"] = {"$in": exts}
        if days and days > 0:
            since_dt = datetime.now(timezone.utc) - timedelta(days=days)
            q["mtime"] = {"$gte": since_dt}

        projection = {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
                      "size_bytes": 1, "mtime": 1, "study": 1,
                      "content.author": 1, "content.title": 1,
                      "content.last_modified_by": 1}

        results = []
        for code in studies:
            cursor = mongo[MONGO_DB][code].find(q, projection).sort("mtime", -1).limit(limit)
            for d in cursor:
                c = d.get("content") or {}
                results.append({
                    "study": d.get("study"),
                    "path": d["path"],
                    "rel_path": d.get("rel_path"),
                    "name": d.get("name"),
                    "ext": d.get("ext"),
                    "size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
                    "mtime": serialize(d.get("mtime")),
                    "author": c.get("author"),
                    "title": c.get("title"),
                    "last_modified_by": c.get("last_modified_by"),
                })

        # Each study contributed up to `limit` rows; merge and keep the newest.
        results.sort(key=lambda r: r["mtime"] or "", reverse=True)
        top = results[:limit]
        return {"days": days, "count": len(top), "results": top}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}


@mcp.tool()
def find_duplicates(
    study: Optional[Union[str, list]] = None,
    min_size_kb: int = 10,
    limit: int = 30,
) -> dict:
    """Find groups of files with identical content (same sha256) but at different
    paths. Reveals copies of the same document scattered across folders / studies.

    min_size_kb: ignore tiny duplicate groups (default 10 KB)
    limit:       max duplicate groups returned
    """
    try:
        studies = resolve_studies(study) or STUDY_ALL
        limit = max(1, limit)  # a $limit stage rejects non-positive values

        # NOTE(review): grouping runs per collection and keeps only groups with
        # >= 2 members there, so a file with exactly one copy in each study is
        # not reported as a cross-study duplicate.
        pipeline = [
            {"$match": {"deleted_at": {"$exists": False},
                        # Files without a hash are not known to be identical;
                        # without this they would all land in one null group.
                        "sha256": {"$ne": None},
                        "size_bytes": {"$gte": min_size_kb * 1024}}},
            {"$group": {"_id": "$sha256",
                        "count": {"$sum": 1},
                        "size_bytes": {"$first": "$size_bytes"},
                        "ext": {"$first": "$ext"},
                        "paths": {"$push": {"study": "$study", "path": "$path",
                                            "rel_path": "$rel_path", "mtime": "$mtime"}}}},
            {"$match": {"count": {"$gte": 2}}},
            {"$sort": {"size_bytes": -1, "count": -1}},
            {"$limit": limit},
        ]

        all_groups: dict = {}
        for code in studies:
            for g in mongo[MONGO_DB][code].aggregate(pipeline):
                sha = g["_id"]
                if sha in all_groups:
                    all_groups[sha]["count"] += g["count"]
                    all_groups[sha]["paths"].extend(g["paths"])
                else:
                    all_groups[sha] = {
                        "sha256": sha,
                        "count": g["count"],
                        "ext": g["ext"],
                        "size_mb": round(g["size_bytes"] / 1024 / 1024, 2),
                        "paths": g["paths"],
                    }

        groups = sorted(all_groups.values(),
                        key=lambda x: (x["size_mb"], x["count"]),
                        reverse=True)[:limit]
        for g in groups:
            for p in g["paths"]:
                p["mtime"] = serialize(p.get("mtime"))

        return {
            "filters": {"study": studies, "min_size_kb": min_size_kb},
            "group_count": len(groups),
            "wasted_mb_estimate": round(
                sum(g["size_mb"] * (g["count"] - 1) for g in groups), 2),
            "groups": groups,
        }
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}


@mcp.tool()
def by_author(
    name: str,
    study: Optional[Union[str, list]] = None,
    ext: Optional[Union[str, list]] = None,
    limit: int = 30,
) -> dict:
    """Find documents where content.author OR content.last_modified_by matches
    `name` (case-insensitive substring). Works for DOCX/XLSX/PPTX/PDF embedded
    metadata.

    Use for "what did X write" or "who edited this".
    """
    try:
        studies = resolve_studies(study) or STUDY_ALL
        exts = normalize_exts(ext)
        limit = min(max(1, limit), 200)

        # Escape the input so it is matched as a literal substring, as
        # documented, instead of being interpreted as a regular expression.
        rx = {"$regex": re.escape(name), "$options": "i"}
        q: dict = {"deleted_at": {"$exists": False},
                   "$or": [{"content.author": rx}, {"content.last_modified_by": rx}]}
        if exts:
            q["ext"] = {"$in": exts}

        projection = {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
                      "size_bytes": 1, "mtime": 1, "study": 1, "content": 1}

        results = []
        for code in studies:
            cursor = mongo[MONGO_DB][code].find(q, projection).sort("mtime", -1).limit(limit)
            for d in cursor:
                c = d.get("content") or {}
                results.append({
                    "study": d.get("study"),
                    "path": d["path"],
                    "rel_path": d.get("rel_path"),
                    "name": d.get("name"),
                    "ext": d.get("ext"),
                    "mtime": serialize(d.get("mtime")),
                    "size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
                    "author": c.get("author"),
                    "last_modified_by": c.get("last_modified_by"),
                    "title": c.get("title"),
                })

        # Each study contributed up to `limit` rows; merge and keep the newest.
        results.sort(key=lambda r: r["mtime"] or "", reverse=True)
        top = results[:limit]
        return {"author_match": name, "count": len(top), "results": top}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}


@mcp.tool()
def browse_folder(
    folder: str,
    study: Optional[Union[str, list]] = None,
    ext: Optional[Union[str, list]] = None,
    limit: int = 100,
) -> dict:
    """List files where any parent folder name contains `folder` (case-insensitive
    substring match).

    Use for "show me what's in the CRF folder" or "what's in Training".
    Returns just metadata, no body text. Files sorted by relative path;
    at most `limit` files (default 100, max 500) are returned in total.
    """
    try:
        studies = resolve_studies(study) or STUDY_ALL
        exts = normalize_exts(ext)
        limit = min(max(1, limit), 500)

        # Escape the input so it is matched as a literal substring, as
        # documented, instead of being interpreted as a regular expression.
        rx = {"$regex": re.escape(folder), "$options": "i"}
        q: dict = {"deleted_at": {"$exists": False}, "parent_folders": rx}
        if exts:
            q["ext"] = {"$in": exts}

        projection = {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
                      "size_bytes": 1, "mtime": 1, "study": 1,
                      "parent_folders": 1, "dates_in_name": 1}

        results = []
        for code in studies:
            cursor = mongo[MONGO_DB][code].find(q, projection).sort("rel_path", 1).limit(limit)
            for d in cursor:
                results.append({
                    "study": d.get("study"),
                    "path": d["path"],
                    "rel_path": d.get("rel_path"),
                    "name": d.get("name"),
                    "ext": d.get("ext"),
                    "mtime": serialize(d.get("mtime")),
                    "size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
                    "parent_folders": d.get("parent_folders"),
                    "dates_in_name": d.get("dates_in_name"),
                })

        # Each study contributed up to `limit` rows; merge them into one list
        # ordered by relative path and honour `limit` for the combined result.
        results.sort(key=lambda r: r["rel_path"] or "")
        results = results[:limit]
        return {"folder_match": folder, "count": len(results), "results": results}
    except Exception as e:
        log(traceback.format_exc())
        return {"error": str(e)}


if __name__ == "__main__":
    log("MCP soubory server started (FastMCP)")
    mcp.run()