#!/usr/bin/env python3
"""
MCP server for MongoDB — uses FastMCP.
Run: python mcp_mongo.py
"""

import json
import sys
import time
import traceback
from datetime import datetime, date
from typing import Optional, Union

from bson import ObjectId
from bson.errors import InvalidId
from mcp.server.fastmcp import FastMCP
from pymongo import MongoClient, UpdateOne

MONGO_HOST = "192.168.1.76"

# Hard cap on the number of documents find_documents returns per call.
MAX_FIND_LIMIT = 500


def log(msg: str):
    """Write a diagnostic message to stderr and flush immediately."""
    # NOTE(review): stdout is deliberately left untouched — presumably it
    # carries the MCP stdio protocol when the server runs via mcp.run().
    print(msg, file=sys.stderr, flush=True)


# Connect eagerly at import time so a misconfigured host fails fast
# (non-zero exit) instead of surfacing on the first tool call.
try:
    client = MongoClient(MONGO_HOST, serverSelectionTimeoutMS=5000)
    client.server_info()
    log(f"Connected to MongoDB ({MONGO_HOST})")
except Exception as e:
    log(f"MongoDB connection failed: {e}")
    sys.exit(1)


def serialize(obj):
    """Make MongoDB documents JSON-serializable.

    Recursively walks dicts and lists. ObjectId becomes its string form,
    datetime/date become ISO-8601 strings, bytes are decoded as UTF-8 (with
    replacement characters for invalid sequences). Every other value is
    returned unchanged.
    """
    if isinstance(obj, ObjectId):
        return str(obj)
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, bytes):
        return obj.decode("utf-8", errors="replace")
    if isinstance(obj, dict):
        return {k: serialize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [serialize(v) for v in obj]
    return obj


def parse_filter(filter_json) -> dict:
    """Parse JSON string or dict to dict. Returns {} on None/empty.

    Automatically converts string _id values to ObjectId. A string _id that
    is not a valid ObjectId is left as a plain string, so collections with
    string keys keep working.

    Raises:
        json.JSONDecodeError: if a string argument is not valid JSON.
        ValueError: if the (decoded) value is non-empty but not a JSON
            object. Falling back to {} here would silently widen an update
            or delete to every document in the collection.
    """
    if not filter_json:
        return {}
    if isinstance(filter_json, str):
        if not filter_json.strip():
            return {}
        filter_json = json.loads(filter_json)
        if not filter_json:
            # "null", "{}", "[]" ... all mean "no filter".
            return {}
    if not isinstance(filter_json, dict):
        raise ValueError(
            f"expected a JSON object, got {type(filter_json).__name__}"
        )
    raw_id = filter_json.get("_id")
    if isinstance(raw_id, str):
        try:
            oid = ObjectId(raw_id)
        except InvalidId:
            # Best effort: keep the string _id as-is.
            return filter_json
        # Copy so the caller's dict is never mutated.
        filter_json = dict(filter_json)
        filter_json["_id"] = oid
    return filter_json


def _parse_documents(documents_json) -> list:
    """Decode documents_json into a list of documents.

    Accepts a JSON string or an already-decoded value. A single JSON object
    is wrapped into a one-element list.
    """
    docs = json.loads(documents_json) if isinstance(documents_json, str) else documents_json
    if isinstance(docs, dict):
        return [docs]
    if not isinstance(docs, list):
        raise ValueError(
            f"documents_json must be a JSON array of documents, got {type(docs).__name__}"
        )
    return docs


def _parse_operations(operations_json) -> list:
    """Decode operations_json (JSON string or list) into a list of operations."""
    ops = json.loads(operations_json) if isinstance(operations_json, str) else operations_json
    if not isinstance(ops, list):
        raise ValueError(
            f"operations_json must be a JSON array, got {type(ops).__name__}"
        )
    return ops


mcp = FastMCP("janssen-mongo")


@mcp.tool()
def list_databases() -> dict:
    """List all databases on the MongoDB server (excludes admin/config/local)."""
    try:
        skip = {"admin", "config", "local"}
        dbs = [d["name"] for d in client.list_databases() if d["name"] not in skip]
        return {"count": len(dbs), "databases": dbs}
    except Exception:
        log(f"list_databases error: {traceback.format_exc()}")
        raise


@mcp.tool()
def list_collections(db: str) -> dict:
    """List all collections in a database."""
    try:
        cols = sorted(client[db].list_collection_names())
        return {"db": db, "count": len(cols), "collections": cols}
    except Exception:
        log(f"list_collections error: {traceback.format_exc()}")
        raise


@mcp.tool()
def collection_stats(db: str, collection: str) -> dict:
    """Returns document count + schema sample (union of fields and types from first 20 documents)."""
    try:
        col = client[db][collection]
        count = col.count_documents({})
        schema = {}
        for doc in col.find().limit(20):
            for k, v in doc.items():
                # First occurrence wins: the reported type is that of the
                # first sampled document carrying the field.
                schema.setdefault(k, type(v).__name__)
        return {"db": db, "collection": collection, "count": count, "schema": schema}
    except Exception:
        log(f"collection_stats error: {traceback.format_exc()}")
        raise


@mcp.tool()
def find_documents(
    db: str,
    collection: str,
    filter_json: Optional[Union[str, dict]] = None,
    projection_json: Optional[Union[str, dict]] = None,
    sort_json: Optional[Union[str, dict]] = None,
    limit: int = 50,
    skip: int = 0,
) -> dict:
    """Query documents. filter/projection/sort are JSON strings or dicts, e.g.
    '{"patient_code":"CZ1-01"}'. sort example: '{"last_seen_at": -1}'.
    Limit max 500 (a limit of 0 or less also means 500). Use skip for pagination.
    """
    try:
        col = client[db][collection]
        filt = parse_filter(filter_json)
        proj = parse_filter(projection_json) or None
        sort_spec = parse_filter(sort_json)
        # PyMongo treats limit(0) as "no limit", so a plain min() would let
        # limit=0 bypass the cap; non-positive values fall back to the cap.
        limit = min(limit, MAX_FIND_LIMIT) if limit > 0 else MAX_FIND_LIMIT
        cursor = col.find(filt, proj)
        if sort_spec:
            cursor = cursor.sort(list(sort_spec.items()))
        cursor = cursor.skip(skip).limit(limit)
        docs = [serialize(doc) for doc in cursor]
        # Total ignores skip/limit so callers can paginate.
        total = col.count_documents(filt)
        return {"total": total, "skip": skip, "count": len(docs), "docs": docs}
    except Exception:
        log(f"find_documents error: {traceback.format_exc()}")
        raise


@mcp.tool()
def aggregate(db: str, collection: str, pipeline_json: str) -> dict:
    """Run a MongoDB aggregation pipeline. pipeline_json is a JSON array of stages,
    e.g. '[{"$group": {"_id": "$patient_code", "count": {"$sum": 1}}}]'.
    """
    try:
        col = client[db][collection]
        pipeline = json.loads(pipeline_json)
        results = [serialize(doc) for doc in col.aggregate(pipeline)]
        return {"count": len(results), "results": results}
    except Exception:
        log(f"aggregate error: {traceback.format_exc()}")
        raise


@mcp.tool()
def field_coverage(db: str, collection: str, field: str) -> dict:
    """Check how many documents have a given field present and non-null/non-empty.
    Useful for verifying data consistency across a collection.
    Returns total count, how many have the field, how many are missing it, and coverage %.
    """
    try:
        col = client[db][collection]
        total = col.count_documents({})
        # $nin excludes both null and "" in one operator. Two "$ne" keys in a
        # single dict literal would collapse into one (the last wins) and
        # silently drop the null check.
        present = col.count_documents({field: {"$exists": True, "$nin": [None, ""]}})
        missing = total - present
        return {
            "field": field,
            "total": total,
            "present": present,
            "missing": missing,
            "coverage_pct": round(present / total * 100, 1) if total else 0,
        }
    except Exception:
        log(f"field_coverage error: {traceback.format_exc()}")
        raise


@mcp.tool()
def count_documents(db: str, collection: str, filter_json: Optional[Union[str, dict]] = None) -> dict:
    """Count documents matching an optional filter. Fast way to check data without fetching content."""
    try:
        col = client[db][collection]
        filt = parse_filter(filter_json)
        count = col.count_documents(filt)
        # The echoed filter may contain an ObjectId; make it JSON-safe.
        return {"db": db, "collection": collection, "filter": serialize(filt), "count": count}
    except Exception:
        log(f"count_documents error: {traceback.format_exc()}")
        raise


@mcp.tool()
def schema_diff(
    db: str,
    collection: str,
    filter_a_json: Optional[Union[str, dict]] = None,
    filter_b_json: Optional[Union[str, dict]] = None,
    sample_size: int = 50,
) -> dict:
    """Compare field presence between two groups of documents in a collection.
    filter_a_json and filter_b_json define the two groups (defaults to all documents).
    Returns fields that are only in group A, only in group B, and in both — useful for
    diagnosing why some documents are missing fields that others have.
    """
    try:
        col = client[db][collection]
        filt_a = parse_filter(filter_a_json)
        filt_b = parse_filter(filter_b_json)

        def get_field_set(filt: dict) -> set:
            # Union of top-level keys over the first sample_size matches.
            fields = set()
            for doc in col.find(filt).limit(sample_size):
                fields.update(doc.keys())
            return fields

        fields_a = get_field_set(filt_a)
        fields_b = get_field_set(filt_b)
        return {
            "only_in_a": sorted(fields_a - fields_b),
            "only_in_b": sorted(fields_b - fields_a),
            "in_both": sorted(fields_a & fields_b),
            "count_a": len(fields_a),
            "count_b": len(fields_b),
        }
    except Exception:
        log(f"schema_diff error: {traceback.format_exc()}")
        raise


@mcp.tool()
def ping() -> dict:
    """Check if MongoDB is reachable. Returns server version and latency."""
    try:
        start = time.monotonic()
        info = client.server_info()
        latency_ms = round((time.monotonic() - start) * 1000, 1)
        return {
            "status": "ok",
            "version": info.get("version", "unknown"),
            "latency_ms": latency_ms,
            "host": MONGO_HOST,
        }
    except Exception as e:
        # Unlike the other tools, a failure is reported as a result rather
        # than raised: an unreachable server is a valid answer to "ping".
        return {"status": "error", "error": str(e), "host": MONGO_HOST}


@mcp.tool()
def distinct_values(db: str, collection: str, field: str, filter_json: Optional[Union[str, dict]] = None) -> dict:
    """Return distinct values of a field, optionally filtered. Useful for exploring enums and categories."""
    try:
        col = client[db][collection]
        filt = parse_filter(filter_json)
        values = [serialize(v) for v in col.distinct(field, filt)]
        return {"field": field, "count": len(values), "values": values}
    except Exception:
        log(f"distinct_values error: {traceback.format_exc()}")
        raise


@mcp.tool()
def preview_update(
    db: str,
    collection: str,
    filter_json: Union[str, dict],
    update_json: Union[str, dict],
) -> dict:
    """Preview what update_documents will change — shows affected count and sample of matching docs
    BEFORE any write. Always call this first, then present the summary to the user for confirmation.
    filter_json: MongoDB filter, e.g. '{}' for all documents.
    update_json: MongoDB update operator, e.g. '{"$set": {"Zdroj": "Iluminátor"}}'.
    """
    try:
        col = client[db][collection]
        filt = parse_filter(filter_json)
        affected = col.count_documents(filt)
        sample = [serialize(doc) for doc in col.find(filt).limit(3)]
        upd = parse_filter(update_json)
        return {
            "db": db,
            "collection": collection,
            "filter": serialize(filt),
            "update": serialize(upd),
            "affected_count": affected,
            "sample_docs": sample,
            "note": "No changes made yet. Present this summary to the user and ask for confirmation before calling update_documents.",
        }
    except Exception:
        log(f"preview_update error: {traceback.format_exc()}")
        raise


@mcp.tool()
def update_documents(
    db: str,
    collection: str,
    filter_json: Union[str, dict],
    update_json: Union[str, dict],
    confirmed: bool = False,
) -> dict:
    """Update documents matching filter_json using update_json (MongoDB update operators).
    REQUIRES confirmed=True — only set this after presenting preview_update output to the user
    and receiving explicit approval.
    filter_json example: '{"zeme": "Czech Republic"}'
    update_json example: '{"$set": {"Zdroj": "Iluminátor"}}'
    """
    if not confirmed:
        return {
            "status": "aborted",
            "reason": "confirmed=False. Call preview_update first, show the user what will change, and only proceed with confirmed=True after explicit approval.",
        }
    try:
        col = client[db][collection]
        filt = parse_filter(filter_json)
        upd = parse_filter(update_json)
        result = col.update_many(filt, upd)
        log(f"update_documents: matched={result.matched_count} modified={result.modified_count}")
        return {
            "status": "ok",
            "db": db,
            "collection": collection,
            "filter": serialize(filt),
            "update": serialize(upd),
            "matched_count": result.matched_count,
            "modified_count": result.modified_count,
        }
    except Exception:
        log(f"update_documents error: {traceback.format_exc()}")
        raise


@mcp.tool()
def preview_insert(
    db: str,
    collection: str,
    documents_json: Union[str, list],
) -> dict:
    """Preview what insert_documents will insert — shows count and the documents themselves
    BEFORE any write. Always call this first, then present the summary to the user for confirmation.
    documents_json: JSON array of documents, e.g. '[{"name": "Test", "value": 1}]'.
    """
    try:
        docs = _parse_documents(documents_json)
        return {
            "db": db,
            "collection": collection,
            "insert_count": len(docs),
            "documents": docs,
            "note": "No changes made yet. Present this summary to the user and ask for confirmation before calling insert_documents.",
        }
    except Exception:
        log(f"preview_insert error: {traceback.format_exc()}")
        raise


@mcp.tool()
def insert_documents(
    db: str,
    collection: str,
    documents_json: Union[str, list],
    confirmed: bool = False,
) -> dict:
    """Insert one or more documents into a collection.
    REQUIRES confirmed=True — only set this after presenting preview_insert output to the user
    and receiving explicit approval.
    documents_json: JSON array of documents.
    """
    if not confirmed:
        return {
            "status": "aborted",
            "reason": "confirmed=False. Call preview_insert first, show the user what will be inserted, and only proceed with confirmed=True after explicit approval.",
        }
    try:
        col = client[db][collection]
        docs = _parse_documents(documents_json)
        result = col.insert_many(docs)
        log(f"insert_documents: inserted={len(result.inserted_ids)}")
        return {
            "status": "ok",
            "db": db,
            "collection": collection,
            "inserted_count": len(result.inserted_ids),
            "inserted_ids": [str(i) for i in result.inserted_ids],
        }
    except Exception:
        log(f"insert_documents error: {traceback.format_exc()}")
        raise


@mcp.tool()
def preview_delete(
    db: str,
    collection: str,
    filter_json: Union[str, dict],
) -> dict:
    """Preview what delete_documents will remove — shows affected count and sample docs
    BEFORE any write. Always call this first, then present the summary to the user for confirmation.
    filter_json: MongoDB filter. Be careful with '{}' — it matches all documents.
    """
    try:
        col = client[db][collection]
        filt = parse_filter(filter_json)
        affected = col.count_documents(filt)
        sample = [serialize(doc) for doc in col.find(filt).limit(3)]
        return {
            "db": db,
            "collection": collection,
            "filter": serialize(filt),
            "affected_count": affected,
            "sample_docs": sample,
            "note": "No changes made yet. Present this summary to the user and ask for confirmation before calling delete_documents.",
        }
    except Exception:
        log(f"preview_delete error: {traceback.format_exc()}")
        raise


@mcp.tool()
def delete_documents(
    db: str,
    collection: str,
    filter_json: Union[str, dict],
    confirmed: bool = False,
) -> dict:
    """Delete documents matching filter_json.
    REQUIRES confirmed=True — only set this after presenting preview_delete output to the user
    and receiving explicit approval.
    WARNING: '{}' as filter will delete ALL documents in the collection.
    """
    if not confirmed:
        return {
            "status": "aborted",
            "reason": "confirmed=False. Call preview_delete first, show the user what will be deleted, and only proceed with confirmed=True after explicit approval.",
        }
    try:
        col = client[db][collection]
        filt = parse_filter(filter_json)
        result = col.delete_many(filt)
        log(f"delete_documents: deleted={result.deleted_count}")
        return {
            "status": "ok",
            "db": db,
            "collection": collection,
            "filter": serialize(filt),
            "deleted_count": result.deleted_count,
        }
    except Exception:
        log(f"delete_documents error: {traceback.format_exc()}")
        raise


@mcp.tool()
def bulk_update(
    db: str,
    collection: str,
    operations_json: Union[str, list],
    confirmed: bool = False,
) -> dict:
    """Run multiple update operations in one call — each with its own filter and update.
    operations_json: JSON array of {filter, update} objects, e.g.:
    [{"filter": {"email": "a@b.com"}, "update": {"$set": {"excel": {...}}}}, ...]
    Each operation runs as update_one. REQUIRES confirmed=True.
    """
    if not confirmed:
        # Without confirmation this call doubles as its own preview.
        ops = _parse_operations(operations_json)
        return {
            "status": "preview",
            "operation_count": len(ops),
            "sample": ops[:3],
            "note": "No changes made. Call again with confirmed=True to execute.",
        }
    try:
        col = client[db][collection]
        ops = _parse_operations(operations_json)
        requests = [
            UpdateOne(parse_filter(op["filter"]), parse_filter(op["update"]))
            for op in ops
        ]
        # ordered=False: the operations are sent as an unordered bulk write.
        result = col.bulk_write(requests, ordered=False)
        log(f"bulk_update: matched={result.matched_count} modified={result.modified_count} upserted={result.upserted_count}")
        return {
            "status": "ok",
            "db": db,
            "collection": collection,
            "operation_count": len(requests),
            "matched_count": result.matched_count,
            "modified_count": result.modified_count,
            "upserted_count": result.upserted_count,
        }
    except Exception:
        log(f"bulk_update error: {traceback.format_exc()}")
        raise


if __name__ == "__main__":
    log("MCP MongoDB server started (FastMCP)")
    mcp.run()