Skip to content

Client Transports

Learn about transport-specific client implementations and how to choose the right transport for your use case.

Transport Overview

MCP-Go provides client implementations for all supported transports. Each transport has different characteristics and is optimized for specific scenarios.

TransportBest ForConnectionReal-timeMulti-client
STDIOCLI tools, desktop appsProcess pipesNoNo
StreamableHTTPWeb services, APIsHTTP requestsNoYes
SSEWeb apps, real-timeHTTP + EventSourceYesYes
In-ProcessTesting, embeddedDirect callsYesNo

STDIO Client

STDIO clients communicate with servers through standard input/output, typically by spawning a subprocess.

Basic STDIO Client

package main
 
import (
    "context"
    "crypto/tls"
    "errors"
    "fmt"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
 
    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)
 
func createStdioClient() {
    // Create client that spawns a subprocess
    c, err := client.NewStdioMCPClient(
        "go", []string{}, "run", "/path/to/server/main.go",
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()
 
    ctx := context.Background()
 
    // Initialize connection
    if err := c.Initialize(ctx, initRequest); err != nil {
        log.Fatal(err)
    }
 
    // Use the client
    tools, err := c.ListTools(ctx, listToolsRequest)
    if err != nil {
        log.Fatal(err)
    }
 
    log.Printf("Available tools: %d", len(tools.Tools))
}

STDIO Error Handling

// Define error constants for STDIO client errors
var (
    ErrProcessExited  = errors.New("process exited")
    ErrProcessTimeout = errors.New("process timeout")
    ErrBrokenPipe     = errors.New("broken pipe")
)
 
func handleStdioErrors(c *client.StdioClient) {
    ctx := context.Background()
 
    result, err := c.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "example_tool",
        },
    })
 
    if err != nil {
        switch {
        case errors.Is(err, ErrProcessExited):
            log.Println("Server process exited unexpectedly")
            // Attempt to restart
            if restartErr := c.Restart(); restartErr != nil {
                log.Printf("Failed to restart: %v", restartErr)
            }
 
        case errors.Is(err, ErrProcessTimeout):
            log.Println("Server process timed out")
            // Kill and restart process
            c.Kill()
            if restartErr := c.Restart(); restartErr != nil {
                log.Printf("Failed to restart: %v", restartErr)
            }
 
        case errors.Is(err, ErrBrokenPipe):
            log.Println("Communication pipe broken")
            // Process likely crashed, restart
            if restartErr := c.Restart(); restartErr != nil {
                log.Printf("Failed to restart: %v", restartErr)
            }
 
        default:
            log.Printf("Unexpected error: %v", err)
        }
        return
    }
 
    log.Printf("Tool result: %+v", result)
}

STDIO Process Management

type ManagedStdioClient struct {
    client      *client.StdioClient
    options     client.StdioOptions
    restartChan chan struct{}
    ctx         context.Context
    cancel      context.CancelFunc
    wg          sync.WaitGroup
}
 
func NewManagedStdioClient(options client.StdioOptions) (*ManagedStdioClient, error) {
    ctx, cancel := context.WithCancel(context.Background())
 
    msc := &ManagedStdioClient{
        options:     options,
        restartChan: make(chan struct{}, 1),
        ctx:         ctx,
        cancel:      cancel,
    }
 
    if err := msc.start(); err != nil {
        cancel()
        return nil, err
    }
 
    msc.wg.Add(1)
    go msc.monitorProcess()
 
    return msc, nil
}
 
func (msc *ManagedStdioClient) start() error {
    client, err := client.NewStdioClientWithOptions(msc.options)
    if err != nil {
        return err
    }
 
    if err := client.Initialize(msc.ctx); err != nil {
        client.Close()
        return err
    }
 
    msc.client = client
    return nil
}
 
func (msc *ManagedStdioClient) monitorProcess() {
    defer msc.wg.Done()
 
    for {
        select {
        case <-msc.ctx.Done():
            return
        case <-msc.restartChan:
            log.Println("Restarting STDIO client...")
            
            if msc.client != nil {
                msc.client.Close()
            }
 
            // Wait before restarting
            time.Sleep(1 * time.Second)
 
            if err := msc.start(); err != nil {
                log.Printf("Failed to restart client: %v", err)
                // Try again after delay
                time.Sleep(5 * time.Second)
                select {
                case msc.restartChan <- struct{}{}:
                default:
                }
            } else {
                log.Println("Client restarted successfully")
            }
        }
    }
}
 
func (msc *ManagedStdioClient) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    if msc.client == nil {
        return nil, fmt.Errorf("client not available")
    }
 
    result, err := msc.client.CallTool(ctx, req)
    if err != nil && isProcessError(err) {
        // Trigger restart
        select {
        case msc.restartChan <- struct{}{}:
        default:
        }
        return nil, fmt.Errorf("process error, restarting: %w", err)
    }
 
    return result, err
}
 
func (msc *ManagedStdioClient) Close() error {
    msc.cancel()
    msc.wg.Wait()
    
    if msc.client != nil {
        return msc.client.Close()
    }
    
    return nil
}
 
func isProcessError(err error) bool {
    return errors.Is(err, ErrProcessExited) ||
           errors.Is(err, ErrBrokenPipe) ||
           errors.Is(err, ErrProcessTimeout)
}
// Define connection error constants
var (
    ErrConnectionLost   = errors.New("connection lost")
    ErrConnectionFailed = errors.New("connection failed")
    ErrUnauthorized     = errors.New("unauthorized")
    ErrForbidden        = errors.New("forbidden")
)

StreamableHTTP Client

StreamableHTTP clients communicate with servers using traditional HTTP requests.

Basic StreamableHTTP Client

func createStreamableHTTPClient() {
    // Create StreamableHTTP client
    c := client.NewStreamableHttpClient("http://localhost:8080/mcp")
    defer c.Close()
 
    ctx := context.Background()
 
    // Initialize
    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }
 
    // Use client
    tools, err := c.ListTools(ctx)
    if err != nil {
        log.Fatal(err)
    }
 
    log.Printf("Available tools: %d", len(tools.Tools))
}

StreamableHTTP Client with Custom Configuration

func createCustomStreamableHTTPClient() {
    // Create StreamableHTTP client with options
    c := client.NewStreamableHttpClient("https://api.example.com/mcp",
        transport.WithHTTPTimeout(30*time.Second),
        transport.WithHTTPHeaders(map[string]string{
            "User-Agent": "MyApp/1.0",
            "Accept":     "application/json",
        }),
        transport.WithHTTPBasicClient(&http.Client{
            Transport: &http.Transport{
                MaxIdleConns:        100,
                MaxIdleConnsPerHost: 10,
                IdleConnTimeout:     90 * time.Second,
                TLSClientConfig: &tls.Config{
                    InsecureSkipVerify: false,
                },
            },
        }),
    )
    defer c.Close()
 
    ctx := context.Background()
 
    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }
 
    // Use client...
}

StreamableHTTP Authentication

func createAuthenticatedStreamableHTTPClient() {
    // Create StreamableHTTP client with OAuth
    c := client.NewStreamableHttpClient("http://localhost:8080/mcp",
        transport.WithHTTPOAuth(transport.OAuthConfig{
            ClientID:     "your-client-id",
            ClientSecret: "your-client-secret",
            TokenURL:     "https://auth.example.com/token",
            Scopes:       []string{"mcp:read", "mcp:write"},
        }),
    )
    defer c.Close()
 
    ctx := context.Background()
 
    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }
 
    // Use client...
}
 
func isAuthError(err error) bool {
    return errors.Is(err, ErrUnauthorized) ||
           errors.Is(err, ErrForbidden)
}

StreamableHTTP Connection Pooling

type StreamableHTTPClientPool struct {
    clients chan *client.Client
    factory func() *client.Client
    maxSize int
}
 
func NewStreamableHTTPClientPool(baseURL string, maxSize int) *StreamableHTTPClientPool {
    pool := &StreamableHTTPClientPool{
        clients: make(chan *client.Client, maxSize),
        maxSize: maxSize,
        factory: func() *client.Client {
            return client.NewStreamableHttpClient(baseURL)
        },
    }
 
    // Pre-populate pool
    for i := 0; i < maxSize; i++ {
        pool.clients <- pool.factory()
    }
 
    return pool
}
 
func (pool *StreamableHTTPClientPool) Get() *client.Client {
    select {
    case c := <-pool.clients:
        return c
    default:
        return pool.factory()
    }
}
 
func (pool *StreamableHTTPClientPool) Put(c *client.Client) {
    select {
    case pool.clients <- c:
    default:
        // Pool full, close client
        c.Close()
    }
}
 
func (pool *StreamableHTTPClientPool) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    c := pool.Get()
    defer pool.Put(c)
 
    return c.CallTool(ctx, req)
}

SSE Client

SSE (Server-Sent Events) clients provide real-time communication with servers.

Basic SSE Client

func createSSEClient() {
    // Create SSE client
    c := client.NewSSEClient("http://localhost:8080/mcp/sse")
    defer c.Close()
 
    // Set authentication
    c.SetHeader("Authorization", "Bearer your-token")
 
    ctx := context.Background()
 
    // Initialize
    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }
 
    // Subscribe to notifications
    notifications, err := c.Subscribe(ctx)
    if err != nil {
        log.Fatal(err)
    }
 
    // Handle notifications in background
    go func() {
        for notification := range notifications {
            log.Printf("Notification: %+v", notification)
        }
    }()
 
    // Use client for regular operations
    tools, err := c.ListTools(ctx)
    if err != nil {
        log.Fatal(err)
    }
 
    log.Printf("Available tools: %d", len(tools.Tools))
}

SSE Client with Reconnection

type ResilientSSEClient struct {
    baseURL     string
    headers     map[string]string
    client      *client.SSEClient
    ctx         context.Context
    cancel      context.CancelFunc
    reconnectCh chan struct{}
    mutex       sync.RWMutex
}
 
func NewResilientSSEClient(baseURL string) *ResilientSSEClient {
    ctx, cancel := context.WithCancel(context.Background())
 
    rsc := &ResilientSSEClient{
        baseURL:     baseURL,
        headers:     make(map[string]string),
        ctx:         ctx,
        cancel:      cancel,
        reconnectCh: make(chan struct{}, 1),
    }
 
    go rsc.reconnectLoop()
    return rsc
}
 
func (rsc *ResilientSSEClient) SetHeader(key, value string) {
    rsc.mutex.Lock()
    defer rsc.mutex.Unlock()
    rsc.headers[key] = value
}
 
func (rsc *ResilientSSEClient) connect() error {
    rsc.mutex.Lock()
    defer rsc.mutex.Unlock()
 
    if rsc.client != nil {
        rsc.client.Close()
    }
 
    client := client.NewSSEClient(rsc.baseURL)
    
    // Set headers
    for key, value := range rsc.headers {
        client.SetHeader(key, value)
    }
 
    if err := client.Initialize(rsc.ctx); err != nil {
        return err
    }
 
    rsc.client = client
    return nil
}
 
func (rsc *ResilientSSEClient) reconnectLoop() {
    for {
        select {
        case <-rsc.ctx.Done():
            return
        case <-rsc.reconnectCh:
            log.Println("Reconnecting SSE client...")
            
            for attempt := 1; attempt <= 5; attempt++ {
                if err := rsc.connect(); err != nil {
                    log.Printf("Reconnection attempt %d failed: %v", attempt, err)
                    
                    backoff := time.Duration(attempt) * time.Second
                    select {
                    case <-time.After(backoff):
                    case <-rsc.ctx.Done():
                        return
                    }
                } else {
                    log.Println("Reconnected successfully")
                    break
                }
            }
        }
    }
}
 
func (rsc *ResilientSSEClient) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    rsc.mutex.RLock()
    client := rsc.client
    rsc.mutex.RUnlock()
 
    if client == nil {
        return nil, fmt.Errorf("client not connected")
    }
 
    result, err := client.CallTool(ctx, req)
    if err != nil && isConnectionError(err) {
        // Trigger reconnection
        select {
        case rsc.reconnectCh <- struct{}{}:
        default:
        }
        return nil, fmt.Errorf("connection error: %w", err)
    }
 
    return result, err
}
 
func (rsc *ResilientSSEClient) Subscribe(ctx context.Context) (<-chan mcp.Notification, error) {
    rsc.mutex.RLock()
    client := rsc.client
    rsc.mutex.RUnlock()
 
    if client == nil {
        return nil, fmt.Errorf("client not connected")
    }
 
    return client.Subscribe(ctx)
}
 
func (rsc *ResilientSSEClient) Close() error {
    rsc.cancel()
    
    rsc.mutex.Lock()
    defer rsc.mutex.Unlock()
    
    if rsc.client != nil {
        return rsc.client.Close()
    }
    
    return nil
}
 
// Helper function to check if an error is a connection error
func isConnectionError(err error) bool {
    return errors.Is(err, ErrConnectionLost) ||
           errors.Is(err, ErrConnectionFailed)
}

SSE Event Handling

type SSEEventHandler struct {
    client      *client.SSEClient
    handlers    map[string][]func(mcp.Notification)
    mutex       sync.RWMutex
    ctx         context.Context
    cancel      context.CancelFunc
    wg          sync.WaitGroup
}
 
func NewSSEEventHandler(c *client.SSEClient) *SSEEventHandler {
    ctx, cancel := context.WithCancel(context.Background())
 
    return &SSEEventHandler{
        client:   c,
        handlers: make(map[string][]func(mcp.Notification)),
        ctx:      ctx,
        cancel:   cancel,
    }
}
 
func (seh *SSEEventHandler) Start() error {
    notifications, err := seh.client.Subscribe(seh.ctx)
    if err != nil {
        return err
    }
 
    seh.wg.Add(1)
    go func() {
        defer seh.wg.Done()
        
        for {
            select {
            case notification := <-notifications:
                seh.handleNotification(notification)
            case <-seh.ctx.Done():
                return
            }
        }
    }()
 
    return nil
}
 
func (seh *SSEEventHandler) Stop() {
    seh.cancel()
    seh.wg.Wait()
}
 
func (seh *SSEEventHandler) OnProgress(handler func(mcp.Notification)) {
    seh.addHandler("notifications/progress", handler)
}
 
func (seh *SSEEventHandler) OnMessage(handler func(mcp.Notification)) {
    seh.addHandler("notifications/message", handler)
}
 
func (seh *SSEEventHandler) OnResourceUpdate(handler func(mcp.Notification)) {
    seh.addHandler("notifications/resources/updated", handler)
}
 
func (seh *SSEEventHandler) OnToolUpdate(handler func(mcp.Notification)) {
    seh.addHandler("notifications/tools/updated", handler)
}
 
func (seh *SSEEventHandler) addHandler(method string, handler func(mcp.Notification)) {
    seh.mutex.Lock()
    defer seh.mutex.Unlock()
    
    seh.handlers[method] = append(seh.handlers[method], handler)
}
 
func (seh *SSEEventHandler) handleNotification(notification mcp.Notification) {
    seh.mutex.RLock()
    handlers := seh.handlers[notification.Method]
    seh.mutex.RUnlock()
 
    for _, handler := range handlers {
        go handler(notification)
    }
}

In-Process Client

In-process clients provide direct communication with servers in the same process.

Basic In-Process Client

func createInProcessClient() {
    // Create server
    s := server.NewMCPServer("Test Server", "1.0.0")
    
    // Add tools to server
    s.AddTool(
        mcp.NewTool("test_tool",
            mcp.WithDescription("Test tool"),
            mcp.WithString("input", mcp.Required()),
        ),
        func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            input := req.Params.Arguments["input"].(string)
            return mcp.NewToolResultText("Processed: " + input), nil
        },
    )
 
    // Create in-process client
    c := client.NewInProcessClient(s)
    defer c.Close()
 
    ctx := context.Background()
 
    // Initialize (no network overhead)
    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }
 
    // Use client
    result, err := c.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "test_tool",
            Arguments: map[string]interface{}{
                "input": "test data",
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }
 
    log.Printf("Tool result: %+v", result)
}

In-Process Client for Testing

type TestClient struct {
    server *server.MCPServer
    client *client.InProcessClient
}
 
func NewTestClient() *TestClient {
    s := server.NewMCPServer("Test Server", "1.0.0",
        server.WithAllCapabilities(),
    )
 
    return &TestClient{
        server: s,
        client: client.NewInProcessClient(s),
    }
}
 
func (tc *TestClient) AddTool(name, description string, handler server.ToolHandler) {
    tool := mcp.NewTool(name, mcp.WithDescription(description))
    tc.server.AddTool(tool, handler)
}
 
func (tc *TestClient) AddResource(uri, name string, handler server.ResourceHandler) {
    resource := mcp.NewResource(uri, name)
    tc.server.AddResource(resource, handler)
}
 
func (tc *TestClient) Initialize(ctx context.Context) error {
    return tc.client.Initialize(ctx)
}
 
func (tc *TestClient) CallTool(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error) {
    return tc.client.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name:      name,
            Arguments: args,
        },
    })
}
 
func (tc *TestClient) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error) {
    return tc.client.ReadResource(ctx, mcp.ReadResourceRequest{
        Params: mcp.ReadResourceRequestParams{
            URI: uri,
        },
    })
}
 
func (tc *TestClient) Close() error {
    return tc.client.Close()
}
 
// Usage in tests
func TestWithInProcessClient(t *testing.T) {
    tc := NewTestClient()
    defer tc.Close()
 
    // Add test tool
    tc.AddTool("echo", "Echo input", func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        input := req.Params.Arguments["input"].(string)
        return mcp.NewToolResultText(input), nil
    })
 
    ctx := context.Background()
    err := tc.Initialize(ctx)
    require.NoError(t, err)
 
    // Test tool call
    result, err := tc.CallTool(ctx, "echo", map[string]interface{}{
        "input": "hello world",
    })
    require.NoError(t, err)
    assert.Equal(t, "hello world", result.Content[0].Text)
}

Transport Selection

Decision Matrix

Choose your transport based on these factors:

type TransportRequirements struct {
    RealTime        bool
    MultiClient     bool
    NetworkRequired bool
    Performance     string // "high", "medium", "low"
    Complexity      string // "low", "medium", "high"
}
 
func SelectTransport(req TransportRequirements) string {
    switch {
    case !req.NetworkRequired && req.Performance == "high":
        return "inprocess"
    
    case !req.NetworkRequired && !req.MultiClient:
        return "stdio"
    
    case req.RealTime && req.MultiClient:
        return "sse"
    
    case req.NetworkRequired && req.MultiClient:
        return "streamablehttp"
    
    default:
        return "stdio" // Default fallback
    }
}
 
// Usage examples
func demonstrateTransportSelection() {
    // High-performance testing
    testReq := TransportRequirements{
        RealTime:        false,
        MultiClient:     false,
        NetworkRequired: false,
        Performance:     "high",
        Complexity:      "low",
    }
    fmt.Printf("Testing: %s\n", SelectTransport(testReq))
 
    // Real-time web application
    webReq := TransportRequirements{
        RealTime:        true,
        MultiClient:     true,
        NetworkRequired: true,
        Performance:     "medium",
        Complexity:      "medium",
    }
    fmt.Printf("Web app: %s\n", SelectTransport(webReq))
 
    // CLI tool
    cliReq := TransportRequirements{
        RealTime:        false,
        MultiClient:     false,
        NetworkRequired: false,
        Performance:     "medium",
        Complexity:      "low",
    }
    fmt.Printf("CLI tool: %s\n", SelectTransport(cliReq))
}

Multi-Transport Client Factory

type ClientFactory struct {
    configs map[string]interface{}
}
 
func NewClientFactory() *ClientFactory {
    return &ClientFactory{
        configs: make(map[string]interface{}),
    }
}
 
func (cf *ClientFactory) SetStdioConfig(command string, args ...string) {
    cf.configs["stdio"] = client.StdioOptions{
        Command: command,
        Args:    args,
    }
}
 
func (cf *ClientFactory) SetStreamableHTTPConfig(baseURL string, headers map[string]string) {
    cf.configs["streamablehttp"] = struct {
        BaseURL string
        Headers map[string]string
    }{
        BaseURL: baseURL,
        Headers: headers,
    }
}
 
func (cf *ClientFactory) SetSSEConfig(baseURL string, headers map[string]string) {
    cf.configs["sse"] = struct {
        BaseURL string
        Headers map[string]string
    }{
        BaseURL: baseURL,
        Headers: headers,
    }
}
 
func (cf *ClientFactory) CreateClient(transport string) (client.Client, error) {
    switch transport {
    case "stdio":
        config, ok := cf.configs["stdio"].(client.StdioOptions)
        if !ok {
            return nil, fmt.Errorf("stdio config not set")
        }
        return client.NewStdioClientWithOptions(config)
 
    case "streamablehttp":
        config, ok := cf.configs["streamablehttp"].(struct {
            BaseURL string
            Headers map[string]string
        })
        if !ok {
            return nil, fmt.Errorf("streamablehttp config not set")
        }
        
        options := []transport.StreamableHTTPCOption{}
        if len(config.Headers) > 0 {
            options = append(options, transport.WithHTTPHeaders(config.Headers))
        }
        
        return client.NewStreamableHttpClient(config.BaseURL, options...), nil
 
    case "sse":
        config, ok := cf.configs["sse"].(struct {
            BaseURL string
            Headers map[string]string
        })
        if !ok {
            return nil, fmt.Errorf("sse config not set")
        }
        
        options := []transport.ClientOption{}
        if len(config.Headers) > 0 {
            options = append(options, transport.WithHeaders(config.Headers))
        }
        
        return client.NewSSEMCPClient(config.BaseURL, options...)
 
    default:
        return nil, fmt.Errorf("unknown transport: %s", transport)
    }
}
 
// Usage
func demonstrateClientFactory() {
    factory := NewClientFactory()
    
    // Configure transports
    factory.SetStdioConfig("go", "run", "server.go")
    factory.SetStreamableHTTPConfig("http://localhost:8080/mcp", map[string]string{
        "Authorization": "Bearer token",
    })
    factory.SetSSEConfig("http://localhost:8080/mcp/sse", map[string]string{
        "Authorization": "Bearer token",
    })
 
    // Create client based on environment
    transport := os.Getenv("MCP_TRANSPORT")
    if transport == "" {
        transport = "stdio"
    }
 
    client, err := factory.CreateClient(transport)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
 
    // Use client...
}