This commit is contained in:
2026-05-27 15:27:38 +02:00
parent 23aa4b7fe7
commit 93de82256a
50 changed files with 17314 additions and 18 deletions
+161
View File
@@ -0,0 +1,161 @@
#!/usr/bin/env python3
"""
MCP server pro MongoDB — používá FastMCP.
Spustit: python mcp_mongo.py
"""
import json
import sys
import traceback
from datetime import datetime, date
from typing import Optional
from bson import ObjectId
from mcp.server.fastmcp import FastMCP
from pymongo import MongoClient
MONGO_HOST = "192.168.1.76"
def log(msg: str):
print(msg, file=sys.stderr, flush=True)
try:
client = MongoClient(MONGO_HOST, serverSelectionTimeoutMS=5000)
client.server_info()
log(f"Connected to MongoDB ({MONGO_HOST})")
except Exception as e:
log(f"MongoDB connection failed: {e}")
sys.exit(1)
def serialize(obj):
"""Make MongoDB documents JSON-serializable."""
if isinstance(obj, ObjectId):
return str(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, date):
return obj.isoformat()
if isinstance(obj, bytes):
return obj.decode("utf-8", errors="replace")
if isinstance(obj, dict):
return {k: serialize(v) for k, v in obj.items()}
if isinstance(obj, list):
return [serialize(v) for v in obj]
return obj
def parse_filter(filter_json: Optional[str]) -> dict:
"""Parse JSON string to dict. Returns {} on None/empty."""
if not filter_json or not filter_json.strip():
return {}
return json.loads(filter_json)
mcp = FastMCP("janssen-mongo")
@mcp.tool()
def list_databases() -> dict:
"""List all databases on the MongoDB server (excludes admin/config/local)."""
try:
skip = {"admin", "config", "local"}
dbs = [d["name"] for d in client.list_databases() if d["name"] not in skip]
return {"count": len(dbs), "databases": dbs}
except Exception:
log(f"list_databases error: {traceback.format_exc()}")
raise
@mcp.tool()
def list_collections(db: str) -> dict:
"""List all collections in a database."""
try:
cols = sorted(client[db].list_collection_names())
return {"db": db, "count": len(cols), "collections": cols}
except Exception:
log(f"list_collections error: {traceback.format_exc()}")
raise
@mcp.tool()
def collection_stats(db: str, collection: str) -> dict:
"""Returns document count + schema sample (fields and types from first document)."""
try:
col = client[db][collection]
count = col.estimated_document_count()
sample = col.find_one()
schema = {}
if sample:
for k, v in sample.items():
schema[k] = type(v).__name__
return {"db": db, "collection": collection, "count": count, "schema": schema}
except Exception:
log(f"collection_stats error: {traceback.format_exc()}")
raise
@mcp.tool()
def find_documents(
db: str,
collection: str,
filter_json: Optional[str] = None,
projection_json: Optional[str] = None,
sort_json: Optional[str] = None,
limit: int = 50,
) -> dict:
"""Query documents. filter/projection/sort are JSON strings, e.g. '{"patient_code":"CZ1-01"}'.
sort example: '{"last_seen_at": -1}'. Limit max 500.
"""
try:
col = client[db][collection]
filt = parse_filter(filter_json)
proj = parse_filter(projection_json) or None
sort_spec = parse_filter(sort_json)
limit = min(limit, 500)
cursor = col.find(filt, proj)
if sort_spec:
cursor = cursor.sort(list(sort_spec.items()))
cursor = cursor.limit(limit)
docs = [serialize(doc) for doc in cursor]
return {"count": len(docs), "docs": docs}
except Exception:
log(f"find_documents error: {traceback.format_exc()}")
raise
@mcp.tool()
def aggregate(db: str, collection: str, pipeline_json: str) -> dict:
"""Run a MongoDB aggregation pipeline. pipeline_json is a JSON array of stages,
e.g. '[{"$group": {"_id": "$patient_code", "count": {"$sum": 1}}}]'.
"""
try:
col = client[db][collection]
pipeline = json.loads(pipeline_json)
results = [serialize(doc) for doc in col.aggregate(pipeline)]
return {"count": len(results), "results": results}
except Exception:
log(f"aggregate error: {traceback.format_exc()}")
raise
@mcp.tool()
def distinct_values(db: str, collection: str, field: str, filter_json: Optional[str] = None) -> dict:
"""Return distinct values of a field, optionally filtered."""
try:
col = client[db][collection]
filt = parse_filter(filter_json)
values = [serialize(v) for v in col.distinct(field, filt)]
return {"field": field, "count": len(values), "values": values}
except Exception:
log(f"distinct_values error: {traceback.format_exc()}")
raise
if __name__ == "__main__":
log("MCP MongoDB server started (FastMCP)")
mcp.run()