Skip to content

Composability

mcp-datahub is designed for composition with other MCP tool libraries.

Island Architecture

Each txn2 library (mcp-trino, mcp-datahub, mcp-nifi, mcp-s3) is an island:

  • No knowledge of each other
  • No shared dependencies
  • Can be used independently or together
flowchart TB
    trino["mcp-trino<br/>(no imports)"]
    datahub["mcp-datahub<br/>(no imports)"]
    nifi["mcp-nifi<br/>(no imports)"]
    custom["Your Custom MCP<br/>Imports all, wires"]

    trino --> custom
    datahub --> custom
    nifi --> custom

Basic Composition

package main

import (
    "context"
    "log"

    "github.com/modelcontextprotocol/go-sdk/mcp"

    datahubclient "github.com/txn2/mcp-datahub/pkg/client"
    datahubtools "github.com/txn2/mcp-datahub/pkg/tools"

    trinoclient "github.com/txn2/mcp-trino/pkg/client"
    trinotools "github.com/txn2/mcp-trino/pkg/tools"
)

// main wires the DataHub and Trino toolkits into one MCP server and serves it
// over stdio. Any setup or run failure is fatal; the standard logger writes to
// stderr, which keeps stdout free for the stdio transport.
func main() {
    ctx := context.Background()

    server := mcp.NewServer(&mcp.Implementation{
        Name:    "unified-data-server",
        Version: "1.0.0",
    }, nil)

    // DataHub tools
    datahub, err := datahubclient.NewFromEnv()
    if err != nil {
        log.Fatalf("creating DataHub client: %v", err)
    }
    datahubtools.NewToolkit(datahub).RegisterAll(server)

    // Trino tools
    trino, err := trinoclient.NewFromEnv()
    if err != nil {
        log.Fatalf("creating Trino client: %v", err)
    }
    trinotools.NewToolkit(trino).RegisterAll(server)

    // Run combined server
    if err := server.Run(ctx, &mcp.StdioTransport{}); err != nil {
        log.Fatalf("running MCP server: %v", err)
    }
}

Built-in Extensions

The extensions package provides ready-to-use middleware for common needs:

import "github.com/txn2/mcp-datahub/pkg/extensions"

// Enable via environment variables
// Build the extensions configuration from the environment.
cfg := extensions.FromEnv()
// Turn that configuration into toolkit options.
opts := extensions.BuildToolkitOptions(cfg)
// datahubClient and toolsCfg are assumed to have been created earlier by the caller.
toolkit := tools.NewToolkit(datahubClient, toolsCfg, opts...)

| Extension | Env Variable | Description |
|---|---|---|
| Logging | `MCP_DATAHUB_EXT_LOGGING` | Structured logging of tool calls with duration |
| Metrics | `MCP_DATAHUB_EXT_METRICS` | Call counts, error counts, and timing |
| Error Hints | `MCP_DATAHUB_EXT_ERRORS` | Helpful hints appended to error messages |
| Metadata | `MCP_DATAHUB_EXT_METADATA` | Execution metadata on successful results |

For custom middleware beyond what extensions provide, see below.

Adding Custom Middleware

Add cross-cutting concerns like logging or access control:

// Create toolkit with middleware
// Each WithMiddleware option adds one middleware to the toolkit.
// NOTE(review): other examples on this page pass a config value as the second
// argument to NewToolkit; confirm which call form matches the library version in use.
toolkit := tools.NewToolkit(datahubClient,
    tools.WithMiddleware(loggingMiddleware),
    tools.WithMiddleware(accessControlMiddleware),
)

// loggingMiddleware wraps next so that every tool invocation is logged by tool
// name and then passed through unchanged.
func loggingMiddleware(next tools.Handler) tools.Handler {
    return func(ctx context.Context, req *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        // The tool name is carried on the request's Params, not on the request itself.
        log.Printf("Tool called: %s", req.Params.Name)
        return next(ctx, req)
    }
}

Per-Tool Middleware

Apply middleware to specific tools:

// Attach middleware to individual tools: rateLimiter is bound to ToolSearch and
// cacheMiddleware to ToolGetLineage. Both middleware values are assumed to be
// defined by the caller.
toolkit := tools.NewToolkit(datahubClient,
    tools.WithToolMiddleware(tools.ToolSearch, rateLimiter),
    tools.WithToolMiddleware(tools.ToolGetLineage, cacheMiddleware),
)

Selective Registration

Register only the tools you need:

// Register takes the server followed by the identifiers of the tools to expose.
// datahubToolkit and trinoToolkit are assumed to be toolkits created earlier.

// DataHub: only search and entity tools
datahubToolkit.Register(server,
    tools.ToolSearch,
    tools.ToolGetEntity,
)

// Trino: only query tool
trinoToolkit.Register(server,
    trinotools.ToolQuery,
)

Bidirectional Integration

The library supports bidirectional context injection between toolkits. While mcp-trino can pull semantic context from DataHub, mcp-datahub can receive query execution context back from a query engine through the QueryProvider interface.

QueryProvider Interface

import "github.com/txn2/mcp-datahub/pkg/integration"

// QueryProvider enables query engines to inject context
// into DataHub tool responses. All URN parameters are DataHub URNs.
type QueryProvider interface {
    // Name returns an identifier for the provider (the example implementation returns "trino").
    Name() string
    // ResolveTable maps a URN to the query engine's table identifier.
    ResolveTable(ctx context.Context, urn string) (*TableIdentifier, error)
    // GetTableAvailability reports whether the table behind urn can be queried.
    GetTableAvailability(ctx context.Context, urn string) (*TableAvailability, error)
    // GetQueryExamples returns example queries for the table behind urn.
    GetQueryExamples(ctx context.Context, urn string) ([]QueryExample, error)
    // GetExecutionContext resolves a batch of URNs in one call.
    GetExecutionContext(ctx context.Context, urns []string) (*ExecutionContext, error)
    // Close presumably releases provider resources; the example implementation is a no-op.
    Close() error
}

Implementation Example

// trinoQueryProvider is an example QueryProvider backed by a Trino client.
type trinoQueryProvider struct {
    // client is used to check table existence in Trino.
    client *trino.Client
    // env is set by the caller (e.g. "PROD"); none of the methods shown here read it.
    env    string
}

// Name reports the provider identifier, which is always "trino".
func (p *trinoQueryProvider) Name() string {
    const providerName = "trino"
    return providerName
}

// ResolveTable converts a DataHub dataset URN into a catalog/schema/table
// identifier using parseDataHubURN. It never returns a non-nil error.
func (p *trinoQueryProvider) ResolveTable(ctx context.Context, urn string) (*integration.TableIdentifier, error) {
    // Parse URN: urn:li:dataset:(urn:li:dataPlatform:trino,catalog.schema.table,PROD)
    parsed := parseDataHubURN(urn)

    id := new(integration.TableIdentifier)
    id.Catalog = parsed.Catalog
    id.Schema = parsed.Schema
    id.Table = parsed.Table
    return id, nil
}

// GetTableAvailability resolves urn to a table and asks Trino whether that
// table exists. It returns a nil result together with a wrapped error when
// either the resolution or the existence check fails.
func (p *trinoQueryProvider) GetTableAvailability(ctx context.Context, urn string) (*integration.TableAvailability, error) {
    table, err := p.ResolveTable(ctx, urn)
    if err != nil {
        return nil, fmt.Errorf("resolving table for %q: %w", urn, err)
    }

    // Check if table exists in Trino
    name := table.String()
    exists, err := p.client.TableExists(ctx, name)
    if err != nil {
        return nil, fmt.Errorf("checking table %s in Trino: %w", name, err)
    }
    return &integration.TableAvailability{
        Available: exists,
        Table:     table,
    }, nil
}

// GetQueryExamples returns two starter queries for the table behind urn: a
// ten-row sample and a row count. It returns a wrapped error if the URN cannot
// be resolved to a table.
func (p *trinoQueryProvider) GetQueryExamples(ctx context.Context, urn string) ([]integration.QueryExample, error) {
    table, err := p.ResolveTable(ctx, urn)
    if err != nil {
        return nil, fmt.Errorf("resolving table for %q: %w", urn, err)
    }

    name := table.String()
    return []integration.QueryExample{
        {
            Name:     "sample",
            SQL:      fmt.Sprintf("SELECT * FROM %s LIMIT 10", name),
            Category: "sample",
        },
        {
            Name:     "count",
            SQL:      fmt.Sprintf("SELECT COUNT(*) FROM %s", name),
            Category: "aggregation",
        },
    }, nil
}

// GetExecutionContext resolves each URN to a table and returns the successful
// resolutions keyed by URN, with Source set to "trino". URNs that fail to
// resolve are left out of the map; the returned error is always nil.
func (p *trinoQueryProvider) GetExecutionContext(ctx context.Context, urns []string) (*integration.ExecutionContext, error) {
    resolved := map[string]*integration.TableIdentifier{}
    for i := range urns {
        id, err := p.ResolveTable(ctx, urns[i])
        if err != nil {
            // Best effort: skip URNs that cannot be resolved.
            continue
        }
        resolved[urns[i]] = id
    }

    execCtx := &integration.ExecutionContext{
        Tables: resolved,
        Source: "trino",
    }
    return execCtx, nil
}

// Close performs no cleanup and always returns nil.
func (p *trinoQueryProvider) Close() error {
    return nil
}

Wiring It Up

// Create provider
// trinoClient is assumed to be an already-constructed Trino client.
queryProvider := &trinoQueryProvider{client: trinoClient, env: "PROD"}

// Create toolkit with provider
// WithQueryProvider hands the provider to the toolkit as an option.
toolkit := datahubtools.NewToolkit(datahubClient, config,
    datahubtools.WithQueryProvider(queryProvider),
)

Enriched Tool Responses

When a QueryProvider is configured, tool responses are automatically enriched:

| Tool | Enrichment |
|---|---|
| `datahub_search` | `query_context` with table availability for each result |
| `datahub_get_entity` | `query_table`, `query_examples`, `query_availability` |
| `datahub_get_schema` | `query_table` for immediate SQL usage |
| `datahub_get_lineage` | `execution_context` mapping URNs to queryable tables |

Integration Interfaces

The library provides enterprise integration hooks through the integration package:

Access Control

import "github.com/txn2/mcp-datahub/pkg/integration"

// myAccessFilter is an example access filter that delegates permission
// decisions to an application-defined AuthService.
type myAccessFilter struct {
    // authService answers per-user permission queries.
    authService AuthService
}

// CanAccess reports whether the user identified by the "user_id" context value
// has "read" permission on urn. It returns an error, rather than panicking,
// when the context carries no string user ID.
func (f *myAccessFilter) CanAccess(ctx context.Context, urn string) (bool, error) {
    userID, ok := ctx.Value("user_id").(string)
    if !ok {
        return false, errors.New("access filter: user_id missing from context")
    }
    return f.authService.CheckPermission(userID, urn, "read")
}

// FilterURNs returns the subset of urns that the user identified by the
// "user_id" context value may access, as decided by the auth service. It
// returns an error, rather than panicking, when the context carries no string
// user ID.
func (f *myAccessFilter) FilterURNs(ctx context.Context, urns []string) ([]string, error) {
    userID, ok := ctx.Value("user_id").(string)
    if !ok {
        return nil, errors.New("access filter: user_id missing from context")
    }
    return f.authService.FilterAllowed(userID, urns)
}

// Use it
// authService is assumed to be an AuthService value created by the caller.
toolkit := datahubtools.NewToolkit(client, config,
    datahubtools.WithAccessFilter(&myAccessFilter{authService}),
)

Audit Logging

// myAuditLogger is an example audit logger that writes one row per tool call
// to an audit_log table.
type myAuditLogger struct {
    db *sql.DB
}

// LogToolCall records the tool name, its parameters and the calling user.
// params is stored as a JSON string because database/sql cannot bind a Go map
// as a query argument. Encoding and database failures are returned wrapped.
// (This snippet needs the encoding/json and fmt packages imported.)
func (l *myAuditLogger) LogToolCall(ctx context.Context, tool string, params map[string]any, userID string) error {
    encoded, err := json.Marshal(params)
    if err != nil {
        return fmt.Errorf("encoding audit params for tool %q: %w", tool, err)
    }
    if _, err := l.db.ExecContext(ctx,
        "INSERT INTO audit_log (tool, params, user_id, timestamp) VALUES (?, ?, ?, NOW())",
        tool, string(encoded), userID,
    ); err != nil {
        return fmt.Errorf("writing audit log for tool %q: %w", tool, err)
    }
    return nil
}

// Use it
// The second argument extracts the user ID for each audited call. The comma-ok
// assertion yields "" instead of panicking when the context has no "user_id".
toolkit := datahubtools.NewToolkit(client, config,
    datahubtools.WithAuditLogger(&myAuditLogger{db}, func(ctx context.Context) string {
        userID, _ := ctx.Value("user_id").(string)
        return userID
    }),
)

URN Resolution

Map external IDs to DataHub URNs:

// myURNResolver is an example resolver that looks up DataHub URNs in a SQL
// mapping table.
type myURNResolver struct {
    // mappingDB holds the id_mapping table queried by ResolveToDataHubURN.
    mappingDB *sql.DB
}

// ResolveToDataHubURN looks up the DataHub URN mapped to externalID in the
// id_mapping table. The scan error, if any, is returned unchanged alongside
// whatever value was scanned.
func (r *myURNResolver) ResolveToDataHubURN(ctx context.Context, externalID string) (string, error) {
    row := r.mappingDB.QueryRowContext(ctx,
        "SELECT datahub_urn FROM id_mapping WHERE external_id = ?",
        externalID,
    )

    var resolved string
    scanErr := row.Scan(&resolved)
    return resolved, scanErr
}

// Use it - allows tools to accept external IDs instead of URNs
// mappingDB is assumed to be an open *sql.DB owned by the caller.
toolkit := datahubtools.NewToolkit(client, config,
    datahubtools.WithURNResolver(&myURNResolver{mappingDB}),
)

Metadata Enrichment

Add custom metadata to entity responses:

// myEnricher is an example metadata enricher backed by an application-defined
// MetadataService.
type myEnricher struct {
    // metadataService supplies the custom metadata and last-access values.
    metadataService MetadataService
}

// EnrichEntity adds "custom_metadata" and "last_accessed" entries for urn to
// data and returns it. A nil data map is replaced with a fresh one so the
// writes cannot panic. A failed custom-metadata lookup is returned as a
// wrapped error instead of being silently stored as an empty value.
func (e *myEnricher) EnrichEntity(ctx context.Context, urn string, data map[string]any) (map[string]any, error) {
    // Add custom metadata
    custom, err := e.metadataService.GetCustomMetadata(urn)
    if err != nil {
        return nil, fmt.Errorf("loading custom metadata for %q: %w", urn, err)
    }
    if data == nil {
        data = make(map[string]any, 2)
    }
    data["custom_metadata"] = custom
    data["last_accessed"] = e.metadataService.GetLastAccess(urn)
    return data, nil
}

// Use it
// metadataService is assumed to be a MetadataService value created by the caller.
toolkit := datahubtools.NewToolkit(client, config,
    datahubtools.WithMetadataEnricher(&myEnricher{metadataService}),
)

Combining All Integration Options

// Every integration is constructed with its dependency. The example types on
// this page call through those fields, so zero-value structs would dereference
// nil on first use. mappingDB, authService, db, metadataService and getUserID
// are assumed to be defined by the caller.
toolkit := datahubtools.NewToolkit(datahubClient, config,
    // Bidirectional query context
    datahubtools.WithQueryProvider(&trinoQueryProvider{client: trinoClient, env: "PROD"}),

    // Enterprise integrations
    datahubtools.WithURNResolver(&myURNResolver{mappingDB}),
    datahubtools.WithAccessFilter(&myAccessFilter{authService}),
    datahubtools.WithAuditLogger(&myAuditLogger{db}, getUserID),
    datahubtools.WithMetadataEnricher(&myEnricher{metadataService}),

    // Standard middleware
    datahubtools.WithMiddleware(loggingMiddleware),
)

Adding Custom Tools

Extend with your own domain-specific tools:

// Register DataHub tools
datahubtools.NewToolkit(datahub).RegisterAll(server)

// Add your custom tools
// yourHandler is assumed to be a tool handler function defined by the caller.
mcp.AddTool(server, &mcp.Tool{
    Name:        "company_data_dictionary",
    Description: "Get company-specific data dictionary",
}, yourHandler)

Error Handling in Middleware

Failing Fast vs Graceful Degradation

Choose the appropriate error handling strategy for your middleware:

Fail Fast: Return error immediately, abort tool execution

// Before lets the call proceed only when the context carries a non-nil
// "auth_token" value; otherwise it returns an unauthorized error.
func (m *StrictAuthMiddleware) Before(ctx context.Context, tc *tools.ToolContext) (context.Context, error) {
    if ctx.Value("auth_token") != nil {
        return ctx, nil
    }
    // Fail fast: no token means abort
    return ctx, errors.New("unauthorized: missing token")
}

Graceful Degradation: Log issue but continue execution

// After tries to enrich a successful tool result. Enrichment is optional: a
// failure is logged and the original result is returned. A failed tool call,
// or one with no result, is passed through untouched so the enricher never
// receives a nil or partial result.
func (m *OptionalEnricherMiddleware) After(ctx context.Context, tc *tools.ToolContext, result *mcp.CallToolResult, err error) (*mcp.CallToolResult, error) {
    if err != nil || result == nil {
        return result, err
    }

    enriched, enrichErr := m.enricher.Enrich(ctx, result)
    if enrichErr != nil {
        // Log but don't fail - enrichment is optional
        log.Printf("Warning: enrichment failed: %v", enrichErr)
        return result, nil // Return original result
    }
    return enriched, nil
}

Error Wrapping Pattern

Wrap errors with context for better debugging:

// Before runs the middleware's own step and, on failure, returns the error
// wrapped with the middleware and tool names so callers can still unwrap the
// cause. The step's first return value is not needed here and is discarded,
// because Go rejects a declared-but-unused local.
func (m *MyMiddleware) Before(ctx context.Context, tc *tools.ToolContext) (context.Context, error) {
    if _, err := m.doSomething(ctx); err != nil {
        return ctx, fmt.Errorf("middleware %s failed for tool %s: %w", m.name, tc.Name, err)
    }
    return ctx, nil
}

Recovery Middleware

Add recovery middleware at the start of the chain to catch panics:

// RecoveryMiddleware is a skeleton middleware whose Before and After hooks
// both pass their inputs through unchanged.
// NOTE(review): as written, neither hook calls recover(), so no panic is
// caught here. recover only takes effect directly inside a deferred function
// on the panicking goroutine, so this type can only help if the toolkit itself
// recovers and then invokes After - TODO confirm against the toolkit.
type RecoveryMiddleware struct{}

// Before returns ctx unchanged and never fails.
func (m *RecoveryMiddleware) Before(ctx context.Context, tc *tools.ToolContext) (context.Context, error) {
    return ctx, nil
}

// After returns result and err exactly as received.
func (m *RecoveryMiddleware) After(ctx context.Context, tc *tools.ToolContext, result *mcp.CallToolResult, err error) (*mcp.CallToolResult, error) {
    // Recovery happens here if panic occurred
    return result, err
}

// Register first so it wraps everything
// NOTE(review): this relies on middleware running in registration order - confirm.
toolkit := tools.NewToolkit(client,
    tools.WithMiddleware(&RecoveryMiddleware{}),
    tools.WithMiddleware(&AuthMiddleware{}),
    // ... other middleware
)

Testing Middleware

Unit Testing

Test middleware in isolation:

// TestAuthMiddleware_ValidToken checks that Before accepts a context carrying
// a valid token and stores a user_id in the returned context.
func TestAuthMiddleware_ValidToken(t *testing.T) {
    middleware := &AuthMiddleware{secret: "test-secret"}
    ctx := context.WithValue(context.Background(), "auth_token", validToken)
    tc := &tools.ToolContext{Name: "datahub_search"}

    newCtx, err := middleware.Before(ctx, tc)

    // Stop here on failure: the returned context is not meaningful alongside
    // an error and may be nil.
    if err != nil {
        t.Fatalf("Expected no error, got: %v", err)
    }
    if newCtx.Value("user_id") == nil {
        t.Error("Expected user_id in context")
    }
}

// TestAuthMiddleware_MissingToken checks that Before rejects a context that
// carries no auth token.
func TestAuthMiddleware_MissingToken(t *testing.T) {
    auth := &AuthMiddleware{secret: "test-secret"}
    toolCtx := &tools.ToolContext{Name: "datahub_search"}

    if _, err := auth.Before(context.Background(), toolCtx); err == nil {
        t.Error("Expected error for missing token")
    }
}

Testing Middleware Chain

Test multiple middleware together:

// TestMiddlewareChain registers two Before hooks and asserts that they run in
// registration order.
func TestMiddlewareChain(t *testing.T) {
    var executionOrder []string

    // recordAs builds a Before hook that appends label to executionOrder.
    recordAs := func(label string) func(context.Context, *tools.ToolContext) (context.Context, error) {
        return func(ctx context.Context, tc *tools.ToolContext) (context.Context, error) {
            executionOrder = append(executionOrder, label)
            return ctx, nil
        }
    }

    middleware1 := tools.BeforeFunc(recordAs("m1-before"))
    middleware2 := tools.BeforeFunc(recordAs("m2-before"))

    toolkit := tools.NewToolkit(mockClient,
        tools.WithMiddleware(middleware1),
        tools.WithMiddleware(middleware2),
    )

    // Execute tool...

    expected := []string{"m1-before", "m2-before"}
    if reflect.DeepEqual(executionOrder, expected) {
        return
    }
    t.Errorf("Expected %v, got %v", expected, executionOrder)
}

Integration Testing with Mock Providers

Test with mock implementations:

// MockQueryProvider is a test double for integration.QueryProvider. Only
// ResolveTable is configurable; the remaining methods return empty values so
// the mock implements the whole interface.
type MockQueryProvider struct {
    // ResolveTableFunc, when non-nil, supplies the result of ResolveTable.
    ResolveTableFunc func(ctx context.Context, urn string) (*integration.TableIdentifier, error)
}

// Compile-time check that the mock implements every QueryProvider method.
var _ integration.QueryProvider = (*MockQueryProvider)(nil)

// Name returns the fixed identifier "mock".
func (m *MockQueryProvider) Name() string { return "mock" }

// ResolveTable delegates to ResolveTableFunc when set; otherwise it returns
// (nil, nil).
func (m *MockQueryProvider) ResolveTable(ctx context.Context, urn string) (*integration.TableIdentifier, error) {
    if m.ResolveTableFunc != nil {
        return m.ResolveTableFunc(ctx, urn)
    }
    return nil, nil
}

// GetTableAvailability returns an empty availability record.
func (m *MockQueryProvider) GetTableAvailability(ctx context.Context, urn string) (*integration.TableAvailability, error) {
    return &integration.TableAvailability{}, nil
}

// GetQueryExamples returns no examples.
func (m *MockQueryProvider) GetQueryExamples(ctx context.Context, urn string) ([]integration.QueryExample, error) {
    return nil, nil
}

// GetExecutionContext returns an empty execution context.
func (m *MockQueryProvider) GetExecutionContext(ctx context.Context, urns []string) (*integration.ExecutionContext, error) {
    return &integration.ExecutionContext{}, nil
}

// Close is a no-op.
func (m *MockQueryProvider) Close() error { return nil }

// Use in tests
func TestWithQueryProvider(t *testing.T) {
    mockProvider := &MockQueryProvider{
        ResolveTableFunc: func(ctx context.Context, urn string) (*integration.TableIdentifier, error) {
            return &integration.TableIdentifier{
                Catalog: "test",
                Schema:  "schema",
                Table:   "table",
            }, nil
        },
    }

    toolkit := tools.NewToolkit(mockClient,
        tools.WithQueryProvider(mockProvider),
    )

    // Test tool execution...
}

Context Propagation

Pass data between middleware using context:

// Set in one middleware
// Before stores the user ID, the user's roles and a freshly generated request
// ID in the context under the "user_id", "user_roles" and "request_id" keys.
func (m *AuthMiddleware) Before(ctx context.Context, tc *tools.ToolContext) (context.Context, error) {
    withUser := context.WithValue(ctx, "user_id", userID)
    withRoles := context.WithValue(withUser, "user_roles", roles)
    withRequest := context.WithValue(withRoles, "request_id", uuid.New().String())
    return withRequest, nil
}

// Use in another middleware
// After reads the "user_id" and "request_id" values from the context and logs
// them with the tool name and outcome. The comma-ok assertions yield empty
// strings instead of panicking when a value is missing. result and err are
// returned unchanged.
func (m *AuditMiddleware) After(ctx context.Context, tc *tools.ToolContext, result *mcp.CallToolResult, err error) (*mcp.CallToolResult, error) {
    userID, _ := ctx.Value("user_id").(string)
    requestID, _ := ctx.Value("request_id").(string)
    // Log with context...
    log.Printf("audit: tool=%s user=%q request=%q err=%v", tc.Name, userID, requestID, err)
    return result, err
}

Middleware Best Practices

| Practice | Description |
|---|---|
| Stateless | Avoid storing state in middleware structs |
| Idempotent | Multiple calls should have the same effect |
| Fast | Keep Before/After hooks lightweight |
| Logged | Log errors before returning them |
| Tested | Unit test each middleware independently |
| Ordered | Document expected middleware order |