Client Transports
Learn about transport-specific client implementations and how to choose the right transport for your use case.
Transport Overview
MCP-Go provides client implementations for all supported transports. Each transport has different characteristics and is optimized for specific scenarios.
| Transport | Best For | Connection | Real-time | Multi-client |
|---|---|---|---|---|
| STDIO | CLI tools, desktop apps | Process pipes | No | No |
| StreamableHTTP | Web services, APIs | HTTP requests | No | Yes |
| SSE | Web apps, real-time | HTTP + EventSource | Yes | Yes |
| In-Process | Testing, embedded | Direct calls | Yes | No |
STDIO Client
STDIO clients communicate with servers through standard input/output, typically by spawning a subprocess.
Basic STDIO Client
package main

import (
    "context"
    "crypto/tls"
    "errors"
    "fmt"
    "log"
    "net/http"
    "os"
    "sync"
    "time"

    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/client/transport"
    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)
func createStdioClient() {
    // Create a client that spawns the server as a subprocess.
    // Arguments: command, extra environment variables, then command args.
    c, err := client.NewStdioMCPClient(
        "go", []string{}, "run", "/path/to/server/main.go",
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Initialize the connection
    initRequest := mcp.InitializeRequest{}
    initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
    initRequest.Params.ClientInfo = mcp.Implementation{
        Name:    "example-client",
        Version: "1.0.0",
    }
    if _, err := c.Initialize(ctx, initRequest); err != nil {
        log.Fatal(err)
    }

    // Use the client
    tools, err := c.ListTools(ctx, mcp.ListToolsRequest{})
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Available tools: %d", len(tools.Tools))
}
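The second argument to client.NewStdioMCPClient carries extra environment variables for the spawned server process. A minimal sketch, assuming a hypothetical server path and variable names:

func createStdioClientWithEnv() {
    // Pass environment variables to the spawned server; the values below are placeholders.
    c, err := client.NewStdioMCPClient(
        "go",
        []string{"MCP_DEBUG=1", "API_KEY=dev-key"},
        "run", "/path/to/server/main.go",
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()
    if _, err := c.Initialize(ctx, mcp.InitializeRequest{}); err != nil {
        log.Fatal(err)
    }
    // Use the client...
}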
STDIO Error Handling
// Define error constants for STDIO client errors
var (
ErrProcessExited = errors.New("process exited")
ErrProcessTimeout = errors.New("process timeout")
ErrBrokenPipe = errors.New("broken pipe")
)
func handleStdioErrors(c *client.StdioClient) {
ctx := context.Background()
result, err := c.CallTool(ctx, mcp.CallToolRequest{
Params: mcp.CallToolRequestParams{
Name: "example_tool",
},
})
if err != nil {
switch {
case errors.Is(err, ErrProcessExited):
log.Println("Server process exited unexpectedly")
// Attempt to restart
if restartErr := c.Restart(); restartErr != nil {
log.Printf("Failed to restart: %v", restartErr)
}
case errors.Is(err, ErrProcessTimeout):
log.Println("Server process timed out")
// Kill and restart process
c.Kill()
if restartErr := c.Restart(); restartErr != nil {
log.Printf("Failed to restart: %v", restartErr)
}
case errors.Is(err, ErrBrokenPipe):
log.Println("Communication pipe broken")
// Process likely crashed, restart
if restartErr := c.Restart(); restartErr != nil {
log.Printf("Failed to restart: %v", restartErr)
}
default:
log.Printf("Unexpected error: %v", err)
}
return
}
log.Printf("Tool result: %+v", result)
}
STDIO Process Management
type ManagedStdioClient struct {
client *client.StdioClient
options client.StdioOptions
restartChan chan struct{}
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewManagedStdioClient(options client.StdioOptions) (*ManagedStdioClient, error) {
ctx, cancel := context.WithCancel(context.Background())
msc := &ManagedStdioClient{
options: options,
restartChan: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
}
if err := msc.start(); err != nil {
cancel()
return nil, err
}
msc.wg.Add(1)
go msc.monitorProcess()
return msc, nil
}
func (msc *ManagedStdioClient) start() error {
client, err := client.NewStdioClientWithOptions(msc.options)
if err != nil {
return err
}
if err := client.Initialize(msc.ctx); err != nil {
client.Close()
return err
}
msc.client = client
return nil
}
func (msc *ManagedStdioClient) monitorProcess() {
defer msc.wg.Done()
for {
select {
case <-msc.ctx.Done():
return
case <-msc.restartChan:
log.Println("Restarting STDIO client...")
if msc.client != nil {
msc.client.Close()
}
// Wait before restarting
time.Sleep(1 * time.Second)
if err := msc.start(); err != nil {
log.Printf("Failed to restart client: %v", err)
// Try again after delay
time.Sleep(5 * time.Second)
select {
case msc.restartChan <- struct{}{}:
default:
}
} else {
log.Println("Client restarted successfully")
}
}
}
}
func (msc *ManagedStdioClient) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
if msc.client == nil {
return nil, fmt.Errorf("client not available")
}
result, err := msc.client.CallTool(ctx, req)
if err != nil && isProcessError(err) {
// Trigger restart
select {
case msc.restartChan <- struct{}{}:
default:
}
return nil, fmt.Errorf("process error, restarting: %w", err)
}
return result, err
}
func (msc *ManagedStdioClient) Close() error {
msc.cancel()
msc.wg.Wait()
if msc.client != nil {
return msc.client.Close()
}
return nil
}
func isProcessError(err error) bool {
return errors.Is(err, ErrProcessExited) ||
errors.Is(err, ErrBrokenPipe) ||
errors.Is(err, ErrProcessTimeout)
}
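A short usage sketch for the managed client above. The client.StdioOptions fields follow the factory example later on this page, and the server path and tool name are placeholders:

func demonstrateManagedStdioClient() {
    msc, err := NewManagedStdioClient(client.StdioOptions{
        Command: "go",
        Args:    []string{"run", "/path/to/server/main.go"},
    })
    if err != nil {
        log.Fatal(err)
    }
    defer msc.Close()

    result, err := msc.CallTool(context.Background(), mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "example_tool",
        },
    })
    if err != nil {
        // A restart may have been triggered; the caller can retry later.
        log.Printf("call failed: %v", err)
        return
    }
    log.Printf("Tool result: %+v", result)
}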
// Define connection error constants
var (
ErrConnectionLost = errors.New("connection lost")
ErrConnectionFailed = errors.New("connection failed")
ErrUnauthorized = errors.New("unauthorized")
ErrForbidden = errors.New("forbidden")
)
StreamableHTTP Client
StreamableHTTP clients communicate with servers using traditional HTTP requests.
Basic StreamableHTTP Client
func createStreamableHTTPClient() {
    // Create StreamableHTTP client
    c, err := client.NewStreamableHttpClient("http://localhost:8080/mcp")
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Initialize
    if _, err := c.Initialize(ctx, mcp.InitializeRequest{}); err != nil {
        log.Fatal(err)
    }

    // Use client
    tools, err := c.ListTools(ctx, mcp.ListToolsRequest{})
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Available tools: %d", len(tools.Tools))
}
StreamableHTTP Client with Custom Configuration
func createCustomStreamableHTTPClient() {
    // Create StreamableHTTP client with options
    c, err := client.NewStreamableHttpClient("https://api.example.com/mcp",
        transport.WithHTTPTimeout(30*time.Second),
        transport.WithHTTPHeaders(map[string]string{
            "User-Agent": "MyApp/1.0",
            "Accept":     "application/json",
        }),
        transport.WithHTTPBasicClient(&http.Client{
            Transport: &http.Transport{
                MaxIdleConns:        100,
                MaxIdleConnsPerHost: 10,
                IdleConnTimeout:     90 * time.Second,
                TLSClientConfig: &tls.Config{
                    InsecureSkipVerify: false,
                },
            },
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()
    if _, err := c.Initialize(ctx, mcp.InitializeRequest{}); err != nil {
        log.Fatal(err)
    }

    // Use client...
}
StreamableHTTP Authentication
func createAuthenticatedStreamableHTTPClient() {
    // Create StreamableHTTP client with OAuth
    c, err := client.NewStreamableHttpClient("http://localhost:8080/mcp",
        transport.WithHTTPOAuth(transport.OAuthConfig{
            ClientID:     "your-client-id",
            ClientSecret: "your-client-secret",
            TokenURL:     "https://auth.example.com/token",
            Scopes:       []string{"mcp:read", "mcp:write"},
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()
    if _, err := c.Initialize(ctx, mcp.InitializeRequest{}); err != nil {
        log.Fatal(err)
    }

    // Use client...
}

func isAuthError(err error) bool {
    return errors.Is(err, ErrUnauthorized) ||
        errors.Is(err, ErrForbidden)
}
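If the server expects a static bearer token rather than a full OAuth flow, the header option shown earlier can carry it. A minimal sketch; the endpoint and token are placeholders:

func createTokenAuthenticatedStreamableHTTPClient() {
    // Send a static bearer token on every request via a custom header.
    c, err := client.NewStreamableHttpClient("http://localhost:8080/mcp",
        transport.WithHTTPHeaders(map[string]string{
            "Authorization": "Bearer your-token",
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()
    if _, err := c.Initialize(ctx, mcp.InitializeRequest{}); err != nil {
        log.Fatal(err)
    }
    // Use client...
}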
StreamableHTTP Connection Pooling
type StreamableHTTPClientPool struct {
    clients chan *client.Client
    factory func() (*client.Client, error)
    maxSize int
}

func NewStreamableHTTPClientPool(baseURL string, maxSize int) *StreamableHTTPClientPool {
    pool := &StreamableHTTPClientPool{
        clients: make(chan *client.Client, maxSize),
        maxSize: maxSize,
        factory: func() (*client.Client, error) {
            return client.NewStreamableHttpClient(baseURL)
        },
    }

    // Pre-populate pool
    for i := 0; i < maxSize; i++ {
        if c, err := pool.factory(); err == nil {
            pool.clients <- c
        }
    }

    return pool
}

func (pool *StreamableHTTPClientPool) Get() (*client.Client, error) {
    select {
    case c := <-pool.clients:
        return c, nil
    default:
        return pool.factory()
    }
}

func (pool *StreamableHTTPClientPool) Put(c *client.Client) {
    select {
    case pool.clients <- c:
    default:
        // Pool full, close client
        c.Close()
    }
}

func (pool *StreamableHTTPClientPool) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    c, err := pool.Get()
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }
    defer pool.Put(c)

    return c.CallTool(ctx, req)
}
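A brief usage sketch for the pool above; the endpoint and tool name are placeholders. Pooled clients are created but not initialized here, so a real application would initialize each client (eagerly or lazily) before routing calls through it:

func demonstrateClientPool() {
    pool := NewStreamableHTTPClientPool("http://localhost:8080/mcp", 5)

    result, err := pool.CallTool(context.Background(), mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "example_tool",
        },
    })
    if err != nil {
        log.Printf("pooled call failed: %v", err)
        return
    }
    log.Printf("Tool result: %+v", result)
}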
SSE Client
SSE (Server-Sent Events) clients provide real-time communication with servers.
Basic SSE Client
func createSSEClient() {
// Create SSE client
c := client.NewSSEClient("http://localhost:8080/mcp/sse")
defer c.Close()
// Set authentication
c.SetHeader("Authorization", "Bearer your-token")
ctx := context.Background()
// Initialize
if err := c.Initialize(ctx); err != nil {
log.Fatal(err)
}
// Subscribe to notifications
notifications, err := c.Subscribe(ctx)
if err != nil {
log.Fatal(err)
}
// Handle notifications in background
go func() {
for notification := range notifications {
log.Printf("Notification: %+v", notification)
}
}()
// Use client for regular operations
tools, err := c.ListTools(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("Available tools: %d", len(tools.Tools))
}
SSE Client with Reconnection
type ResilientSSEClient struct {
baseURL string
headers map[string]string
client *client.SSEClient
ctx context.Context
cancel context.CancelFunc
reconnectCh chan struct{}
mutex sync.RWMutex
}
func NewResilientSSEClient(baseURL string) *ResilientSSEClient {
    ctx, cancel := context.WithCancel(context.Background())

    rsc := &ResilientSSEClient{
        baseURL:     baseURL,
        headers:     make(map[string]string),
        ctx:         ctx,
        cancel:      cancel,
        reconnectCh: make(chan struct{}, 1),
    }

    go rsc.reconnectLoop()

    // Queue the initial connection attempt; the reconnect loop
    // retries with backoff if it fails.
    rsc.reconnectCh <- struct{}{}

    return rsc
}
func (rsc *ResilientSSEClient) SetHeader(key, value string) {
rsc.mutex.Lock()
defer rsc.mutex.Unlock()
rsc.headers[key] = value
}
func (rsc *ResilientSSEClient) connect() error {
rsc.mutex.Lock()
defer rsc.mutex.Unlock()
if rsc.client != nil {
rsc.client.Close()
}
client := client.NewSSEClient(rsc.baseURL)
// Set headers
for key, value := range rsc.headers {
client.SetHeader(key, value)
}
if err := client.Initialize(rsc.ctx); err != nil {
return err
}
rsc.client = client
return nil
}
func (rsc *ResilientSSEClient) reconnectLoop() {
for {
select {
case <-rsc.ctx.Done():
return
case <-rsc.reconnectCh:
log.Println("Reconnecting SSE client...")
for attempt := 1; attempt <= 5; attempt++ {
if err := rsc.connect(); err != nil {
log.Printf("Reconnection attempt %d failed: %v", attempt, err)
backoff := time.Duration(attempt) * time.Second
select {
case <-time.After(backoff):
case <-rsc.ctx.Done():
return
}
} else {
log.Println("Reconnected successfully")
break
}
}
}
}
}
func (rsc *ResilientSSEClient) CallTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
rsc.mutex.RLock()
client := rsc.client
rsc.mutex.RUnlock()
if client == nil {
return nil, fmt.Errorf("client not connected")
}
result, err := client.CallTool(ctx, req)
if err != nil && isConnectionError(err) {
// Trigger reconnection
select {
case rsc.reconnectCh <- struct{}{}:
default:
}
return nil, fmt.Errorf("connection error: %w", err)
}
return result, err
}
func (rsc *ResilientSSEClient) Subscribe(ctx context.Context) (<-chan mcp.Notification, error) {
rsc.mutex.RLock()
client := rsc.client
rsc.mutex.RUnlock()
if client == nil {
return nil, fmt.Errorf("client not connected")
}
return client.Subscribe(ctx)
}
func (rsc *ResilientSSEClient) Close() error {
rsc.cancel()
rsc.mutex.Lock()
defer rsc.mutex.Unlock()
if rsc.client != nil {
return rsc.client.Close()
}
return nil
}
// Helper function to check if an error is a connection error
func isConnectionError(err error) bool {
return errors.Is(err, ErrConnectionLost) ||
errors.Is(err, ErrConnectionFailed)
}
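A usage sketch for the resilient client above. The first connection is established asynchronously by the reconnect loop, so early calls may fail until it completes; the URL and tool name are placeholders:

func demonstrateResilientSSEClient() {
    rsc := NewResilientSSEClient("http://localhost:8080/mcp/sse")
    defer rsc.Close()

    result, err := rsc.CallTool(context.Background(), mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "example_tool",
        },
    })
    if err != nil {
        // Connection errors trigger a reconnect; the caller can retry later.
        log.Printf("call failed: %v", err)
        return
    }
    log.Printf("Tool result: %+v", result)
}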
SSE Event Handling
type SSEEventHandler struct {
client *client.SSEClient
handlers map[string][]func(mcp.Notification)
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewSSEEventHandler(c *client.SSEClient) *SSEEventHandler {
ctx, cancel := context.WithCancel(context.Background())
return &SSEEventHandler{
client: c,
handlers: make(map[string][]func(mcp.Notification)),
ctx: ctx,
cancel: cancel,
}
}
func (seh *SSEEventHandler) Start() error {
    notifications, err := seh.client.Subscribe(seh.ctx)
    if err != nil {
        return err
    }

    seh.wg.Add(1)
    go func() {
        defer seh.wg.Done()
        for {
            select {
            case notification, ok := <-notifications:
                if !ok {
                    return
                }
                seh.handleNotification(notification)
            case <-seh.ctx.Done():
                return
            }
        }
    }()

    return nil
}
func (seh *SSEEventHandler) Stop() {
seh.cancel()
seh.wg.Wait()
}
func (seh *SSEEventHandler) OnProgress(handler func(mcp.Notification)) {
seh.addHandler("notifications/progress", handler)
}
func (seh *SSEEventHandler) OnMessage(handler func(mcp.Notification)) {
seh.addHandler("notifications/message", handler)
}
func (seh *SSEEventHandler) OnResourceUpdate(handler func(mcp.Notification)) {
seh.addHandler("notifications/resources/updated", handler)
}
func (seh *SSEEventHandler) OnToolUpdate(handler func(mcp.Notification)) {
seh.addHandler("notifications/tools/updated", handler)
}
func (seh *SSEEventHandler) addHandler(method string, handler func(mcp.Notification)) {
seh.mutex.Lock()
defer seh.mutex.Unlock()
seh.handlers[method] = append(seh.handlers[method], handler)
}
func (seh *SSEEventHandler) handleNotification(notification mcp.Notification) {
seh.mutex.RLock()
handlers := seh.handlers[notification.Method]
seh.mutex.RUnlock()
for _, handler := range handlers {
go handler(notification)
}
}
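A sketch that wires the event handler to an SSE client, following the basic SSE example above; the URL is a placeholder:

func demonstrateSSEEventHandler() {
    c := client.NewSSEClient("http://localhost:8080/mcp/sse")
    defer c.Close()

    ctx := context.Background()
    if err := c.Initialize(ctx); err != nil {
        log.Fatal(err)
    }

    handler := NewSSEEventHandler(c)
    handler.OnProgress(func(n mcp.Notification) {
        log.Printf("progress: %+v", n)
    })
    handler.OnResourceUpdate(func(n mcp.Notification) {
        log.Printf("resource updated: %+v", n)
    })

    if err := handler.Start(); err != nil {
        log.Fatal(err)
    }
    defer handler.Stop()

    // Regular client operations can continue while notifications are dispatched.
}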
In-Process Client
In-process clients provide direct communication with servers in the same process.
Basic In-Process Client
func createInProcessClient() {
    // Create server
    s := server.NewMCPServer("Test Server", "1.0.0")

    // Add tools to server
    s.AddTool(
        mcp.NewTool("test_tool",
            mcp.WithDescription("Test tool"),
            mcp.WithString("input", mcp.Required()),
        ),
        func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            input := req.Params.Arguments["input"].(string)
            return mcp.NewToolResultText("Processed: " + input), nil
        },
    )

    // Create in-process client
    c, err := client.NewInProcessClient(s)
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Initialize (no network overhead)
    if _, err := c.Initialize(ctx, mcp.InitializeRequest{}); err != nil {
        log.Fatal(err)
    }

    // Use client
    result, err := c.CallTool(ctx, mcp.CallToolRequest{
        Params: mcp.CallToolRequestParams{
            Name: "test_tool",
            Arguments: map[string]interface{}{
                "input": "test data",
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Tool result: %+v", result)
}
In-Process Client for Testing
type TestClient struct {
    server *server.MCPServer
    client *client.Client
}

func NewTestClient() (*TestClient, error) {
    s := server.NewMCPServer("Test Server", "1.0.0",
        server.WithAllCapabilities(),
    )

    c, err := client.NewInProcessClient(s)
    if err != nil {
        return nil, err
    }

    return &TestClient{
        server: s,
        client: c,
    }, nil
}
func (tc *TestClient) AddTool(name, description string, handler server.ToolHandler) {
tool := mcp.NewTool(name, mcp.WithDescription(description))
tc.server.AddTool(tool, handler)
}
func (tc *TestClient) AddResource(uri, name string, handler server.ResourceHandler) {
resource := mcp.NewResource(uri, name)
tc.server.AddResource(resource, handler)
}
func (tc *TestClient) Initialize(ctx context.Context) error {
    _, err := tc.client.Initialize(ctx, mcp.InitializeRequest{})
    return err
}
func (tc *TestClient) CallTool(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error) {
return tc.client.CallTool(ctx, mcp.CallToolRequest{
Params: mcp.CallToolRequestParams{
Name: name,
Arguments: args,
},
})
}
func (tc *TestClient) ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error) {
return tc.client.ReadResource(ctx, mcp.ReadResourceRequest{
Params: mcp.ReadResourceRequestParams{
URI: uri,
},
})
}
func (tc *TestClient) Close() error {
return tc.client.Close()
}
// Usage in tests
func TestWithInProcessClient(t *testing.T) {
    tc, err := NewTestClient()
    require.NoError(t, err)
    defer tc.Close()

    // Add test tool
    tc.AddTool("echo", "Echo input", func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        input := req.Params.Arguments["input"].(string)
        return mcp.NewToolResultText(input), nil
    })

    ctx := context.Background()
    require.NoError(t, tc.Initialize(ctx))

    // Test tool call
    result, err := tc.CallTool(ctx, "echo", map[string]interface{}{
        "input": "hello world",
    })
    require.NoError(t, err)

    textContent, ok := result.Content[0].(mcp.TextContent)
    require.True(t, ok)
    assert.Equal(t, "hello world", textContent.Text)
}
Transport Selection
Decision Matrix
Choose your transport based on these factors:
type TransportRequirements struct {
RealTime bool
MultiClient bool
NetworkRequired bool
Performance string // "high", "medium", "low"
Complexity string // "low", "medium", "high"
}
func SelectTransport(req TransportRequirements) string {
switch {
case !req.NetworkRequired && req.Performance == "high":
return "inprocess"
case !req.NetworkRequired && !req.MultiClient:
return "stdio"
case req.RealTime && req.MultiClient:
return "sse"
case req.NetworkRequired && req.MultiClient:
return "streamablehttp"
default:
return "stdio" // Default fallback
}
}
// Usage examples
func demonstrateTransportSelection() {
// High-performance testing
testReq := TransportRequirements{
RealTime: false,
MultiClient: false,
NetworkRequired: false,
Performance: "high",
Complexity: "low",
}
fmt.Printf("Testing: %s\n", SelectTransport(testReq))
// Real-time web application
webReq := TransportRequirements{
RealTime: true,
MultiClient: true,
NetworkRequired: true,
Performance: "medium",
Complexity: "medium",
}
fmt.Printf("Web app: %s\n", SelectTransport(webReq))
// CLI tool
cliReq := TransportRequirements{
RealTime: false,
MultiClient: false,
NetworkRequired: false,
Performance: "medium",
Complexity: "low",
}
fmt.Printf("CLI tool: %s\n", SelectTransport(cliReq))
}
Multi-Transport Client Factory
type ClientFactory struct {
configs map[string]interface{}
}
func NewClientFactory() *ClientFactory {
return &ClientFactory{
configs: make(map[string]interface{}),
}
}
func (cf *ClientFactory) SetStdioConfig(command string, args ...string) {
cf.configs["stdio"] = client.StdioOptions{
Command: command,
Args: args,
}
}
func (cf *ClientFactory) SetStreamableHTTPConfig(baseURL string, headers map[string]string) {
cf.configs["streamablehttp"] = struct {
BaseURL string
Headers map[string]string
}{
BaseURL: baseURL,
Headers: headers,
}
}
func (cf *ClientFactory) SetSSEConfig(baseURL string, headers map[string]string) {
cf.configs["sse"] = struct {
BaseURL string
Headers map[string]string
}{
BaseURL: baseURL,
Headers: headers,
}
}
func (cf *ClientFactory) CreateClient(transportType string) (*client.Client, error) {
    switch transportType {
    case "stdio":
        config, ok := cf.configs["stdio"].(client.StdioOptions)
        if !ok {
            return nil, fmt.Errorf("stdio config not set")
        }
        return client.NewStdioClientWithOptions(config)

    case "streamablehttp":
        config, ok := cf.configs["streamablehttp"].(struct {
            BaseURL string
            Headers map[string]string
        })
        if !ok {
            return nil, fmt.Errorf("streamablehttp config not set")
        }

        options := []transport.StreamableHTTPCOption{}
        if len(config.Headers) > 0 {
            options = append(options, transport.WithHTTPHeaders(config.Headers))
        }

        return client.NewStreamableHttpClient(config.BaseURL, options...)

    case "sse":
        config, ok := cf.configs["sse"].(struct {
            BaseURL string
            Headers map[string]string
        })
        if !ok {
            return nil, fmt.Errorf("sse config not set")
        }

        options := []transport.ClientOption{}
        if len(config.Headers) > 0 {
            options = append(options, transport.WithHeaders(config.Headers))
        }

        return client.NewSSEMCPClient(config.BaseURL, options...)

    default:
        return nil, fmt.Errorf("unknown transport: %s", transportType)
    }
}
// Usage
func demonstrateClientFactory() {
    factory := NewClientFactory()

    // Configure transports
    factory.SetStdioConfig("go", "run", "server.go")
    factory.SetStreamableHTTPConfig("http://localhost:8080/mcp", map[string]string{
        "Authorization": "Bearer token",
    })
    factory.SetSSEConfig("http://localhost:8080/mcp/sse", map[string]string{
        "Authorization": "Bearer token",
    })

    // Create a client based on the environment
    transportType := os.Getenv("MCP_TRANSPORT")
    if transportType == "" {
        transportType = "stdio"
    }

    c, err := factory.CreateClient(transportType)
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Use client...
}