This commit is contained in:
2026-05-28 11:22:10 +02:00
parent 10beab5c84
commit edb17f04cb
232 changed files with 6715 additions and 2411 deletions
+126 -20
View File
@@ -8,7 +8,7 @@ import json
import sys
import traceback
from datetime import datetime, date
from typing import Optional
from typing import Optional, Union
from bson import ObjectId
from mcp.server.fastmcp import FastMCP
@@ -47,11 +47,17 @@ def serialize(obj):
return obj
def parse_filter(filter_json: Optional[str]) -> dict:
"""Parse JSON string to dict. Returns {} on None/empty."""
if not filter_json or not filter_json.strip():
def parse_filter(filter_json) -> dict:
"""Parse JSON string or dict to dict. Returns {} on None/empty."""
if not filter_json:
return {}
return json.loads(filter_json)
if isinstance(filter_json, dict):
return filter_json
if isinstance(filter_json, str):
if not filter_json.strip():
return {}
return json.loads(filter_json)
return {}
mcp = FastMCP("janssen-mongo")
@@ -82,15 +88,16 @@ def list_collections(db: str) -> dict:
@mcp.tool()
def collection_stats(db: str, collection: str) -> dict:
"""Returns document count + schema sample (fields and types from first document)."""
"""Returns document count + schema sample (union of fields and types from first 20 documents)."""
try:
col = client[db][collection]
count = col.estimated_document_count()
sample = col.find_one()
count = col.count_documents({})
sample_docs = list(col.find().limit(20))
schema = {}
if sample:
for k, v in sample.items():
schema[k] = type(v).__name__
for doc in sample_docs:
for k, v in doc.items():
if k not in schema:
schema[k] = type(v).__name__
return {"db": db, "collection": collection, "count": count, "schema": schema}
except Exception:
log(f"collection_stats error: {traceback.format_exc()}")
@@ -101,13 +108,14 @@ def collection_stats(db: str, collection: str) -> dict:
def find_documents(
db: str,
collection: str,
filter_json: Optional[str] = None,
projection_json: Optional[str] = None,
sort_json: Optional[str] = None,
filter_json: Optional[Union[str, dict]] = None,
projection_json: Optional[Union[str, dict]] = None,
sort_json: Optional[Union[str, dict]] = None,
limit: int = 50,
skip: int = 0,
) -> dict:
"""Query documents. filter/projection/sort are JSON strings, e.g. '{"patient_code":"CZ1-01"}'.
sort example: '{"last_seen_at": -1}'. Limit max 500.
"""Query documents. filter/projection/sort are JSON strings or dicts, e.g. '{"patient_code":"CZ1-01"}'.
sort example: '{"last_seen_at": -1}'. Limit max 500. Use skip for pagination.
"""
try:
col = client[db][collection]
@@ -119,10 +127,11 @@ def find_documents(
cursor = col.find(filt, proj)
if sort_spec:
cursor = cursor.sort(list(sort_spec.items()))
cursor = cursor.limit(limit)
cursor = cursor.skip(skip).limit(limit)
docs = [serialize(doc) for doc in cursor]
return {"count": len(docs), "docs": docs}
total = col.count_documents(filt)
return {"total": total, "skip": skip, "count": len(docs), "docs": docs}
except Exception:
log(f"find_documents error: {traceback.format_exc()}")
raise
@@ -144,8 +153,105 @@ def aggregate(db: str, collection: str, pipeline_json: str) -> dict:
@mcp.tool()
def distinct_values(db: str, collection: str, field: str, filter_json: Optional[str] = None) -> dict:
"""Return distinct values of a field, optionally filtered."""
def field_coverage(db: str, collection: str, field: str) -> dict:
"""Check how many documents have a given field present and non-null/non-empty.
Useful for verifying data consistency across a collection.
Returns total count, how many have the field, how many are missing it, and coverage %.
"""
try:
col = client[db][collection]
total = col.count_documents({})
present = col.count_documents({field: {"$exists": True, "$ne": None, "$ne": ""}})
missing = total - present
return {
"field": field,
"total": total,
"present": present,
"missing": missing,
"coverage_pct": round(present / total * 100, 1) if total else 0,
}
except Exception:
log(f"field_coverage error: {traceback.format_exc()}")
raise
@mcp.tool()
def count_documents(db: str, collection: str, filter_json: Optional[Union[str, dict]] = None) -> dict:
"""Count documents matching an optional filter. Fast way to check data without fetching content."""
try:
col = client[db][collection]
filt = parse_filter(filter_json)
count = col.count_documents(filt)
return {"db": db, "collection": collection, "filter": filt, "count": count}
except Exception:
log(f"count_documents error: {traceback.format_exc()}")
raise
@mcp.tool()
def schema_diff(
db: str,
collection: str,
filter_a_json: Optional[Union[str, dict]] = None,
filter_b_json: Optional[Union[str, dict]] = None,
sample_size: int = 50,
) -> dict:
"""Compare field presence between two groups of documents in a collection.
filter_a_json and filter_b_json define the two groups (defaults to all documents).
Returns fields that are only in group A, only in group B, and in both — useful for
diagnosing why some documents are missing fields that others have.
"""
try:
col = client[db][collection]
filt_a = parse_filter(filter_a_json)
filt_b = parse_filter(filter_b_json)
def get_field_set(filt: dict) -> set:
fields = set()
for doc in col.find(filt).limit(sample_size):
fields.update(doc.keys())
return fields
fields_a = get_field_set(filt_a)
fields_b = get_field_set(filt_b)
only_in_a = sorted(fields_a - fields_b)
only_in_b = sorted(fields_b - fields_a)
in_both = sorted(fields_a & fields_b)
return {
"only_in_a": only_in_a,
"only_in_b": only_in_b,
"in_both": in_both,
"count_a": len(fields_a),
"count_b": len(fields_b),
}
except Exception:
log(f"schema_diff error: {traceback.format_exc()}")
raise
@mcp.tool()
def ping() -> dict:
"""Check if MongoDB is reachable. Returns server version and latency."""
try:
import time
start = time.monotonic()
info = client.server_info()
latency_ms = round((time.monotonic() - start) * 1000, 1)
return {
"status": "ok",
"version": info.get("version", "unknown"),
"latency_ms": latency_ms,
"host": MONGO_HOST,
}
except Exception as e:
return {"status": "error", "error": str(e), "host": MONGO_HOST}
@mcp.tool()
def distinct_values(db: str, collection: str, field: str, filter_json: Optional[Union[str, dict]] = None) -> dict:
"""Return distinct values of a field, optionally filtered. Useful for exploring enums and categories."""
try:
col = client[db][collection]
filt = parse_filter(filter_json)