Tools API Reference

Complete API reference for the tools package.

Toolkit

NewToolkit

Creates a new toolkit instance.

func NewToolkit(client DataHubClient, opts ...Option) *Toolkit

Parameters:

  • client: A DataHub client implementing the DataHubClient interface
  • opts: Optional configuration options

Example:

toolkit := tools.NewToolkit(datahubClient)

RegisterAll

Registers all available tools with the MCP server.

func (t *Toolkit) RegisterAll(server *mcp.Server)

Register

Registers specific tools with the MCP server.

func (t *Toolkit) Register(server *mcp.Server, names ...ToolName)

Example:

toolkit.Register(server, tools.ToolSearch, tools.ToolGetEntity)

RegisterWith

Registers a tool with per-tool options.

func (t *Toolkit) RegisterWith(server *mcp.Server, name ToolName, opts ...PerToolOption)

Options

WithMiddleware

Adds global middleware to all tools.

func WithMiddleware(mw ToolMiddleware) Option

WithToolMiddleware

Adds middleware to a specific tool.

func WithToolMiddleware(name ToolName, mw ToolMiddleware) Option

WithQueryProvider

Injects a query execution context provider for bidirectional integration.

func WithQueryProvider(p integration.QueryProvider) Option

When configured, the toolkit enriches tool responses with query execution context (table resolution, availability, and query examples).

WithURNResolver

Maps external IDs to DataHub URNs before tool execution.

func WithURNResolver(r integration.URNResolver) Option

WithAccessFilter

Adds access control filtering before and after tool execution.

func WithAccessFilter(f integration.AccessFilter) Option

WithAuditLogger

Logs all tool invocations for audit purposes.

func WithAuditLogger(l integration.AuditLogger, getUserID func(context.Context) string) Option

WithMetadataEnricher

Adds custom metadata to entity responses.

func WithMetadataEnricher(e integration.MetadataEnricher) Option

WithDescriptions

Overrides tool descriptions at the toolkit level.

func WithDescriptions(descs map[ToolName]string) ToolkitOption

Example:

toolkit := tools.NewToolkit(datahubClient, config,
    tools.WithDescriptions(map[tools.ToolName]string{
        tools.ToolSearch: "Search our internal data catalog",
    }),
)

WithDescription

Overrides the description for a single tool at registration time.

func WithDescription(desc string) ToolOption

Example:

toolkit.RegisterWith(server, tools.ToolSearch,
    tools.WithDescription("Search our company's data catalog"),
)

Description Priority:

  1. Per-registration WithDescription() (highest)
  2. Toolkit-level WithDescriptions() map
  3. Built-in default description (lowest)
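The priority order above can be sketched as a simple fallback chain. This is a minimal illustration, not the toolkit's actual code: resolveDescription is a hypothetical helper, ToolName is a local stand-in for the package type, and treating an empty string as "not set" is an assumption.

```go
package main

import "fmt"

// ToolName is a local stand-in for the package's ToolName type.
type ToolName string

// resolveDescription is a hypothetical helper that applies the documented
// priority: per-registration override, then toolkit-level map, then default.
// Assumption: an empty string means "no override".
func resolveDescription(name ToolName, perRegistration string, toolkitLevel map[ToolName]string, builtin string) string {
	if perRegistration != "" {
		return perRegistration
	}
	if desc, ok := toolkitLevel[name]; ok && desc != "" {
		return desc
	}
	return builtin
}

func main() {
	const search ToolName = "datahub_search"
	toolkitLevel := map[ToolName]string{search: "Search our internal data catalog"}

	// Per-registration value wins over the toolkit-level map.
	fmt.Println(resolveDescription(search, "Search our company's data catalog", toolkitLevel, "built-in"))
	// Toolkit-level map wins over the built-in default.
	fmt.Println(resolveDescription(search, "", toolkitLevel, "built-in"))
	// No override at either level falls back to the default.
	fmt.Println(resolveDescription("datahub_get_entity", "", toolkitLevel, "built-in"))
}
```

The same three-level lookup applies to annotations, titles, and output schemas, with the value type changed accordingly.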

WithAnnotations

Overrides MCP tool annotations at the toolkit level.

func WithAnnotations(anns map[ToolName]*mcp.ToolAnnotations) ToolkitOption

Example:

toolkit := tools.NewToolkit(datahubClient, config,
    tools.WithAnnotations(map[tools.ToolName]*mcp.ToolAnnotations{
        // boolPtr is a caller-defined helper that returns a pointer to its argument.
        tools.ToolSearch: {ReadOnlyHint: true, OpenWorldHint: boolPtr(true)},
    }),
)

WithAnnotation

Overrides the annotations for a single tool at registration time.

func WithAnnotation(ann *mcp.ToolAnnotations) ToolOption

Example:

toolkit.RegisterWith(server, tools.ToolSearch,
    tools.WithAnnotation(&mcp.ToolAnnotations{ReadOnlyHint: true}),
)

Annotation Priority:

  1. Per-registration WithAnnotation() (highest)
  2. Toolkit-level WithAnnotations() map
  3. Built-in default annotations (lowest)

DefaultAnnotations

Returns the default annotations for a tool by name. Returns nil for unknown tool names.

func DefaultAnnotations(name ToolName) *mcp.ToolAnnotations

Default Annotations:

Tool Category     ReadOnlyHint  DestructiveHint  IdempotentHint  OpenWorldHint
Read tools (12)   true          (default)        true            true
Write tools (7)   false         false            true            true

OpenWorldHint is true for all tools because every tool communicates with an external DataHub instance.

WithTitles

Overrides tool display names at the toolkit level. Titles appear in MCP clients (e.g., Claude Desktop) instead of raw tool names.

func WithTitles(titles map[ToolName]string) ToolkitOption

Example:

toolkit := tools.NewToolkit(datahubClient, config,
    tools.WithTitles(map[tools.ToolName]string{
        tools.ToolSearch: "Search Our Catalog",
    }),
)

WithTitle

Overrides the display title for a single tool at registration time.

func WithTitle(title string) ToolOption

Example:

toolkit.RegisterWith(server, tools.ToolSearch,
    tools.WithTitle("Search Our Catalog"),
)

Title Priority:

  1. Per-registration WithTitle() (highest)
  2. Toolkit-level WithTitles() map
  3. Built-in default title (lowest)

DefaultTitle

Returns the default display title for a tool by name. Returns empty string for unknown tool names.

func DefaultTitle(name ToolName) string

WithOutputSchemas

Overrides the output JSON Schema for multiple tools at the toolkit level.

func WithOutputSchemas(schemas map[ToolName]any) ToolkitOption

WithOutputSchema

Overrides the output JSON Schema for a single tool at registration time.

func WithOutputSchema(schema any) ToolOption

OutputSchema Priority:

  1. Per-registration WithOutputSchema() (highest)
  2. Toolkit-level WithOutputSchemas() map
  3. Built-in default output schema (lowest)

DefaultOutputSchema

Returns the default output JSON Schema for a tool by name. Returns nil for unknown tool names.

func DefaultOutputSchema(name ToolName) json.RawMessage

Tool Names

Available tool name constants:

const (
    ToolSearch           ToolName = "datahub_search"
    ToolGetEntity        ToolName = "datahub_get_entity"
    ToolGetSchema        ToolName = "datahub_get_schema"
    ToolGetLineage       ToolName = "datahub_get_lineage"
    ToolGetColumnLineage ToolName = "datahub_get_column_lineage"
    ToolGetQueries       ToolName = "datahub_get_queries"
    ToolGetGlossaryTerm  ToolName = "datahub_get_glossary_term"
    ToolListTags         ToolName = "datahub_list_tags"
    ToolListDomains      ToolName = "datahub_list_domains"
    ToolListDataProducts ToolName = "datahub_list_data_products"
    ToolGetDataProduct   ToolName = "datahub_get_data_product"
    ToolListConnections  ToolName = "datahub_list_connections"

    // Write tools (require WriteEnabled: true)
    ToolUpdateDescription  ToolName = "datahub_update_description"
    ToolAddTag             ToolName = "datahub_add_tag"
    ToolRemoveTag          ToolName = "datahub_remove_tag"
    ToolAddGlossaryTerm    ToolName = "datahub_add_glossary_term"
    ToolRemoveGlossaryTerm ToolName = "datahub_remove_glossary_term"
    ToolAddLink            ToolName = "datahub_add_link"
    ToolRemoveLink         ToolName = "datahub_remove_link"
)

Middleware

ToolMiddleware Interface

type ToolMiddleware interface {
    Before(ctx context.Context, tc *ToolContext) (context.Context, error)
    After(ctx context.Context, tc *ToolContext, result *mcp.CallToolResult, err error) (*mcp.CallToolResult, error)
}

BeforeFunc

Creates middleware that runs before tool execution.

func BeforeFunc(fn func(ctx context.Context, tc *ToolContext) (context.Context, error)) ToolMiddleware

AfterFunc

Creates middleware that runs after tool execution.

func AfterFunc(fn func(ctx context.Context, tc *ToolContext, result *mcp.CallToolResult, err error) (*mcp.CallToolResult, error)) ToolMiddleware

ToolContext

GetString

Convenience method to get a string value from the Extra map.

func (tc *ToolContext) GetString(key string) string

Returns the value as a string, or empty string if not found or not a string type.

Example:

func (m *MyMiddleware) Before(ctx context.Context, tc *tools.ToolContext) (context.Context, error) {
    connection := tc.GetString("connection")
    if connection == "" {
        connection = "(default)"
    }
    log.Printf("tool=%s connection=%s", tc.ToolName, connection)
    return ctx, nil
}
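The "empty string if not found or not a string type" behavior can be reproduced with a comma-ok type assertion. The sketch below uses a stand-in ToolContext that holds only an Extra map; the real struct has more fields, and its internals are not shown in this reference.

```go
package main

import "fmt"

// ToolContext is a stand-in with only the field this sketch needs.
type ToolContext struct {
	Extra map[string]any
}

// GetString returns the value stored under key when it is a string, and ""
// when the key is missing or holds another type.
func (tc *ToolContext) GetString(key string) string {
	s, _ := tc.Extra[key].(string) // failed assertion yields the zero value ""
	return s
}

func main() {
	tc := &ToolContext{Extra: map[string]any{"connection": "prod", "limit": 10}}
	fmt.Printf("%q %q %q\n", tc.GetString("connection"), tc.GetString("limit"), tc.GetString("missing"))
}
```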

Helper Functions

TextResult

Creates a text result.

func TextResult(text string) *mcp.CallToolResult

JSONResult

Creates a JSON result.

func JSONResult(v any) (*mcp.CallToolResult, error)

ErrorResult

Creates an error result.

func ErrorResult(msg string) *mcp.CallToolResult

Write Tool Output Types

Write tools return typed output structs as the second return value from their handler functions. These provide structured access to operation results.

UpdateDescriptionOutput

type UpdateDescriptionOutput struct {
    URN    string `json:"urn"`
    Aspect string `json:"aspect"`
    Action string `json:"action"`
}

AddTagOutput / RemoveTagOutput

type AddTagOutput struct {
    URN    string `json:"urn"`
    Tag    string `json:"tag"`
    Aspect string `json:"aspect"`
    Action string `json:"action"`
}

type RemoveTagOutput struct {
    URN    string `json:"urn"`
    Tag    string `json:"tag"`
    Aspect string `json:"aspect"`
    Action string `json:"action"`
}

AddGlossaryTermOutput / RemoveGlossaryTermOutput

type AddGlossaryTermOutput struct {
    URN    string `json:"urn"`
    Term   string `json:"term"`
    Aspect string `json:"aspect"`
    Action string `json:"action"`
}

type RemoveGlossaryTermOutput struct {
    URN    string `json:"urn"`
    Term   string `json:"term"`
    Aspect string `json:"aspect"`
    Action string `json:"action"`
}

AddLinkOutput / RemoveLinkOutput

type AddLinkOutput struct {
    URN    string `json:"urn"`
    URL    string `json:"url"`
    Aspect string `json:"aspect"`
    Action string `json:"action"`
}

type RemoveLinkOutput struct {
    URN    string `json:"urn"`
    URL    string `json:"url"`
    Aspect string `json:"aspect"`
    Action string `json:"action"`
}

Integration Package

The integration package provides interfaces for enterprise integration.

QueryProvider Interface

Enables query engines to inject execution context into DataHub tools.

type QueryProvider interface {
    Name() string
    ResolveTable(ctx context.Context, urn string) (*TableIdentifier, error)
    GetTableAvailability(ctx context.Context, urn string) (*TableAvailability, error)
    GetQueryExamples(ctx context.Context, urn string) ([]QueryExample, error)
    GetExecutionContext(ctx context.Context, urns []string) (*ExecutionContext, error)
    Close() error
}

TableIdentifier

Represents a fully-qualified table reference.

type TableIdentifier struct {
    Connection string `json:"connection,omitempty"`  // Named connection (optional)
    Catalog    string `json:"catalog"`               // Catalog/database name
    Schema     string `json:"schema"`                // Schema name
    Table      string `json:"table"`                 // Table name
}

func (t TableIdentifier) String() string  // Returns "catalog.schema.table" or "conn:catalog.schema.table"
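A minimal sketch of a String method that produces the two formats named in the comment above. The struct is redeclared locally without JSON tags; how the real method treats empty Catalog or Schema values is not specified here, so this sketch simply joins whatever is set.

```go
package main

import "fmt"

// TableIdentifier mirrors the documented fields (JSON tags omitted).
type TableIdentifier struct {
	Connection string
	Catalog    string
	Schema     string
	Table      string
}

// String returns "catalog.schema.table", prefixed with "conn:" when a
// named connection is set.
func (t TableIdentifier) String() string {
	name := t.Catalog + "." + t.Schema + "." + t.Table
	if t.Connection != "" {
		return t.Connection + ":" + name
	}
	return name
}

func main() {
	fmt.Println(TableIdentifier{Catalog: "hive", Schema: "sales", Table: "orders"})
	fmt.Println(TableIdentifier{Connection: "prod", Catalog: "hive", Schema: "sales", Table: "orders"})
}
```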

TableAvailability

Indicates whether a DataHub entity is queryable.

type TableAvailability struct {
    Available   bool             `json:"available"`
    Table       *TableIdentifier `json:"table,omitempty"`
    Connection  string           `json:"connection,omitempty"`
    Error       string           `json:"error,omitempty"`
    LastChecked time.Time        `json:"last_checked,omitempty"`
    RowCount    *int64           `json:"row_count,omitempty"`
    LastUpdated *time.Time       `json:"last_updated,omitempty"`
}

QueryExample

Represents a sample SQL query for a DataHub entity.

type QueryExample struct {
    Name        string `json:"name"`
    Description string `json:"description,omitempty"`
    SQL         string `json:"sql"`
    Category    string `json:"category,omitempty"`   // "sample", "aggregation", "join", etc.
    Source      string `json:"source,omitempty"`     // "generated", "history", "template"
}

ExecutionContext

Provides query execution context for lineage bridging.

type ExecutionContext struct {
    Tables      map[string]*TableIdentifier `json:"tables,omitempty"`
    Connections []string                    `json:"connections,omitempty"`
    Queries     []ExecutionQuery            `json:"queries,omitempty"`
    Source      string                      `json:"source,omitempty"`
}

URNResolver Interface

Maps external IDs to DataHub URNs.

type URNResolver interface {
    ResolveToDataHubURN(ctx context.Context, externalID string) (string, error)
}

AccessFilter Interface

Controls access to entities.

type AccessFilter interface {
    CanAccess(ctx context.Context, urn string) (bool, error)
    FilterURNs(ctx context.Context, urns []string) ([]string, error)
}

AuditLogger Interface

Logs tool invocations.

type AuditLogger interface {
    LogToolCall(ctx context.Context, tool string, params map[string]any, userID string) error
}

MetadataEnricher Interface

Adds custom metadata to entity responses.

type MetadataEnricher interface {
    EnrichEntity(ctx context.Context, urn string, data map[string]any) (map[string]any, error)
}

NoOpQueryProvider

A default no-op implementation of QueryProvider.

var _ QueryProvider = (*NoOpQueryProvider)(nil)

type NoOpQueryProvider struct{}

QueryProviderFunc

Function-based QueryProvider implementation for simple cases.

type QueryProviderFunc struct {
    NameFn                 func() string
    ResolveTableFn         func(context.Context, string) (*TableIdentifier, error)
    GetTableAvailabilityFn func(context.Context, string) (*TableAvailability, error)
    GetQueryExamplesFn     func(context.Context, string) ([]QueryExample, error)
    GetExecutionContextFn  func(context.Context, []string) (*ExecutionContext, error)
    CloseFn                func() error
}

Client Package

The client package provides the DataHub GraphQL client.

NewFromEnv

Creates a client from environment variables.

func NewFromEnv() (*Client, error)

Environment Variables:

Variable                 Required  Description
DATAHUB_URL              Yes       DataHub GraphQL endpoint URL
DATAHUB_TOKEN            Yes       Authentication token
DATAHUB_CONNECTION_NAME  No        Named connection identifier

New

Creates a client with explicit configuration.

func New(cfg Config) (*Client, error)

Config Fields:

type Config struct {
    URL              string        // DataHub GraphQL endpoint
    Token            string        // Authentication token
    ConnectionName   string        // Optional connection name
    Timeout          time.Duration // Request timeout (default: 30s)
    MaxRetries       int           // Max retry attempts (default: 3)
    RetryBackoff     time.Duration // Initial backoff duration (default: 1s)
}

Client Methods

Method                                         Description
Search(ctx, query, entityType, limit, offset)  Search for entities
GetEntity(ctx, urn)                            Get entity by URN
GetSchema(ctx, urn)                            Get dataset schema
GetSchemas(ctx, urns)                          Get multiple dataset schemas (batch)
GetLineage(ctx, urn, direction, depth)         Get entity lineage
GetColumnLineage(ctx, urn)                     Get column-level lineage mappings
GetQueries(ctx, urn)                           Get associated queries
GetGlossaryTerm(ctx, urn)                      Get glossary term details
ListTags(ctx, filter)                          List tags
ListDomains(ctx)                               List domains
ListDataProducts(ctx)                          List data products
GetDataProduct(ctx, urn)                       Get data product details
Close()                                        Close the client

Types Package

The types package contains domain types returned by the client and tools.

Entity

Represents a DataHub entity.

type Entity struct {
    URN         string            `json:"urn"`
    Type        string            `json:"type"`
    Name        string            `json:"name"`
    Platform    string            `json:"platform,omitempty"`
    Description string            `json:"description,omitempty"`
    Owners      []Owner           `json:"owners,omitempty"`
    Tags        []Tag             `json:"tags,omitempty"`
    Terms       []GlossaryTerm    `json:"glossaryTerms,omitempty"`
    Domain      *Domain           `json:"domain,omitempty"`
    Properties  map[string]any    `json:"properties,omitempty"`
}

SchemaField

Represents a field in a dataset schema.

type SchemaField struct {
    FieldPath     string         `json:"fieldPath"`
    Type          string         `json:"type"`
    NativeType    string         `json:"nativeType,omitempty"`
    Description   string         `json:"description,omitempty"`
    Nullable      bool           `json:"nullable"`
    IsPrimaryKey  bool           `json:"isPrimaryKey,omitempty"`
    GlossaryTerms []GlossaryTerm `json:"glossaryTerms,omitempty"`
}

LineageResult

Represents lineage query results.

type LineageResult struct {
    URN        string          `json:"urn"`
    Upstream   []LineageEntity `json:"upstream,omitempty"`
    Downstream []LineageEntity `json:"downstream,omitempty"`
}

type LineageEntity struct {
    URN      string `json:"urn"`
    Name     string `json:"name"`
    Type     string `json:"type"`
    Platform string `json:"platform,omitempty"`
    Degree   int    `json:"degree"`
}

Owner

Represents an entity owner.

type Owner struct {
    URN  string `json:"urn"`
    Name string `json:"name"`
    Type string `json:"type"`
}

Tag

Represents a tag.

type Tag struct {
    URN         string `json:"urn"`
    Name        string `json:"name"`
    Description string `json:"description,omitempty"`
}

Domain

Represents a data domain.

type Domain struct {
    URN         string `json:"urn"`
    Name        string `json:"name"`
    Description string `json:"description,omitempty"`
    EntityCount int    `json:"entityCount,omitempty"`
}

DataProduct

Represents a data product.

type DataProduct struct {
    URN         string            `json:"urn"`
    Name        string            `json:"name"`
    Description string            `json:"description,omitempty"`
    Domain      *Domain           `json:"domain,omitempty"`
    Owners      []Owner           `json:"owners,omitempty"`
    Assets      []Entity          `json:"assets,omitempty"`
    Properties  map[string]any    `json:"properties,omitempty"`
}

Thread Safety

All components in mcp-datahub are designed for concurrent use. The guarantees for each component are described below.

Client Thread Safety

The Client is safe for concurrent use by multiple goroutines:

  • Uses connection pooling with proper synchronization
  • HTTP client is shared across requests
  • No shared mutable state between requests

// Safe: multiple goroutines share the same client
client, err := datahubclient.NewFromEnv()
if err != nil {
    log.Fatal(err)
}
defer client.Close()

ctx := context.Background()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        result, err := client.Search(ctx, "customer", "", 10, 0)
        if err != nil {
            log.Printf("search failed: %v", err)
            return
        }
        _ = result // Process result
    }()
}
wg.Wait()

Toolkit Thread Safety

The Toolkit handles concurrent tool calls:

  • Tool handlers are stateless
  • Middleware must be either stateless or properly synchronized
  • Per-request state passed through context

// Safe: concurrent tool registration and execution
toolkit := tools.NewToolkit(client)
toolkit.RegisterAll(server)
// Server handles concurrent requests automatically

Middleware Thread Safety Requirements

When implementing custom middleware:

Guideline              Description
Avoid shared state     Do not store request-specific data in middleware structs
Use context            Pass request-scoped data via context.Context
Synchronize if needed  Use sync.Mutex for shared counters or caches
Prefer immutable       Design middleware to be stateless when possible

// Thread-safe rate limiter example
type RateLimiter struct {
    mu       sync.Mutex
    requests map[string]int
}

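// Before holds the mutex while touching shared state. A complete
// ToolMiddleware also needs an After method; it is omitted here for brevity.
func (r *RateLimiter) Before(ctx context.Context, tc *tools.ToolContext) (context.Context, error) {
    r.mu.Lock()
    defer r.mu.Unlock()
    // Safe access to shared state (r.requests) goes here
    return ctx, nil
}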

Performance Characteristics

Request Latency

Typical latency ranges for tool operations:

Tool                 Typical Latency  Factors
datahub_search       50-200ms         Query complexity, result count
datahub_get_entity   20-100ms         Entity type, aspect count
datahub_get_schema   30-150ms         Field count
datahub_get_lineage  100-500ms        Depth, graph size
datahub_list_*       50-200ms         Result count

Connection Pooling

The client uses HTTP connection pooling:

// Default transport settings
Transport: &http.Transport{
    MaxIdleConns:        100,
    MaxIdleConnsPerHost: 10,
    IdleConnTimeout:     90 * time.Second,
}

Retry Behavior

Failed requests are retried with exponential backoff:

Attempt  Delay
1        Immediate
2        1 second
3        2 seconds
4        4 seconds

Retries only occur for:

  • Network timeouts
  • HTTP 500, 502, 503, 504 errors
  • Connection refused errors

Not retried:

  • HTTP 400, 401, 403, 404 errors
  • Context cancellation
  • Invalid request errors

Memory Considerations

Response Size Limits

The client limits response sizes to prevent memory issues:

Limit              Default  Description
Max response body  10MB     Maximum GraphQL response size
Max entities       1000     Maximum search results per request
Max lineage depth  5        Maximum traversal depth

Streaming Large Results

For large result sets, use pagination:

// Paginate search results
offset := 0
limit := 100
for {
    result, err := toolkit.Search(ctx, SearchInput{
        Query:  "customer",
        Limit:  limit,
        Offset: offset,
    })
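    if err != nil {
        // Report the failure instead of silently ending the loop
        log.Printf("search failed at offset %d: %v", offset, err)
        break
    }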
    if len(result.Entities) == 0 {
        break
    }
    // Process batch
    offset += limit
}

Memory-Efficient Patterns

Pattern                Description
Process in batches     Use pagination for large result sets
Close clients          Call Close() when done to release resources
Limit lineage depth    Use depth=2 or 3 for most use cases
Filter by entity type  Reduce result count with type filters

Error Handling

Error Types

The library uses typed errors for specific conditions:

import "github.com/txn2/mcp-datahub/pkg/client"

// Check error types
switch {
case errors.Is(err, client.ErrUnauthorized):
    // Handle auth error
case errors.Is(err, client.ErrNotFound):
    // Handle not found
case errors.Is(err, client.ErrRateLimited):
    // Handle rate limiting
case errors.Is(err, client.ErrTimeout):
    // Handle timeout
default:
    // Handle other errors
}

Error Wrapping

All errors include context for debugging:

// Errors include operation context
// Example: "search failed: graphql error: unauthorized"

Tool Error Responses

Tools return structured error responses:

// Error response format
{
    "error": true,
    "message": "Entity not found: urn:li:dataset:...",
    "code": "NOT_FOUND"
}
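A caller that receives tool output as JSON could detect this error shape as follows. The toolError struct and parseToolError helper are hypothetical names; only the three JSON field names come from the format above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toolError mirrors the documented error response fields.
type toolError struct {
	Error   bool   `json:"error"`
	Message string `json:"message"`
	Code    string `json:"code"`
}

// parseToolError reports whether payload is a structured error response.
// It returns false for invalid JSON and for payloads without "error": true.
func parseToolError(payload []byte) (*toolError, bool) {
	var te toolError
	if err := json.Unmarshal(payload, &te); err != nil || !te.Error {
		return nil, false
	}
	return &te, true
}

func main() {
	payload := []byte(`{"error": true, "message": "Entity not found", "code": "NOT_FOUND"}`)
	if te, ok := parseToolError(payload); ok {
		fmt.Printf("%s: %s\n", te.Code, te.Message)
	}
}
```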

Context Usage

Standard Context Values

The library recognizes these context values:

Key         Type    Description
auth_token  string  Authentication token for requests
user_id     string  User identifier for audit logging
tenant_id   string  Tenant identifier for multi-tenancy
request_id  string  Request correlation ID

Setting Context Values

ctx := context.Background()
ctx = context.WithValue(ctx, "user_id", "user@example.com")
ctx = context.WithValue(ctx, "request_id", uuid.New().String())

result, err := toolkit.Search(ctx, input)

Context Cancellation

All operations respect context cancellation:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

result, err := client.Search(ctx, "query", "", 10, 0)
if errors.Is(err, context.DeadlineExceeded) {
    // Handle timeout
}

Extensions Package

The extensions package provides built-in middleware and config file support.

Config

import "github.com/txn2/mcp-datahub/pkg/extensions"

type Config struct {
    EnableLogging   bool
    EnableMetrics   bool
    EnableMetadata  bool
    EnableErrorHelp bool
    LogOutput       io.Writer
}

DefaultConfig

Returns default configuration (error hints enabled, everything else off):

func DefaultConfig() Config

FromEnv

Loads extension configuration from MCP_DATAHUB_EXT_* environment variables:

func FromEnv() Config

BuildToolkitOptions

Converts extension config into toolkit options:

func BuildToolkitOptions(cfg Config) []tools.ToolkitOption

LoadConfig

Loads a full server config from a YAML or JSON file with environment variable overrides:

func LoadConfig(path string) (ServerConfig, error)

Built-in Middleware

Middleware           Description
LoggingMiddleware    Logs tool invocations and results with duration
MetricsMiddleware    Collects call counts, error counts, and duration
ErrorHintMiddleware  Appends helpful hints to error messages
MetadataMiddleware   Appends execution metadata to successful results

MetricsCollector Interface

type MetricsCollector interface {
    RecordCall(toolName string, duration time.Duration, success bool)
}

InMemoryCollector is provided as a thread-safe in-memory implementation.