673 lines
25 KiB
Python
673 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
==============================================================================
|
|
MCP server: SOUBORY (Dropbox studie 42847922MDD3003 + 77242113UCO3001)
|
|
|
|
Hybridni dotaz nad:
|
|
- PostgreSQL 192.168.1.76 db=MongoSoubory tabulka=documents
|
|
(fulltext tsvector index, ts_headline, ts_rank)
|
|
- MongoDB 192.168.1.76 db=soubory
|
|
kolekce=42847922MDD3003, 77242113UCO3001
|
|
(metadata, content.* z enrich_files_v1.0)
|
|
|
|
Spusteni:
|
|
python mcp_soubory.py (stdio MCP)
|
|
|
|
Pridano do U:\\janssen\\.mcp.json jako "soubory".
|
|
==============================================================================
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import traceback
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional, Union
|
|
|
|
import psycopg
|
|
from bson import ObjectId
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pymongo import MongoClient
|
|
|
|
# --- konfigurace ------------------------------------------------------------
|
|
MONGO_URI = "mongodb://192.168.1.76:27017"
|
|
MONGO_DB = "soubory"
|
|
|
|
# Kratky alias -> Mongo kolekce = PG.study
|
|
STUDY_MAP = {
|
|
"MDD3003": "42847922MDD3003",
|
|
"UCO3001": "77242113UCO3001",
|
|
}
|
|
STUDY_ALL = list(STUDY_MAP.values())
|
|
|
|
PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoSoubory "
|
|
"user=vladimir.buzalka password=Vlado7309208104++")
|
|
|
|
# Limit kolik telo doc vracime defaultne (aby tool response nebyla obri)
|
|
DEFAULT_BODY_CHARS = 8000
|
|
MAX_BODY_CHARS = 200_000
|
|
|
|
|
|
def log(msg: str) -> None:
|
|
print(msg, file=sys.stderr, flush=True)
|
|
|
|
|
|
# --- inicializace klientu ---------------------------------------------------
|
|
try:
|
|
mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
|
|
mongo.admin.command("ping")
|
|
log(f"Mongo OK ({MONGO_URI})")
|
|
except Exception as e:
|
|
log(f"Mongo connection failed: {e}")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
_test = psycopg.connect(PG_DSN, connect_timeout=10)
|
|
_test.close()
|
|
log("Postgres OK")
|
|
except Exception as e:
|
|
log(f"Postgres connection failed: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
def pg_conn():
|
|
return psycopg.connect(PG_DSN, connect_timeout=10)
|
|
|
|
|
|
def serialize(obj):
|
|
if isinstance(obj, ObjectId):
|
|
return str(obj)
|
|
if isinstance(obj, datetime):
|
|
return obj.isoformat()
|
|
if isinstance(obj, bytes):
|
|
return obj.decode("utf-8", errors="replace")
|
|
if isinstance(obj, dict):
|
|
return {k: serialize(v) for k, v in obj.items()}
|
|
if isinstance(obj, list):
|
|
return [serialize(v) for v in obj]
|
|
return obj
|
|
|
|
|
|
def resolve_studies(study: Optional[Union[str, list]]) -> Optional[list[str]]:
|
|
"""Alias 'MDD3003' / 'UCO3001' -> plne nazvy kolekce. None -> obe (vraci None pro PG = bez filtru)."""
|
|
if study is None or study == "" or study == []:
|
|
return None
|
|
if isinstance(study, str):
|
|
study = [study]
|
|
out = []
|
|
for s in study:
|
|
if s in STUDY_MAP:
|
|
out.append(STUDY_MAP[s])
|
|
elif s in STUDY_MAP.values():
|
|
out.append(s)
|
|
else:
|
|
raise ValueError(f"Unknown study {s!r}. Use MDD3003 / UCO3001 or full code.")
|
|
return out
|
|
|
|
|
|
def normalize_exts(ext: Optional[Union[str, list]]) -> Optional[list[str]]:
|
|
if ext is None or ext == "" or ext == []:
|
|
return None
|
|
if isinstance(ext, str):
|
|
ext = [ext]
|
|
return [e.lower().lstrip(".") for e in ext]
|
|
|
|
|
|
def parse_since(since: Optional[str]) -> Optional[datetime]:
|
|
if not since:
|
|
return None
|
|
# akceptuj YYYY-MM-DD i ISO
|
|
try:
|
|
if "T" in since:
|
|
return datetime.fromisoformat(since.replace("Z", "+00:00"))
|
|
return datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc)
|
|
except Exception as e:
|
|
raise ValueError(f"Bad date {since!r}: {e}")
|
|
|
|
|
|
def short_meta(content: dict) -> dict:
|
|
"""Zhustene metadata z content.* pro tool response."""
|
|
if not content or not content.get("ok", True):
|
|
return {"ok": False, "error": (content or {}).get("error")}
|
|
out = {}
|
|
for k in ("title", "subject", "author", "last_modified_by",
|
|
"from", "to", "cc", "date", "pages", "slides",
|
|
"total_sheets", "paragraphs", "words",
|
|
"created", "modified", "encrypted"):
|
|
if k in content and content[k] not in (None, "", []):
|
|
v = content[k]
|
|
if isinstance(v, str) and len(v) > 200:
|
|
v = v[:200] + "..."
|
|
out[k] = v
|
|
if "sheets" in content:
|
|
out["sheet_names"] = [s.get("name") for s in content.get("sheets", []) if s]
|
|
if "attachments" in content:
|
|
out["attachment_count"] = len(content.get("attachments") or [])
|
|
if out["attachment_count"]:
|
|
out["attachments"] = content["attachments"][:10]
|
|
if "text_head" in content:
|
|
head = content["text_head"]
|
|
out["text_head"] = head[:400] + ("..." if head and len(head) > 400 else "")
|
|
return out
|
|
|
|
|
|
# --- MCP --------------------------------------------------------------------
|
|
mcp = FastMCP("soubory")
|
|
|
|
|
|
@mcp.tool()
|
|
def ping() -> dict:
|
|
"""Quick health check. Reports Mongo + Postgres connectivity, totals per study, and PG documents.ok count.
|
|
Call this first when starting an investigation to confirm everything is up.
|
|
"""
|
|
try:
|
|
info = mongo.admin.command("buildInfo")
|
|
study_counts = {}
|
|
for code in STUDY_ALL:
|
|
study_counts[code] = mongo[MONGO_DB][code].estimated_document_count()
|
|
with pg_conn() as pg, pg.cursor() as cur:
|
|
cur.execute("SELECT study, ok, count(*) FROM documents GROUP BY study, ok ORDER BY study, ok")
|
|
rows = cur.fetchall()
|
|
pg_summary = {}
|
|
for s, ok, c in rows:
|
|
pg_summary.setdefault(s, {})[("ok" if ok else "error")] = c
|
|
return {
|
|
"status": "ok",
|
|
"mongo_version": info.get("version"),
|
|
"mongo_files_per_study": study_counts,
|
|
"pg_documents_per_study": pg_summary,
|
|
"studies": STUDY_MAP,
|
|
}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"status": "error", "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_studies() -> dict:
|
|
"""Overview of both studies — total files, breakdown by extension, fulltext coverage,
|
|
earliest/latest mtime. Use this to understand the corpus before searching.
|
|
"""
|
|
out = {}
|
|
try:
|
|
for alias, code in STUDY_MAP.items():
|
|
col = mongo[MONGO_DB][code]
|
|
total = col.count_documents({})
|
|
deleted = col.count_documents({"deleted_at": {"$exists": True}})
|
|
ext_breakdown = list(col.aggregate([
|
|
{"$match": {"deleted_at": {"$exists": False}}},
|
|
{"$group": {"_id": "$ext", "count": {"$sum": 1}}},
|
|
{"$sort": {"count": -1}},
|
|
]))
|
|
mtime_minmax = list(col.aggregate([
|
|
{"$match": {"deleted_at": {"$exists": False}}},
|
|
{"$group": {"_id": None,
|
|
"min_mtime": {"$min": "$mtime"},
|
|
"max_mtime": {"$max": "$mtime"}}},
|
|
]))
|
|
with pg_conn() as pg, pg.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT count(*) FILTER (WHERE ok), count(*) FROM documents WHERE study=%s",
|
|
(code,),
|
|
)
|
|
pg_ok, pg_total = cur.fetchone()
|
|
out[alias] = {
|
|
"code": code,
|
|
"mongo_total": total,
|
|
"mongo_active": total - deleted,
|
|
"mongo_deleted": deleted,
|
|
"by_ext": {r["_id"]: r["count"] for r in ext_breakdown},
|
|
"fulltext_indexed": pg_ok,
|
|
"fulltext_failed": pg_total - pg_ok,
|
|
"oldest_mtime": serialize(mtime_minmax[0]["min_mtime"]) if mtime_minmax else None,
|
|
"newest_mtime": serialize(mtime_minmax[0]["max_mtime"]) if mtime_minmax else None,
|
|
}
|
|
return {"studies": out}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def search(
|
|
query: str,
|
|
study: Optional[Union[str, list]] = None,
|
|
ext: Optional[Union[str, list]] = None,
|
|
since: Optional[str] = None,
|
|
folder: Optional[str] = None,
|
|
limit: int = 15,
|
|
with_metadata: bool = True,
|
|
) -> dict:
|
|
"""PRIMARY TOOL — fulltext search across all parsed documents in both studies.
|
|
|
|
query: search expression in PostgreSQL websearch_to_tsquery syntax:
|
|
adverse event -> AND (both must appear)
|
|
"adverse event" -> exact phrase
|
|
adverse OR serious -> OR
|
|
adverse -mild -> exclude
|
|
study: "MDD3003", "UCO3001", or list. None = both.
|
|
ext: filter file types: ["pdf", "docx", "xlsx", "xlsm", "pptx", "eml", "msg", "txt", "csv"]
|
|
since: ISO date "YYYY-MM-DD" — only files modified on/after this date
|
|
folder: substring match against any parent folder name (e.g. "CRF", "Training")
|
|
limit: max results (default 15, max 100)
|
|
with_metadata: if True, also fetch content.* metadata from Mongo (author, pages, sheets, EML headers)
|
|
|
|
Returns ranked results with `snippet` showing matches highlighted with <<...>>.
|
|
Use `read_document` to fetch full body of a specific hit.
|
|
"""
|
|
try:
|
|
studies = resolve_studies(study)
|
|
exts = normalize_exts(ext)
|
|
since_dt = parse_since(since)
|
|
limit = min(max(1, limit), 100)
|
|
|
|
sql = """
|
|
WITH q AS (
|
|
SELECT websearch_to_tsquery('soubory'::regconfig, %(query)s) AS tsq
|
|
)
|
|
SELECT
|
|
d.id, d.mongo_id, d.study, d.path, d.rel_path, d.name, d.ext,
|
|
d.size_bytes, d.mtime, d.body_length,
|
|
ts_rank(d.tsv, q.tsq) AS rank,
|
|
ts_headline('soubory'::regconfig,
|
|
left(d.body, 200000),
|
|
q.tsq,
|
|
'MaxFragments=3, MinWords=4, MaxWords=18, '
|
|
'StartSel=<<, StopSel=>>, FragmentDelimiter= ... ') AS snippet
|
|
FROM documents d, q
|
|
WHERE d.tsv @@ q.tsq
|
|
AND d.ok = TRUE
|
|
AND (%(studies)s::text[] IS NULL OR d.study = ANY(%(studies)s::text[]))
|
|
AND (%(exts)s::text[] IS NULL OR d.ext = ANY(%(exts)s::text[]))
|
|
AND (%(since)s::timestamptz IS NULL OR d.mtime >= %(since)s::timestamptz)
|
|
ORDER BY rank DESC, d.mtime DESC NULLS LAST
|
|
LIMIT %(limit)s
|
|
"""
|
|
params = {"query": query, "studies": studies, "exts": exts,
|
|
"since": since_dt, "limit": limit}
|
|
|
|
with pg_conn() as pg, pg.cursor() as cur:
|
|
cur.execute(sql, params)
|
|
cols = [c.name for c in cur.description]
|
|
rows = [dict(zip(cols, r)) for r in cur.fetchall()]
|
|
|
|
# filter by folder via Mongo (PG nema parent_folders)
|
|
meta_by_path: dict[str, dict] = {}
|
|
if rows and (with_metadata or folder):
|
|
by_study: dict[str, list[str]] = {}
|
|
for r in rows:
|
|
by_study.setdefault(r["study"], []).append(r["path"])
|
|
for code, paths in by_study.items():
|
|
proj = {"path": 1, "parent_folders": 1, "dates_in_name": 1}
|
|
if with_metadata:
|
|
proj["content"] = 1
|
|
for d in mongo[MONGO_DB][code].find({"path": {"$in": paths}}, proj):
|
|
meta_by_path[d["path"]] = d
|
|
|
|
if folder:
|
|
needle = folder.lower()
|
|
kept = []
|
|
for r in rows:
|
|
folders = (meta_by_path.get(r["path"]) or {}).get("parent_folders") or []
|
|
if any(needle in (f or "").lower() for f in folders):
|
|
kept.append(r)
|
|
rows = kept
|
|
|
|
results = []
|
|
for r in rows:
|
|
mongo_doc = meta_by_path.get(r["path"]) or {}
|
|
results.append({
|
|
"study": r["study"],
|
|
"path": r["path"],
|
|
"rel_path": r["rel_path"],
|
|
"name": r["name"],
|
|
"ext": r["ext"],
|
|
"size_mb": round((r["size_bytes"] or 0) / 1024 / 1024, 2),
|
|
"mtime": serialize(r["mtime"]),
|
|
"body_length": r["body_length"],
|
|
"rank": round(float(r["rank"]), 5),
|
|
"snippet": (r["snippet"] or "").strip(),
|
|
"mongo_id": r["mongo_id"],
|
|
"dates_in_name": mongo_doc.get("dates_in_name"),
|
|
"metadata": short_meta(mongo_doc.get("content") or {}) if with_metadata else None,
|
|
})
|
|
|
|
return {
|
|
"query": query,
|
|
"filters": {"study": studies, "ext": exts, "since": since,
|
|
"folder": folder, "limit": limit},
|
|
"count": len(results),
|
|
"results": results,
|
|
"tip": "Use read_document(path=...) to fetch full body of any hit.",
|
|
}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"error": str(e), "query": query}
|
|
|
|
|
|
@mcp.tool()
|
|
def read_document(
|
|
path: Optional[str] = None,
|
|
mongo_id: Optional[str] = None,
|
|
offset: int = 0,
|
|
length: int = DEFAULT_BODY_CHARS,
|
|
around_match: Optional[str] = None,
|
|
) -> dict:
|
|
"""Read the full parsed text of one document (PG body column) + its Mongo metadata.
|
|
|
|
Identify the document by EITHER `path` (absolute) OR `mongo_id`.
|
|
offset, length: slice the body (default first 8000 chars). length capped at 200000.
|
|
around_match: if given, return up to 3 windows of ~1000 chars centered on the first matches
|
|
of this substring (case-insensitive). Useful to jump to a keyword in a long doc.
|
|
|
|
Body is truncated to fit; check `body_length` vs returned length to know if more exists.
|
|
Use offset to page further (offset=8000, then 16000, ...).
|
|
"""
|
|
try:
|
|
if not path and not mongo_id:
|
|
return {"error": "Provide either path or mongo_id."}
|
|
|
|
length = min(max(1, length), MAX_BODY_CHARS)
|
|
|
|
sql = """
|
|
SELECT id, mongo_id, study, path, rel_path, name, ext, sha256,
|
|
size_bytes, mtime, body, body_length, extractor_version,
|
|
extracted_at, ok, error
|
|
FROM documents
|
|
WHERE """ + ("path = %s" if path else "mongo_id = %s") + " LIMIT 1"
|
|
|
|
with pg_conn() as pg, pg.cursor() as cur:
|
|
cur.execute(sql, (path or mongo_id,))
|
|
row = cur.fetchone()
|
|
cols = [c.name for c in cur.description]
|
|
if not row:
|
|
return {"error": "Document not found.", "path": path, "mongo_id": mongo_id}
|
|
rec = dict(zip(cols, row))
|
|
|
|
body = rec.get("body") or ""
|
|
|
|
if around_match and body:
|
|
needle = around_match.lower()
|
|
hay = body.lower()
|
|
windows = []
|
|
start = 0
|
|
while len(windows) < 3:
|
|
pos = hay.find(needle, start)
|
|
if pos < 0:
|
|
break
|
|
lo = max(0, pos - 400)
|
|
hi = min(len(body), pos + 600)
|
|
windows.append({"offset": lo, "text": body[lo:hi]})
|
|
start = pos + len(needle)
|
|
body_out = None
|
|
slice_info = {"mode": "around_match", "match": around_match,
|
|
"windows": windows, "windows_found": len(windows)}
|
|
else:
|
|
body_out = body[offset:offset + length]
|
|
slice_info = {
|
|
"mode": "slice", "offset": offset,
|
|
"length_returned": len(body_out),
|
|
"has_more": offset + length < len(body),
|
|
"next_offset": offset + length if offset + length < len(body) else None,
|
|
}
|
|
|
|
# Mongo metadata
|
|
col_code = rec["study"]
|
|
mdoc = mongo[MONGO_DB][col_code].find_one(
|
|
{"path": rec["path"]},
|
|
{"content": 1, "dates_in_name": 1, "parent_folders": 1, "tokens": 1},
|
|
) or {}
|
|
|
|
out = {
|
|
"study": rec["study"],
|
|
"path": rec["path"],
|
|
"rel_path": rec["rel_path"],
|
|
"name": rec["name"],
|
|
"ext": rec["ext"],
|
|
"size_mb": round((rec["size_bytes"] or 0) / 1024 / 1024, 2),
|
|
"mtime": serialize(rec["mtime"]),
|
|
"sha256": rec["sha256"],
|
|
"body_length": rec["body_length"],
|
|
"extractor_version": rec["extractor_version"],
|
|
"extracted_at": serialize(rec["extracted_at"]),
|
|
"ok": rec["ok"],
|
|
"error": rec["error"],
|
|
"parent_folders": mdoc.get("parent_folders"),
|
|
"dates_in_name": mdoc.get("dates_in_name"),
|
|
"metadata": short_meta(mdoc.get("content") or {}),
|
|
}
|
|
if body_out is not None:
|
|
out["body"] = body_out
|
|
out["slice"] = slice_info
|
|
return out
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
def get_metadata(path: str) -> dict:
|
|
"""Return raw Mongo document for one path (full content.*, parent_folders, dates_in_name,
|
|
sha256, sizes, timestamps, tokens). Use when you need the full structured metadata —
|
|
e.g. all sheet names of an XLSX, all attachments of an email, full author info.
|
|
Does NOT return body text — use `read_document` for that.
|
|
"""
|
|
try:
|
|
for code in STUDY_ALL:
|
|
d = mongo[MONGO_DB][code].find_one({"path": path})
|
|
if d:
|
|
return serialize(d)
|
|
return {"error": "Not found in any study collection.", "path": path}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
def recent_files(
|
|
study: Optional[Union[str, list]] = None,
|
|
days: int = 7,
|
|
ext: Optional[Union[str, list]] = None,
|
|
limit: int = 30,
|
|
) -> dict:
|
|
"""List most recently modified files (no fulltext involved). Use for "what changed lately"
|
|
or "what did I get this week" questions.
|
|
|
|
days: window from now (default 7). Set to 0 for no time filter (just top-N newest).
|
|
"""
|
|
try:
|
|
studies = resolve_studies(study) or STUDY_ALL
|
|
exts = normalize_exts(ext)
|
|
limit = min(max(1, limit), 200)
|
|
|
|
q: dict = {"deleted_at": {"$exists": False}}
|
|
if exts:
|
|
q["ext"] = {"$in": exts}
|
|
if days and days > 0:
|
|
since_dt = datetime.now(timezone.utc) - timedelta(days=days)
|
|
q["mtime"] = {"$gte": since_dt}
|
|
|
|
results = []
|
|
for code in studies:
|
|
for d in (mongo[MONGO_DB][code]
|
|
.find(q, {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
|
|
"size_bytes": 1, "mtime": 1, "study": 1,
|
|
"content.author": 1, "content.title": 1,
|
|
"content.last_modified_by": 1})
|
|
.sort("mtime", -1).limit(limit)):
|
|
results.append({
|
|
"study": d.get("study"),
|
|
"path": d["path"],
|
|
"rel_path": d.get("rel_path"),
|
|
"name": d.get("name"),
|
|
"ext": d.get("ext"),
|
|
"size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
|
|
"mtime": serialize(d.get("mtime")),
|
|
"author": (d.get("content") or {}).get("author"),
|
|
"title": (d.get("content") or {}).get("title"),
|
|
"last_modified_by": (d.get("content") or {}).get("last_modified_by"),
|
|
})
|
|
results.sort(key=lambda r: r["mtime"] or "", reverse=True)
|
|
return {"days": days, "count": len(results[:limit]), "results": results[:limit]}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
def find_duplicates(
|
|
study: Optional[Union[str, list]] = None,
|
|
min_size_kb: int = 10,
|
|
limit: int = 30,
|
|
) -> dict:
|
|
"""Find groups of files with identical content (same sha256) but at different paths.
|
|
Reveals copies of the same document scattered across folders / studies.
|
|
|
|
min_size_kb: ignore tiny duplicate groups (default 10 KB)
|
|
limit: max duplicate groups returned
|
|
"""
|
|
try:
|
|
studies = resolve_studies(study) or STUDY_ALL
|
|
pipeline = [
|
|
{"$match": {"deleted_at": {"$exists": False},
|
|
"size_bytes": {"$gte": min_size_kb * 1024}}},
|
|
{"$group": {"_id": "$sha256",
|
|
"count": {"$sum": 1},
|
|
"size_bytes": {"$first": "$size_bytes"},
|
|
"ext": {"$first": "$ext"},
|
|
"paths": {"$push": {"study": "$study",
|
|
"path": "$path",
|
|
"rel_path": "$rel_path",
|
|
"mtime": "$mtime"}}}},
|
|
{"$match": {"count": {"$gte": 2}}},
|
|
{"$sort": {"size_bytes": -1, "count": -1}},
|
|
{"$limit": limit},
|
|
]
|
|
|
|
all_groups: dict = {}
|
|
for code in studies:
|
|
for g in mongo[MONGO_DB][code].aggregate(pipeline):
|
|
sha = g["_id"]
|
|
if sha in all_groups:
|
|
all_groups[sha]["count"] += g["count"]
|
|
all_groups[sha]["paths"].extend(g["paths"])
|
|
else:
|
|
all_groups[sha] = {
|
|
"sha256": sha, "count": g["count"], "ext": g["ext"],
|
|
"size_mb": round(g["size_bytes"] / 1024 / 1024, 2),
|
|
"paths": g["paths"],
|
|
}
|
|
|
|
groups = sorted(all_groups.values(),
|
|
key=lambda x: (x["size_mb"], x["count"]), reverse=True)[:limit]
|
|
for g in groups:
|
|
for p in g["paths"]:
|
|
p["mtime"] = serialize(p.get("mtime"))
|
|
return {
|
|
"filters": {"study": studies, "min_size_kb": min_size_kb},
|
|
"group_count": len(groups),
|
|
"wasted_mb_estimate": round(sum(g["size_mb"] * (g["count"] - 1) for g in groups), 2),
|
|
"groups": groups,
|
|
}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
def by_author(
|
|
name: str,
|
|
study: Optional[Union[str, list]] = None,
|
|
ext: Optional[Union[str, list]] = None,
|
|
limit: int = 30,
|
|
) -> dict:
|
|
"""Find documents where content.author OR content.last_modified_by matches `name` (case-insensitive substring).
|
|
Works for DOCX/XLSX/PPTX/PDF embedded metadata. Use for "what did X write" or "who edited this".
|
|
"""
|
|
try:
|
|
studies = resolve_studies(study) or STUDY_ALL
|
|
exts = normalize_exts(ext)
|
|
limit = min(max(1, limit), 200)
|
|
|
|
rx = {"$regex": name, "$options": "i"}
|
|
q: dict = {"deleted_at": {"$exists": False},
|
|
"$or": [{"content.author": rx},
|
|
{"content.last_modified_by": rx}]}
|
|
if exts:
|
|
q["ext"] = {"$in": exts}
|
|
|
|
results = []
|
|
for code in studies:
|
|
for d in (mongo[MONGO_DB][code]
|
|
.find(q, {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
|
|
"size_bytes": 1, "mtime": 1, "study": 1, "content": 1})
|
|
.sort("mtime", -1).limit(limit)):
|
|
c = d.get("content") or {}
|
|
results.append({
|
|
"study": d.get("study"),
|
|
"path": d["path"],
|
|
"rel_path": d.get("rel_path"),
|
|
"name": d.get("name"),
|
|
"ext": d.get("ext"),
|
|
"mtime": serialize(d.get("mtime")),
|
|
"size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
|
|
"author": c.get("author"),
|
|
"last_modified_by": c.get("last_modified_by"),
|
|
"title": c.get("title"),
|
|
})
|
|
results.sort(key=lambda r: r["mtime"] or "", reverse=True)
|
|
return {"author_match": name, "count": len(results[:limit]), "results": results[:limit]}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
def browse_folder(
|
|
folder: str,
|
|
study: Optional[Union[str, list]] = None,
|
|
ext: Optional[Union[str, list]] = None,
|
|
limit: int = 100,
|
|
) -> dict:
|
|
"""List files where any parent folder name contains `folder` (case-insensitive substring match).
|
|
Use for "show me what's in the CRF folder" or "what's in Training". Returns just metadata,
|
|
no body text. Files sorted by relative path.
|
|
"""
|
|
try:
|
|
studies = resolve_studies(study) or STUDY_ALL
|
|
exts = normalize_exts(ext)
|
|
limit = min(max(1, limit), 500)
|
|
|
|
rx = {"$regex": folder, "$options": "i"}
|
|
q: dict = {"deleted_at": {"$exists": False}, "parent_folders": rx}
|
|
if exts:
|
|
q["ext"] = {"$in": exts}
|
|
|
|
results = []
|
|
for code in studies:
|
|
for d in (mongo[MONGO_DB][code]
|
|
.find(q, {"path": 1, "rel_path": 1, "name": 1, "ext": 1,
|
|
"size_bytes": 1, "mtime": 1, "study": 1,
|
|
"parent_folders": 1, "dates_in_name": 1})
|
|
.sort("rel_path", 1).limit(limit)):
|
|
results.append({
|
|
"study": d.get("study"),
|
|
"path": d["path"],
|
|
"rel_path": d.get("rel_path"),
|
|
"name": d.get("name"),
|
|
"ext": d.get("ext"),
|
|
"mtime": serialize(d.get("mtime")),
|
|
"size_mb": round((d.get("size_bytes") or 0) / 1024 / 1024, 2),
|
|
"parent_folders": d.get("parent_folders"),
|
|
"dates_in_name": d.get("dates_in_name"),
|
|
})
|
|
return {"folder_match": folder, "count": len(results), "results": results}
|
|
except Exception as e:
|
|
log(traceback.format_exc())
|
|
return {"error": str(e)}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
log("MCP soubory server started (FastMCP)")
|
|
mcp.run()
|