510 lines
17 KiB
Python
510 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MCP server pro MongoDB — používá FastMCP.
|
|
Spustit: python mcp_mongo.py
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
import traceback
|
|
from datetime import datetime, date
|
|
from typing import Optional, Union
|
|
|
|
from bson import ObjectId
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pymongo import MongoClient
|
|
|
|
MONGO_HOST = "192.168.1.76"
|
|
|
|
|
|
def log(msg: str):
|
|
print(msg, file=sys.stderr, flush=True)
|
|
|
|
|
|
try:
|
|
client = MongoClient(MONGO_HOST, serverSelectionTimeoutMS=5000)
|
|
client.server_info()
|
|
log(f"Connected to MongoDB ({MONGO_HOST})")
|
|
except Exception as e:
|
|
log(f"MongoDB connection failed: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
def serialize(obj):
|
|
"""Make MongoDB documents JSON-serializable."""
|
|
if isinstance(obj, ObjectId):
|
|
return str(obj)
|
|
if isinstance(obj, datetime):
|
|
return obj.isoformat()
|
|
if isinstance(obj, date):
|
|
return obj.isoformat()
|
|
if isinstance(obj, bytes):
|
|
return obj.decode("utf-8", errors="replace")
|
|
if isinstance(obj, dict):
|
|
return {k: serialize(v) for k, v in obj.items()}
|
|
if isinstance(obj, list):
|
|
return [serialize(v) for v in obj]
|
|
return obj
|
|
|
|
|
|
def parse_filter(filter_json) -> dict:
|
|
"""Parse JSON string or dict to dict. Returns {} on None/empty.
|
|
Automatically converts string _id values to ObjectId."""
|
|
if not filter_json:
|
|
return {}
|
|
if isinstance(filter_json, str):
|
|
if not filter_json.strip():
|
|
return {}
|
|
filter_json = json.loads(filter_json)
|
|
if isinstance(filter_json, dict):
|
|
if "_id" in filter_json and isinstance(filter_json["_id"], str):
|
|
try:
|
|
filter_json = dict(filter_json)
|
|
filter_json["_id"] = ObjectId(filter_json["_id"])
|
|
except Exception:
|
|
pass
|
|
return filter_json
|
|
return {}
|
|
|
|
|
|
mcp = FastMCP("janssen-mongo")
|
|
|
|
|
|
@mcp.tool()
|
|
def list_databases() -> dict:
|
|
"""List all databases on the MongoDB server (excludes admin/config/local)."""
|
|
try:
|
|
skip = {"admin", "config", "local"}
|
|
dbs = [d["name"] for d in client.list_databases() if d["name"] not in skip]
|
|
return {"count": len(dbs), "databases": dbs}
|
|
except Exception:
|
|
log(f"list_databases error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def list_collections(db: str) -> dict:
|
|
"""List all collections in a database."""
|
|
try:
|
|
cols = sorted(client[db].list_collection_names())
|
|
return {"db": db, "count": len(cols), "collections": cols}
|
|
except Exception:
|
|
log(f"list_collections error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def collection_stats(db: str, collection: str) -> dict:
|
|
"""Returns document count + schema sample (union of fields and types from first 20 documents)."""
|
|
try:
|
|
col = client[db][collection]
|
|
count = col.count_documents({})
|
|
sample_docs = list(col.find().limit(20))
|
|
schema = {}
|
|
for doc in sample_docs:
|
|
for k, v in doc.items():
|
|
if k not in schema:
|
|
schema[k] = type(v).__name__
|
|
return {"db": db, "collection": collection, "count": count, "schema": schema}
|
|
except Exception:
|
|
log(f"collection_stats error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def find_documents(
|
|
db: str,
|
|
collection: str,
|
|
filter_json: Optional[Union[str, dict]] = None,
|
|
projection_json: Optional[Union[str, dict]] = None,
|
|
sort_json: Optional[Union[str, dict]] = None,
|
|
limit: int = 50,
|
|
skip: int = 0,
|
|
) -> dict:
|
|
"""Query documents. filter/projection/sort are JSON strings or dicts, e.g. '{"patient_code":"CZ1-01"}'.
|
|
sort example: '{"last_seen_at": -1}'. Limit max 500. Use skip for pagination.
|
|
"""
|
|
try:
|
|
col = client[db][collection]
|
|
filt = parse_filter(filter_json)
|
|
proj = parse_filter(projection_json) or None
|
|
sort_spec = parse_filter(sort_json)
|
|
limit = min(limit, 500)
|
|
|
|
cursor = col.find(filt, proj)
|
|
if sort_spec:
|
|
cursor = cursor.sort(list(sort_spec.items()))
|
|
cursor = cursor.skip(skip).limit(limit)
|
|
|
|
docs = [serialize(doc) for doc in cursor]
|
|
total = col.count_documents(filt)
|
|
return {"total": total, "skip": skip, "count": len(docs), "docs": docs}
|
|
except Exception:
|
|
log(f"find_documents error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def aggregate(db: str, collection: str, pipeline_json: str) -> dict:
|
|
"""Run a MongoDB aggregation pipeline. pipeline_json is a JSON array of stages,
|
|
e.g. '[{"$group": {"_id": "$patient_code", "count": {"$sum": 1}}}]'.
|
|
"""
|
|
try:
|
|
col = client[db][collection]
|
|
pipeline = json.loads(pipeline_json)
|
|
results = [serialize(doc) for doc in col.aggregate(pipeline)]
|
|
return {"count": len(results), "results": results}
|
|
except Exception:
|
|
log(f"aggregate error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def field_coverage(db: str, collection: str, field: str) -> dict:
|
|
"""Check how many documents have a given field present and non-null/non-empty.
|
|
Useful for verifying data consistency across a collection.
|
|
Returns total count, how many have the field, how many are missing it, and coverage %.
|
|
"""
|
|
try:
|
|
col = client[db][collection]
|
|
total = col.count_documents({})
|
|
present = col.count_documents({field: {"$exists": True, "$ne": None, "$ne": ""}})
|
|
missing = total - present
|
|
return {
|
|
"field": field,
|
|
"total": total,
|
|
"present": present,
|
|
"missing": missing,
|
|
"coverage_pct": round(present / total * 100, 1) if total else 0,
|
|
}
|
|
except Exception:
|
|
log(f"field_coverage error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def count_documents(db: str, collection: str, filter_json: Optional[Union[str, dict]] = None) -> dict:
|
|
"""Count documents matching an optional filter. Fast way to check data without fetching content."""
|
|
try:
|
|
col = client[db][collection]
|
|
filt = parse_filter(filter_json)
|
|
count = col.count_documents(filt)
|
|
return {"db": db, "collection": collection, "filter": filt, "count": count}
|
|
except Exception:
|
|
log(f"count_documents error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def schema_diff(
|
|
db: str,
|
|
collection: str,
|
|
filter_a_json: Optional[Union[str, dict]] = None,
|
|
filter_b_json: Optional[Union[str, dict]] = None,
|
|
sample_size: int = 50,
|
|
) -> dict:
|
|
"""Compare field presence between two groups of documents in a collection.
|
|
filter_a_json and filter_b_json define the two groups (defaults to all documents).
|
|
Returns fields that are only in group A, only in group B, and in both — useful for
|
|
diagnosing why some documents are missing fields that others have.
|
|
"""
|
|
try:
|
|
col = client[db][collection]
|
|
filt_a = parse_filter(filter_a_json)
|
|
filt_b = parse_filter(filter_b_json)
|
|
|
|
def get_field_set(filt: dict) -> set:
|
|
fields = set()
|
|
for doc in col.find(filt).limit(sample_size):
|
|
fields.update(doc.keys())
|
|
return fields
|
|
|
|
fields_a = get_field_set(filt_a)
|
|
fields_b = get_field_set(filt_b)
|
|
|
|
only_in_a = sorted(fields_a - fields_b)
|
|
only_in_b = sorted(fields_b - fields_a)
|
|
in_both = sorted(fields_a & fields_b)
|
|
|
|
return {
|
|
"only_in_a": only_in_a,
|
|
"only_in_b": only_in_b,
|
|
"in_both": in_both,
|
|
"count_a": len(fields_a),
|
|
"count_b": len(fields_b),
|
|
}
|
|
except Exception:
|
|
log(f"schema_diff error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def ping() -> dict:
|
|
"""Check if MongoDB is reachable. Returns server version and latency."""
|
|
try:
|
|
import time
|
|
start = time.monotonic()
|
|
info = client.server_info()
|
|
latency_ms = round((time.monotonic() - start) * 1000, 1)
|
|
return {
|
|
"status": "ok",
|
|
"version": info.get("version", "unknown"),
|
|
"latency_ms": latency_ms,
|
|
"host": MONGO_HOST,
|
|
}
|
|
except Exception as e:
|
|
return {"status": "error", "error": str(e), "host": MONGO_HOST}
|
|
|
|
|
|
@mcp.tool()
|
|
def distinct_values(db: str, collection: str, field: str, filter_json: Optional[Union[str, dict]] = None) -> dict:
|
|
"""Return distinct values of a field, optionally filtered. Useful for exploring enums and categories."""
|
|
try:
|
|
col = client[db][collection]
|
|
filt = parse_filter(filter_json)
|
|
values = [serialize(v) for v in col.distinct(field, filt)]
|
|
return {"field": field, "count": len(values), "values": values}
|
|
except Exception:
|
|
log(f"distinct_values error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def preview_update(
|
|
db: str,
|
|
collection: str,
|
|
filter_json: Union[str, dict],
|
|
update_json: Union[str, dict],
|
|
) -> dict:
|
|
"""Preview what update_documents will change — shows affected count and sample of matching docs
|
|
BEFORE any write. Always call this first, then present the summary to the user for confirmation.
|
|
filter_json: MongoDB filter, e.g. '{}' for all documents.
|
|
update_json: MongoDB update operator, e.g. '{"$set": {"Zdroj": "Iluminátor"}}'.
|
|
"""
|
|
try:
|
|
col = client[db][collection]
|
|
filt = parse_filter(filter_json)
|
|
affected = col.count_documents(filt)
|
|
sample = [serialize(doc) for doc in col.find(filt).limit(3)]
|
|
upd = parse_filter(update_json)
|
|
return {
|
|
"db": db,
|
|
"collection": collection,
|
|
"filter": filt,
|
|
"update": upd,
|
|
"affected_count": affected,
|
|
"sample_docs": sample,
|
|
"note": "No changes made yet. Present this summary to the user and ask for confirmation before calling update_documents.",
|
|
}
|
|
except Exception:
|
|
log(f"preview_update error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def update_documents(
|
|
db: str,
|
|
collection: str,
|
|
filter_json: Union[str, dict],
|
|
update_json: Union[str, dict],
|
|
confirmed: bool = False,
|
|
) -> dict:
|
|
"""Update documents matching filter_json using update_json (MongoDB update operators).
|
|
REQUIRES confirmed=True — only set this after presenting preview_update output to the user
|
|
and receiving explicit approval.
|
|
filter_json example: '{"zeme": "Czech Republic"}'
|
|
update_json example: '{"$set": {"Zdroj": "Iluminátor"}}'
|
|
"""
|
|
if not confirmed:
|
|
return {
|
|
"status": "aborted",
|
|
"reason": "confirmed=False. Call preview_update first, show the user what will change, and only proceed with confirmed=True after explicit approval.",
|
|
}
|
|
try:
|
|
col = client[db][collection]
|
|
filt = parse_filter(filter_json)
|
|
upd = parse_filter(update_json)
|
|
result = col.update_many(filt, upd)
|
|
log(f"update_documents: matched={result.matched_count} modified={result.modified_count}")
|
|
return {
|
|
"status": "ok",
|
|
"db": db,
|
|
"collection": collection,
|
|
"filter": filt,
|
|
"update": upd,
|
|
"matched_count": result.matched_count,
|
|
"modified_count": result.modified_count,
|
|
}
|
|
except Exception:
|
|
log(f"update_documents error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def preview_insert(
|
|
db: str,
|
|
collection: str,
|
|
documents_json: Union[str, list],
|
|
) -> dict:
|
|
"""Preview what insert_documents will insert — shows count and the documents themselves
|
|
BEFORE any write. Always call this first, then present the summary to the user for confirmation.
|
|
documents_json: JSON array of documents, e.g. '[{"name": "Test", "value": 1}]'.
|
|
"""
|
|
try:
|
|
docs = json.loads(documents_json) if isinstance(documents_json, str) else documents_json
|
|
return {
|
|
"db": db,
|
|
"collection": collection,
|
|
"insert_count": len(docs),
|
|
"documents": docs,
|
|
"note": "No changes made yet. Present this summary to the user and ask for confirmation before calling insert_documents.",
|
|
}
|
|
except Exception:
|
|
log(f"preview_insert error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def insert_documents(
|
|
db: str,
|
|
collection: str,
|
|
documents_json: Union[str, list],
|
|
confirmed: bool = False,
|
|
) -> dict:
|
|
"""Insert one or more documents into a collection.
|
|
REQUIRES confirmed=True — only set this after presenting preview_insert output to the user
|
|
and receiving explicit approval.
|
|
documents_json: JSON array of documents.
|
|
"""
|
|
if not confirmed:
|
|
return {
|
|
"status": "aborted",
|
|
"reason": "confirmed=False. Call preview_insert first, show the user what will be inserted, and only proceed with confirmed=True after explicit approval.",
|
|
}
|
|
try:
|
|
col = client[db][collection]
|
|
docs = json.loads(documents_json) if isinstance(documents_json, str) else documents_json
|
|
result = col.insert_many(docs)
|
|
log(f"insert_documents: inserted={len(result.inserted_ids)}")
|
|
return {
|
|
"status": "ok",
|
|
"db": db,
|
|
"collection": collection,
|
|
"inserted_count": len(result.inserted_ids),
|
|
"inserted_ids": [str(i) for i in result.inserted_ids],
|
|
}
|
|
except Exception:
|
|
log(f"insert_documents error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def preview_delete(
|
|
db: str,
|
|
collection: str,
|
|
filter_json: Union[str, dict],
|
|
) -> dict:
|
|
"""Preview what delete_documents will remove — shows affected count and sample docs
|
|
BEFORE any write. Always call this first, then present the summary to the user for confirmation.
|
|
filter_json: MongoDB filter. Be careful with '{}' — it matches all documents.
|
|
"""
|
|
try:
|
|
col = client[db][collection]
|
|
filt = parse_filter(filter_json)
|
|
affected = col.count_documents(filt)
|
|
sample = [serialize(doc) for doc in col.find(filt).limit(3)]
|
|
return {
|
|
"db": db,
|
|
"collection": collection,
|
|
"filter": filt,
|
|
"affected_count": affected,
|
|
"sample_docs": sample,
|
|
"note": "No changes made yet. Present this summary to the user and ask for confirmation before calling delete_documents.",
|
|
}
|
|
except Exception:
|
|
log(f"preview_delete error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def delete_documents(
|
|
db: str,
|
|
collection: str,
|
|
filter_json: Union[str, dict],
|
|
confirmed: bool = False,
|
|
) -> dict:
|
|
"""Delete documents matching filter_json.
|
|
REQUIRES confirmed=True — only set this after presenting preview_delete output to the user
|
|
and receiving explicit approval.
|
|
WARNING: '{}' as filter will delete ALL documents in the collection.
|
|
"""
|
|
if not confirmed:
|
|
return {
|
|
"status": "aborted",
|
|
"reason": "confirmed=False. Call preview_delete first, show the user what will be deleted, and only proceed with confirmed=True after explicit approval.",
|
|
}
|
|
try:
|
|
col = client[db][collection]
|
|
filt = parse_filter(filter_json)
|
|
result = col.delete_many(filt)
|
|
log(f"delete_documents: deleted={result.deleted_count}")
|
|
return {
|
|
"status": "ok",
|
|
"db": db,
|
|
"collection": collection,
|
|
"filter": filt,
|
|
"deleted_count": result.deleted_count,
|
|
}
|
|
except Exception:
|
|
log(f"delete_documents error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
@mcp.tool()
|
|
def bulk_update(
|
|
db: str,
|
|
collection: str,
|
|
operations_json: Union[str, list],
|
|
confirmed: bool = False,
|
|
) -> dict:
|
|
"""Run multiple update operations in one call — each with its own filter and update.
|
|
operations_json: JSON array of {filter, update} objects, e.g.:
|
|
[{"filter": {"email": "a@b.com"}, "update": {"$set": {"excel": {...}}}}, ...]
|
|
Each operation runs as update_one. REQUIRES confirmed=True.
|
|
"""
|
|
if not confirmed:
|
|
ops = json.loads(operations_json) if isinstance(operations_json, str) else operations_json
|
|
return {
|
|
"status": "preview",
|
|
"operation_count": len(ops),
|
|
"sample": ops[:3],
|
|
"note": "No changes made. Call again with confirmed=True to execute.",
|
|
}
|
|
try:
|
|
from pymongo import UpdateOne
|
|
col = client[db][collection]
|
|
ops = json.loads(operations_json) if isinstance(operations_json, str) else operations_json
|
|
requests = [
|
|
UpdateOne(parse_filter(op["filter"]), parse_filter(op["update"]))
|
|
for op in ops
|
|
]
|
|
result = col.bulk_write(requests, ordered=False)
|
|
log(f"bulk_update: matched={result.matched_count} modified={result.modified_count} upserted={result.upserted_count}")
|
|
return {
|
|
"status": "ok",
|
|
"db": db,
|
|
"collection": collection,
|
|
"operation_count": len(requests),
|
|
"matched_count": result.matched_count,
|
|
"modified_count": result.modified_count,
|
|
"upserted_count": result.upserted_count,
|
|
}
|
|
except Exception:
|
|
log(f"bulk_update error: {traceback.format_exc()}")
|
|
raise
|
|
|
|
|
|
if __name__ == "__main__":
|
|
log("MCP MongoDB server started (FastMCP)")
|
|
mcp.run()
|