Files
2026-06-05 21:21:30 +02:00

673 lines
25 KiB
Python

#!/usr/bin/env python3
"""
==============================================================================
MCP server: SOUBORY (Dropbox studie 42847922MDD3003 + 77242113UCO3001)
Hybridni dotaz nad:
- PostgreSQL 192.168.1.76 db=MongoSoubory tabulka=documents
(fulltext tsvector index, ts_headline, ts_rank)
- MongoDB 192.168.1.76 db=soubory
kolekce=42847922MDD3003, 77242113UCO3001
(metadata, content.* z enrich_files_v1.0)
Spusteni:
python mcp_soubory.py (stdio MCP)
Pridano do U:\\janssen\\.mcp.json jako "soubory".
==============================================================================
"""
from __future__ import annotations
import sys
import traceback
from datetime import datetime, timezone, timedelta
from typing import Optional, Union
import psycopg
from bson import ObjectId
from mcp.server.fastmcp import FastMCP
from pymongo import MongoClient
# --- konfigurace ------------------------------------------------------------
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "soubory"
# Kratky alias -> Mongo kolekce = PG.study
STUDY_MAP = {
"MDD3003": "42847922MDD3003",
"UCO3001": "77242113UCO3001",
}
STUDY_ALL = list(STUDY_MAP.values())
PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoSoubory "
"user=vladimir.buzalka password=Vlado7309208104++")
# Limit kolik telo doc vracime defaultne (aby tool response nebyla obri)
DEFAULT_BODY_CHARS = 8000
MAX_BODY_CHARS = 200_000
def log(msg: str) -> None:
print(msg, file=sys.stderr, flush=True)
# --- inicializace klientu ---------------------------------------------------
try:
mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
mongo.admin.command("ping")
log(f"Mongo OK ({MONGO_URI})")
except Exception as e:
log(f"Mongo connection failed: {e}")
sys.exit(1)
try:
_test = psycopg.connect(PG_DSN, connect_timeout=10)
_test.close()
log("Postgres OK")
except Exception as e:
log(f"Postgres connection failed: {e}")
sys.exit(1)
def pg_conn():
return psycopg.connect(PG_DSN, connect_timeout=10)
def serialize(obj):
if isinstance(obj, ObjectId):
return str(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, bytes):
return obj.decode("utf-8", errors="replace")
if isinstance(obj, dict):
return {k: serialize(v) for k, v in obj.items()}
if isinstance(obj, list):
return [serialize(v) for v in obj]
return obj
def resolve_studies(study: Optional[Union[str, list]]) -> Optional[list[str]]:
"""Alias 'MDD3003' / 'UCO3001' -> plne nazvy kolekce. None -> obe (vraci None pro PG = bez filtru)."""
if study is None or study == "" or study == []:
return None
if isinstance(study, str):
study = [study]
out = []
for s in study:
if s in STUDY_MAP:
out.append(STUDY_MAP[s])
elif s in STUDY_MAP.values():
out.append(s)
else:
raise ValueError(f"Unknown study {s!r}. Use MDD3003 / UCO3001 or full code.")
return out
def normalize_exts(ext: Optional[Union[str, list]]) -> Optional[list[str]]:
if ext is None or ext == "" or ext == []:
return None
if isinstance(ext, str):
ext = [ext]
return [e.lower().lstrip(".") for e in ext]
def parse_since(since: Optional[str]) -> Optional[datetime]:
if not since:
return None
# akceptuj YYYY-MM-DD i ISO
try:
if "T" in since:
return datetime.fromisoformat(since.replace("Z", "+00:00"))
return datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc)
except Exception as e:
raise ValueError(f"Bad date {since!r}: {e}")
def short_meta(content: dict) -> dict:
"""Zhustene metadata z content.* pro tool response."""
if not content or not content.get("ok", True):
return {"ok": False, "error": (content or {}).get("error")}
out = {}
for k in ("title", "subject", "author", "last_modified_by",
"from", "to", "cc", "date", "pages", "slides",
"total_sheets", "paragraphs", "words",
"created", "modified", "encrypted"):
if k in content and content[k] not in (None, "", []):
v = content[k]
if isinstance(v, str) and len(v) > 200:
v = v[:200] + "..."
out[k] = v
if "sheets" in content:
out["sheet_names"] = [s.get("name") for s in content.get("sheets", []) if s]
if "attachments" in content:
out["attachment_count"] = len(content.get("attachments") or [])
if out["attachment_count"]:
out["attachments"] = content["attachments"][:10]
if "text_head" in content:
head = content["text_head"]
out["text_head"] = head[:400] + ("..." if head and len(head) > 400 else "")
return out
# --- MCP --------------------------------------------------------------------
mcp = FastMCP("soubory")
@mcp.tool()
def ping() -> dict:
"""Quick health check. Reports Mongo + Postgres connectivity, totals per study, and PG documents.ok count.
Call this first when starting an investigation to confirm everything is up.
"""
try:
info = mongo.admin.command("buildInfo")
study_counts = {}
for code in STUDY_ALL:
study_counts[code] = mongo[MONGO_DB][code].estimated_document_count()
with pg_conn() as pg, pg.cursor() as cur:
cur.execute("SELECT study, ok, count(*) FROM documents GROUP BY study, ok ORDER BY study, ok")
rows = cur.fetchall()
pg_summary = {}
for s, ok, c in rows:
pg_summary.setdefault(s, {})[("ok" if ok else "error")] = c
return {
"status": "ok",
"mongo_version": info.get("version"),
"mongo_files_per_study": study_counts,
"pg_documents_per_study": pg_summary,
"studies": STUDY_MAP,
}
except Exception as e:
log(traceback.format_exc())
return {"status": "error", "error": str(e)}
@mcp.tool()
def list_studies() -> dict:
"""Overview of both studies — total files, breakdown by extension, fulltext coverage,
earliest/latest mtime. Use this to understand the corpus before searching.
"""
out = {}
try:
for alias, code in STUDY_MAP.items():
col = mongo[MONGO_DB][code]
total = col.count_documents({})
deleted = col.count_documents({"deleted_at": {"$exists": True}})
ext_breakdown = list(col.aggregate([
{"$match": {"deleted_at": {"$exists": False}}},
{"$group": {"_id": "$ext", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
]))
mtime_minmax = list(col.aggregate([
{"$match": {"deleted_at": {"$exists": False}}},
{"$group": {"_id": None,
"min_mtime": {"$min": "$mtime"},
"max_mtime": {"$max": "$mtime"}}},
]))
with pg_conn() as pg, pg.cursor() as cur:
cur.execute(
"SELECT count(*) FILTER (WHERE ok), count(*) FROM documents WHERE study=%s",
(code,),
)
pg_ok, pg_total = cur.fetchone()
out[alias] = {
"code": code,
"mongo_total": total,
"mongo_active": total - deleted,
"mongo_deleted": deleted,
"by_ext": {r["_id"]: r["count"] for r in ext_breakdown},
"fulltext_indexed": pg_ok,
"fulltext_failed": pg_total - pg_ok,
"oldest_mtime": serialize(mtime_minmax[0]["min_mtime"]) if mtime_minmax else None,
"newest_mtime": serialize(mtime_minmax[0]["max_mtime"]) if mtime_minmax else None,
}
return {"studies": out}
except Exception as e:
log(traceback.format_exc())
raise
@mcp.tool()
def search(
query: str,
study: Optional[Union[str, list]] = None,
ext: Optional[Union[str, list]] = None,
since: Optional[str] = None,
folder: Optional[str] = None,
limit: int = 15,
with_metadata: bool = True,
) -> dict:
"""PRIMARY TOOL — fulltext search across all parsed documents in both studies.
query: search expression in PostgreSQL websearch_to_tsquery syntax:
adverse event -> AND (both must appear)
"adverse event" -> exact phrase
adverse OR serious -> OR
adverse -mild -> exclude
study: "MDD3003", "UCO3001", or list. None = both.
ext: filter file types: ["pdf", "docx", "xlsx", "xlsm", "pptx", "eml", "msg", "txt", "csv"]
since: ISO date "YYYY-MM-DD" — only files modified on/after this date
folder: substring match against any parent folder name (e.g. "CRF", "Training")
limit: max results (default 15, max 100)
with_metadata: if True, also fetch content.* metadata from Mongo (author, pages, sheets, EML headers)
Returns ranked results with `snippet` showing matches highlighted with <<...>>.
Use `read_document` to fetch full body of a specific hit.
"""
try:
studies = resolve_studies(study)
exts = normalize_exts(ext)
since_dt = parse_since(since)
limit = min(max(1, limit), 100)
sql = """
WITH q AS (
SELECT websearch_to_tsquery('soubory'::regconfig, %(query)s) AS tsq
)
SELECT
d.id, d.mongo_id, d.study, d.path, d.rel_path, d.name, d.ext,
d.size_bytes, d.mtime, d.body_length,
ts_rank(d.tsv, q.tsq) AS rank,
ts_headline('soubory'::regconfig,
left(d.body, 200000),
q.tsq,
'MaxFragments=3, MinWords=4, MaxWords=18, '
'StartSel=<<, StopSel=>>, FragmentDelimiter= ... ') AS snippet
FROM documents d, q
WHERE d.tsv @@ q.tsq
AND d.ok = TRUE
AND (%(studies)s::text[] IS NULL OR d.study = ANY(%(studies)s::text[]))
AND (%(exts)s::text[] IS NULL OR d.ext = ANY(%(exts)s::text[]))
AND (%(since)s::timestamptz IS NULL OR d.mtime >= %(since)s::timestamptz)
ORDER BY rank DESC, d.mtime DESC NULLS LAST
LIMIT %(limit)s
"""
params = {"query": query, "studies": studies, "exts": exts,
"since": since_dt, "limit": limit}
with pg_conn() as pg, pg.cursor() as cur:
cur.execute(sql, params)
cols = [c.name for c in cur.description]
rows = [dict(zip(cols, r)) for r in cur.fetchall()]
# filter by folder via Mongo (PG nema parent_folders)
meta_by_path: dict[str, dict] = {}
if rows and (with_metadata or folder):
by_study: dict[str, list[str]] = {}
for r in rows:
by_study.setdefault(r["study"], []).append(r["path"])
for code, paths in by_study.items():
proj = {"path": 1, "parent_folders": 1, "dates_in_name": 1}
if with_metadata:
proj["content"] = 1
for d in mongo[MONGO_DB][code].find({"path": {"$in": paths}}, proj):
meta_by_path[d["path"]] = d
if folder:
needle = folder.lower()
kept = []
for r in rows:
folders = (meta_by_path.get(r["path"]) or {}).get("parent_folders") or []
if any(needle in (f or "").lower() for f in folders):
kept.append(r)
rows = kept
results = []
for r in rows:
mongo_doc = meta_by_path.get(r["path"]) or {}
results.append({
"study": r["study"],
"path": r["path"],
"rel_path": r["rel_path"],
"name": r["name"],
"ext": r["ext"],
"size_mb": round((r["size_bytes"] or 0) / 1024 / 1024, 2),
"mtime": serialize(r["mtime"]),
"body_length": r["body_length"],
"rank": round(float(r["rank"]), 5),
"snippet": (r["snippet"] or "").strip(),
"mongo_id": r["mongo_id"],
"dates_in_name": mongo_doc.get("dates_in_name"),
"metadata": short_meta(mongo_doc.get("content") or {}) if with_metadata else None,
})
return {
"query": query,
"filters": {"study": studies, "ext": exts, "since": since,
"folder": folder, "limit": limit},
"count": len(results),
"results": results,
"tip": "Use read_document(path=...) to fetch full body of any hit.",
}
except Exception as e:
log(traceback.format_exc())
return {"error": str(e), "query": query}
@mcp.tool()
def read_document(
path: Optional[str] = None,
mongo_id: Optional[str] = None,
offset: int = 0,
length: int = DEFAULT_BODY_CHARS,
around_match: Optional[str] = None,
) -> dict:
"""Read the full parsed text of one document (PG body column) + its Mongo metadata.
Identify the document by EITHER `path` (absolute) OR `mongo_id`.
offset, length: slice the body (default first 8000 chars). length capped at 200000.
around_match: if given, return up to 3 windows of ~1000 chars centered on the first matches
of this substring (case-insensitive). Useful to jump to a keyword in a long doc.
Body is truncated to fit; check `body_length` vs returned length to know if more exists.
Use offset to page further (offset=8000, then 16000, ...).
"""
try:
if not path and not mongo_id:
return {"error": "Provide either path or mongo_id."}
length = min(max(1, length), MAX_BODY_CHARS)
sql = """
SELECT id, mongo_id, study, path, rel_path, name, ext, sha256,
size_bytes, mtime, body, body_length, extractor_version,
extracted_at, ok, error
FROM documents
WHERE """ + ("path = %s" if path else "mongo_id = %s") + " LIMIT 1"
with pg_conn() as pg, pg.cursor() as cur:
cur.execute(sql, (path or mongo_id,))
row = cur.fetchone()
cols = [c.name for c in cur.description]
if not row:
return {"error": "Document not found.", "path": path, "mongo_id": mongo_id}
rec = dict(zip(cols, row))
body = rec.get("body") or ""
if around_match and body:
needle = around_match.lower()
hay = body.lower()
windows = []
start = 0
while len(windows) < 3:
pos = hay.find(needle, start)
if pos < 0:
break
lo = max(0, pos - 400)
hi = min(len(body), pos + 600)
windows.append({"offset": lo, "text": body[lo:hi]})
start = pos + len(needle)
body_out = None
slice_info = {"mode": "around_match", "match": around_match,
"windows": windows, "windows_found": len(windows)}
else:
body_out = body[offset:offset + length]
slice_info = {
"mode": "slice", "offset": offset,
"length_returned": len(body_out),
"has_more": offset + length < len(body),
"next_offset": offset + length if offset + length < len(body) else None,
}
# Mongo metadata
col_code = rec["study"]
mdoc = mongo[MONGO_DB][col_code].find_one(
{"path": rec["path"]},
{"content": 1, "dates_in_name": 1, "parent_folders": 1, "tokens": 1},
) or {}
out = {
"study": rec["study"],
"path": rec["path"],
"rel_path": rec["rel_path"],
"name": rec["name"],
"ext": rec["ext"],
"size_mb": round((rec["size_bytes"] or 0) / 1024 / 1024, 2),
"mtime": serialize(rec["mtime"]),
"sha256": rec["sha256"],
"body_length": rec["body_length"],
"extractor_version": rec["extractor_version"],
"extracted_at": serialize(rec["extracted_at"]),
"ok": rec["ok"],
"error": rec["error"],
"parent_folders": mdoc.get("parent_folders"),
"dates_in_name": mdoc.get("dates_in_name"),
"metadata": short_meta(mdoc.get("content") or {}),
}
if body_out is not None:
out["body"] = body_out
out["slice"] = slice_info
return out
except Exception as e:
log(traceback.format_exc())
return {"error": str(e)}
@mcp.tool()
def get_metadata(path: str) -> dict:
"""Return raw Mongo document for one path (full content.*, parent_folders, dates_in_name,
sha256, sizes, timestamps, tokens). Use when you need the full structured metadata —
e.g. all sheet names of an XLSX, all attachments of an email, full author info.
Does NOT return body text — use `read_document` for that.
"""
try:
for code in STUDY_ALL:
d = mongo[MONGO_DB][code].find_one({"path": path})
if d:
return serialize(d)
return {"error": "Not found in any study collection.", "path": path}
except Exception as e:
log(traceback.format_exc())
return {"error": str(e)}
@mcp.tool()
def recent_files(
study: Optional[Union[str, list]] = None,
days: int = 7,
ext: Optional[Union[str, list]] = None,
limit: int = 30,
) -> dict:
"""List most recently modified files (no fulltext involved). Use for "what changed lately"
or "what did I get this week" questions.
days: window from now (default 7). Set to 0 for no time filter (just top-N newest).
"""
try:
studies = resolve_studies(study) or STUDY_ALL
exts = normalize_exts(ext)
limit = min(max(1, limit), 200)
q: dict = {"deleted_at": {"$exists": False}}
if exts:
q["ext"] = {"$in": exts}
if days and days > 0:
since_dt = datetime.now(timezone.utc) - timedelta(days=days)
q["mtime"] = {"$gte": since_dt}
results = []
for code in studies:
for d in (mongo[MONGO_DB][code]
.find(q, {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
"size_bytes": 1, "mtime": 1, "study": 1,
"content.author": 1, "content.title": 1,
"content.last_modified_by": 1})
.sort("mtime", -1).limit(limit)):
results.append({
"study": d.get("study"),
"path": d["path"],
"rel_path": d.get("rel_path"),
"name": d.get("name"),
"ext": d.get("ext"),
"size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
"mtime": serialize(d.get("mtime")),
"author": (d.get("content") or {}).get("author"),
"title": (d.get("content") or {}).get("title"),
"last_modified_by": (d.get("content") or {}).get("last_modified_by"),
})
results.sort(key=lambda r: r["mtime"] or "", reverse=True)
return {"days": days, "count": len(results[:limit]), "results": results[:limit]}
except Exception as e:
log(traceback.format_exc())
return {"error": str(e)}
@mcp.tool()
def find_duplicates(
study: Optional[Union[str, list]] = None,
min_size_kb: int = 10,
limit: int = 30,
) -> dict:
"""Find groups of files with identical content (same sha256) but at different paths.
Reveals copies of the same document scattered across folders / studies.
min_size_kb: ignore tiny duplicate groups (default 10 KB)
limit: max duplicate groups returned
"""
try:
studies = resolve_studies(study) or STUDY_ALL
pipeline = [
{"$match": {"deleted_at": {"$exists": False},
"size_bytes": {"$gte": min_size_kb * 1024}}},
{"$group": {"_id": "$sha256",
"count": {"$sum": 1},
"size_bytes": {"$first": "$size_bytes"},
"ext": {"$first": "$ext"},
"paths": {"$push": {"study": "$study",
"path": "$path",
"rel_path": "$rel_path",
"mtime": "$mtime"}}}},
{"$match": {"count": {"$gte": 2}}},
{"$sort": {"size_bytes": -1, "count": -1}},
{"$limit": limit},
]
all_groups: dict = {}
for code in studies:
for g in mongo[MONGO_DB][code].aggregate(pipeline):
sha = g["_id"]
if sha in all_groups:
all_groups[sha]["count"] += g["count"]
all_groups[sha]["paths"].extend(g["paths"])
else:
all_groups[sha] = {
"sha256": sha, "count": g["count"], "ext": g["ext"],
"size_mb": round(g["size_bytes"] / 1024 / 1024, 2),
"paths": g["paths"],
}
groups = sorted(all_groups.values(),
key=lambda x: (x["size_mb"], x["count"]), reverse=True)[:limit]
for g in groups:
for p in g["paths"]:
p["mtime"] = serialize(p.get("mtime"))
return {
"filters": {"study": studies, "min_size_kb": min_size_kb},
"group_count": len(groups),
"wasted_mb_estimate": round(sum(g["size_mb"] * (g["count"] - 1) for g in groups), 2),
"groups": groups,
}
except Exception as e:
log(traceback.format_exc())
return {"error": str(e)}
@mcp.tool()
def by_author(
name: str,
study: Optional[Union[str, list]] = None,
ext: Optional[Union[str, list]] = None,
limit: int = 30,
) -> dict:
"""Find documents where content.author OR content.last_modified_by matches `name` (case-insensitive substring).
Works for DOCX/XLSX/PPTX/PDF embedded metadata. Use for "what did X write" or "who edited this".
"""
try:
studies = resolve_studies(study) or STUDY_ALL
exts = normalize_exts(ext)
limit = min(max(1, limit), 200)
rx = {"$regex": name, "$options": "i"}
q: dict = {"deleted_at": {"$exists": False},
"$or": [{"content.author": rx},
{"content.last_modified_by": rx}]}
if exts:
q["ext"] = {"$in": exts}
results = []
for code in studies:
for d in (mongo[MONGO_DB][code]
.find(q, {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
"size_bytes": 1, "mtime": 1, "study": 1, "content": 1})
.sort("mtime", -1).limit(limit)):
c = d.get("content") or {}
results.append({
"study": d.get("study"),
"path": d["path"],
"rel_path": d.get("rel_path"),
"name": d.get("name"),
"ext": d.get("ext"),
"mtime": serialize(d.get("mtime")),
"size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
"author": c.get("author"),
"last_modified_by": c.get("last_modified_by"),
"title": c.get("title"),
})
results.sort(key=lambda r: r["mtime"] or "", reverse=True)
return {"author_match": name, "count": len(results[:limit]), "results": results[:limit]}
except Exception as e:
log(traceback.format_exc())
return {"error": str(e)}
@mcp.tool()
def browse_folder(
folder: str,
study: Optional[Union[str, list]] = None,
ext: Optional[Union[str, list]] = None,
limit: int = 100,
) -> dict:
"""List files where any parent folder name contains `folder` (case-insensitive substring match).
Use for "show me what's in the CRF folder" or "what's in Training". Returns just metadata,
no body text. Files sorted by relative path.
"""
try:
studies = resolve_studies(study) or STUDY_ALL
exts = normalize_exts(ext)
limit = min(max(1, limit), 500)
rx = {"$regex": folder, "$options": "i"}
q: dict = {"deleted_at": {"$exists": False}, "parent_folders": rx}
if exts:
q["ext"] = {"$in": exts}
results = []
for code in studies:
for d in (mongo[MONGO_DB][code]
.find(q, {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
"size_bytes": 1, "mtime": 1, "study": 1,
"parent_folders": 1, "dates_in_name": 1})
.sort("rel_path", 1).limit(limit)):
results.append({
"study": d.get("study"),
"path": d["path"],
"rel_path": d.get("rel_path"),
"name": d.get("name"),
"ext": d.get("ext"),
"mtime": serialize(d.get("mtime")),
"size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
"parent_folders": d.get("parent_folders"),
"dates_in_name": d.get("dates_in_name"),
})
return {"folder_match": folder, "count": len(results), "results": results}
except Exception as e:
log(traceback.format_exc())
return {"error": str(e)}
if __name__ == "__main__":
log("MCP soubory server started (FastMCP)")
mcp.run()