Skip to content

StreamableHTTP Transport

StreamableHTTP transport provides traditional request/response communication for MCP servers, perfect for REST-like interactions, stateless clients, and integration with existing web infrastructure.

Use Cases

StreamableHTTP transport excels in scenarios requiring:

  • Web services: Traditional REST API patterns
  • Stateless interactions: Each request is independent
  • Load balancing: Distribute requests across multiple servers
  • Caching: Leverage HTTP caching mechanisms
  • Integration: Work with existing HTTP infrastructure
  • Public APIs: Expose MCP functionality as web APIs
Example applications:
  • Microservice architectures
  • Public API endpoints
  • Integration with API gateways
  • Cached data services
  • Rate-limited services
  • Multi-tenant applications

Implementation

Basic StreamableHTTP Server

package main
 
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strings"
    "time"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)
 
// main builds an MCP server exposing three user tools and a templated
// user-profile resource, then serves it over the StreamableHTTP transport
// on :8080. It exits the process if the listener fails.
func main() {
    s := server.NewMCPServer("StreamableHTTP API Server", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithResourceCapabilities(true, true),
    )

    // Add RESTful tools
    s.AddTool(
        mcp.NewTool("get_user",
            mcp.WithDescription("Get user information"),
            mcp.WithString("user_id", mcp.Required()),
        ),
        handleGetUser,
    )

    s.AddTool(
        mcp.NewTool("create_user",
            mcp.WithDescription("Create a new user"),
            mcp.WithString("name", mcp.Required()),
            mcp.WithString("email", mcp.Required()),
            mcp.WithNumber("age", mcp.Min(0)),
        ),
        handleCreateUser,
    )

    s.AddTool(
        mcp.NewTool("search_users",
            mcp.WithDescription("Search users with filters"),
            mcp.WithString("query", mcp.Description("Search query")),
            mcp.WithNumber("limit", mcp.DefaultNumber(10), mcp.Max(100)),
            mcp.WithNumber("offset", mcp.DefaultNumber(0), mcp.Min(0)),
        ),
        handleSearchUsers,
    )

    // Add resources. The URI contains a {user_id} placeholder, so it must be
    // registered as a resource template: a plain resource only matches its
    // literal URI and would never be hit by "users://123".
    s.AddResourceTemplate(
        mcp.NewResourceTemplate(
            "users://{user_id}",
            "User Profile",
            mcp.WithTemplateDescription("User profile data"),
            mcp.WithTemplateMIMEType("application/json"),
        ),
        handleUserResource,
    )

    // Start StreamableHTTP server
    log.Println("Starting StreamableHTTP server on :8080")
    httpServer := server.NewStreamableHTTPServer(s)
    if err := httpServer.Start(":8080"); err != nil {
        log.Fatal(err)
    }
}
 
// handleGetUser implements the get_user tool. It reads the required
// "user_id" argument, looks the user up, and returns the profile as a JSON
// text result. It returns an error when the argument is missing or the
// lookup fails.
func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    userID := req.GetString("user_id", "")
    if userID == "" {
        return nil, fmt.Errorf("user_id is required")
    }

    // Simulate database lookup
    user, err := getUserFromDB(userID)
    if err != nil {
        // Keep the underlying cause in the chain instead of discarding it.
        return nil, fmt.Errorf("user not found: %s: %w", userID, err)
    }

    // Marshal rather than hand-format: quotes, backslashes or control
    // characters in any field would otherwise produce invalid JSON.
    payload, err := json.Marshal(struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }{
        ID:    user.ID,
        Name:  user.Name,
        Email: user.Email,
        Age:   user.Age,
    })
    if err != nil {
        return nil, fmt.Errorf("encoding user %s: %w", userID, err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// handleCreateUser implements the create_user tool. It validates the "name"
// and "email" arguments (plus optional "age"), persists a new User, and
// returns a JSON text result describing the created record. It returns an
// error when a required argument is missing, the email fails validation, or
// saving fails.
func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    name := req.GetString("name", "")
    email := req.GetString("email", "")
    age := req.GetInt("age", 0)

    if name == "" || email == "" {
        return nil, fmt.Errorf("name and email are required")
    }

    // Validate input
    if !isValidEmail(email) {
        return nil, fmt.Errorf("invalid email format: %s", email)
    }

    // Create user
    user := &User{
        ID:        generateID(),
        Name:      name,
        Email:     email,
        Age:       age,
        CreatedAt: time.Now(),
    }

    if err := saveUserToDB(user); err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }

    // name and email come straight from the caller, so the response is
    // marshalled rather than interpolated to keep it valid JSON.
    type userJSON struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }
    payload, err := json.Marshal(struct {
        ID      string   `json:"id"`
        Message string   `json:"message"`
        User    userJSON `json:"user"`
    }{
        ID:      user.ID,
        Message: "User created successfully",
        User: userJSON{
            ID:    user.ID,
            Name:  user.Name,
            Email: user.Email,
            Age:   user.Age,
        },
    })
    if err != nil {
        return nil, fmt.Errorf("encoding created user: %w", err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// Helper functions and types for the examples
// User is the record the example tools and resource handlers operate on.
// The struct tags give the JSON field names used when a User is encoded.
type User struct {
    ID        string    `json:"id"`
    Name      string    `json:"name"`
    Email     string    `json:"email"`
    Age       int       `json:"age"`
    CreatedAt time.Time `json:"created_at"` // set by handleCreateUser; left zero by the placeholder lookups
}
 
// getUserFromDB is a stand-in for a real lookup: it always succeeds and
// returns a fixed "John Doe" profile carrying the requested ID.
func getUserFromDB(userID string) (*User, error) {
    // Placeholder implementation
    stub := new(User)
    stub.ID = userID
    stub.Name = "John Doe"
    stub.Email = "john@example.com"
    stub.Age = 30
    return stub, nil
}
 
// isValidEmail reports whether email contains both an "@" and a ".".
// This is a deliberately loose sanity check, not full address validation.
func isValidEmail(email string) bool {
    for _, marker := range []string{"@", "."} {
        if !strings.Contains(email, marker) {
            return false
        }
    }
    return true
}
 
// generateID returns "user_" followed by the current Unix time in
// nanoseconds.
func generateID() string {
    // Placeholder implementation
    nanos := time.Now().UnixNano()
    return fmt.Sprintf("user_%d", nanos)
}
 
// saveUserToDB is a stand-in for real persistence. It ignores user and
// always reports success.
func saveUserToDB(user *User) error {
    // Placeholder implementation
    return nil
}
 
// handleSearchUsers implements the search_users tool. It reads the "query",
// "limit" (default 10) and "offset" (default 0) arguments, runs the search,
// and returns the matching users plus paging metadata as a JSON text result.
// It returns an error when the search or the encoding fails.
func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    query := req.GetString("query", "")
    limit := req.GetInt("limit", 10)
    offset := req.GetInt("offset", 0)

    // Search users with pagination
    users, total, err := searchUsersInDB(query, limit, offset)
    if err != nil {
        return nil, fmt.Errorf("search failed: %w", err)
    }

    // Report the users the search actually returned. The previous version
    // left `users` unused (a compile error) and hard-coded the list.
    type userJSON struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }
    // Non-nil slice so an empty result encodes as [] rather than null.
    found := make([]userJSON, 0, len(users))
    for _, u := range users {
        found = append(found, userJSON{ID: u.ID, Name: u.Name, Email: u.Email, Age: u.Age})
    }

    // query is caller-supplied; marshalling escapes it correctly.
    payload, err := json.Marshal(struct {
        Users  []userJSON `json:"users"`
        Total  int        `json:"total"`
        Limit  int        `json:"limit"`
        Offset int        `json:"offset"`
        Query  string     `json:"query"`
    }{
        Users:  found,
        Total:  total,
        Limit:  limit,
        Offset: offset,
        Query:  query,
    })
    if err != nil {
        return nil, fmt.Errorf("encoding search results: %w", err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// handleUserResource serves reads of a user-profile resource. It derives the
// user ID from the requested URI, loads the user, and returns a single JSON
// text content item echoing that URI. It returns an error when the lookup or
// the encoding fails.
func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    userID := extractUserIDFromURI(req.Params.URI)

    user, err := getUserFromDB(userID)
    if err != nil {
        // Keep the underlying cause in the chain instead of discarding it.
        return nil, fmt.Errorf("user not found: %s: %w", userID, err)
    }

    // Marshal rather than hand-format so special characters in the profile
    // cannot break the JSON document.
    payload, err := json.Marshal(struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }{
        ID:    user.ID,
        Name:  user.Name,
        Email: user.Email,
        Age:   user.Age,
    })
    if err != nil {
        return nil, fmt.Errorf("encoding user %s: %w", userID, err)
    }

    return []mcp.ResourceContents{
        mcp.TextResourceContents{
            URI:      req.Params.URI,
            MIMEType: "application/json",
            Text:     string(payload),
        },
    }, nil
}
 
// Additional helper functions
 
// searchUsersInDB is a stand-in for a real search. It ignores query, limit
// and offset and always returns the same two fixture users together with
// their count.
func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) {
    // Placeholder implementation
    john := &User{ID: "1", Name: "John Doe", Email: "john@example.com", Age: 30}
    jane := &User{ID: "2", Name: "Jane Smith", Email: "jane@example.com", Age: 25}

    fixtures := []*User{john, jane}
    return fixtures, len(fixtures), nil
}
 
// extractUserIDFromURI returns the part of uri after the "users://" scheme,
// e.g. "123" for "users://123". A uri without that scheme, or with nothing
// after it, is returned unchanged.
func extractUserIDFromURI(uri string) string {
    const scheme = "users://"

    if !strings.HasPrefix(uri, scheme) || len(uri) == len(scheme) {
        return uri
    }
    return uri[len(scheme):]
}

Advanced StreamableHTTP Configuration

// main builds an MCP server with resource, prompt, tool and logging
// capabilities, registers the example tool families, and serves it over
// StreamableHTTP at /api/v1/mcp on :8080 with heartbeats, stateful sessions
// and idle-session sweeping configured.
func main() {
    const listenAddr = ":8080"

    mcpSrv := server.NewMCPServer("Advanced StreamableHTTP Server", "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // Add comprehensive tools and resources
    registrars := []func(*server.MCPServer){
        addCRUDTools,
        addBatchTools,
        addAnalyticsTools,
    }
    for _, register := range registrars {
        register(mcpSrv)
    }

    log.Println("Starting advanced StreamableHTTP server on :8080")
    streamSrv := server.NewStreamableHTTPServer(mcpSrv,
        server.WithEndpointPath("/api/v1/mcp"),
        server.WithHeartbeatInterval(30*time.Second),
        server.WithStateLess(false),
        server.WithSessionIdleTTL(10*time.Minute), // Sweep idle sessions after 10 minutes
    )

    err := streamSrv.Start(listenAddr)
    if err != nil {
        log.Fatal(err)
    }
}
 
// Helper functions for the advanced example
// addCRUDTools is an intentionally empty stub; a real server would register
// its create/read/update/delete tools on s here.
func addCRUDTools(s *server.MCPServer) {
    // Placeholder implementation - would add CRUD tools
}
 
// addBatchTools is an intentionally empty stub; a real server would register
// its batch-processing tools on s here.
func addBatchTools(s *server.MCPServer) {
    // Placeholder implementation - would add batch processing tools
}
 
// addAnalyticsTools is an intentionally empty stub; a real server would
// register its analytics tools on s here.
func addAnalyticsTools(s *server.MCPServer) {
    // Placeholder implementation - would add analytics tools
}
 
// logToolCall writes one log line per tool invocation: the error when err is
// non-nil, otherwise the elapsed duration. sessionID is accepted but not
// currently included in the output.
func logToolCall(sessionID, toolName string, duration time.Duration, err error) {
    // Placeholder implementation
    if err == nil {
        log.Printf("Tool %s completed in %v", toolName, duration)
        return
    }
    log.Printf("Tool %s failed: %v", toolName, err)
}
 
// logResourceRead writes one log line per resource read: the error when err
// is non-nil, otherwise the elapsed duration. sessionID is accepted but not
// currently included in the output.
func logResourceRead(sessionID, uri string, duration time.Duration, err error) {
    // Placeholder implementation
    if err == nil {
        log.Printf("Resource read %s completed in %v", uri, duration)
        return
    }
    log.Printf("Resource read %s failed: %v", uri, err)
}
 
// addStreamableHTTPMiddleware installs three middlewares on s: token
// authentication and rate limiting around tool calls, and a read-through
// cache around resource reads.
//
// NOTE(review): extractAuthToken, validateToken, getClientIP, rateLimiter,
// getFromCache and setCache are not defined in this document, and the
// AddToolMiddleware / AddResourceMiddleware method names should be confirmed
// against the mcp-go version in use.
func addStreamableHTTPMiddleware(s *server.MCPServer) {
    // Authentication middleware
    s.AddToolMiddleware(func(next server.ToolHandler) server.ToolHandler {
        return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            // Extract and validate auth token
            token := extractAuthToken(ctx)
            if token == "" {
                return nil, fmt.Errorf("authentication required")
            }
            
            user, err := validateToken(token)
            if err != nil {
                return nil, fmt.Errorf("invalid token: %w", err)
            }
            
            // Add user to context
            // NOTE(review): a plain string context key can collide with other
            // packages; go vet/staticcheck recommend an unexported key type.
            ctx = context.WithValue(ctx, "user", user)
            return next(ctx, req)
        }
    })
 
    // Rate limiting middleware
    s.AddToolMiddleware(func(next server.ToolHandler) server.ToolHandler {
        return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            // Reject the call before it reaches the tool when the limiter
            // denies this client.
            clientIP := getClientIP(ctx)
            if !rateLimiter.Allow(clientIP) {
                return nil, fmt.Errorf("rate limit exceeded")
            }
            return next(ctx, req)
        }
    })
 
    // Caching middleware
    s.AddResourceMiddleware(func(next server.ResourceHandler) server.ResourceHandler {
        return func(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
            // Check cache first
            if cached := getFromCache(req.Params.URI); cached != nil {
                return cached, nil
            }
            
            result, err := next(ctx, req)
            if err == nil {
                // Cache successful results
                setCache(req.Params.URI, result, 5*time.Minute)
            }
            
            return result, err
        }
    })
}

Endpoints

Standard MCP Endpoints

When you start a StreamableHTTP MCP server, it automatically creates these endpoints:

POST /mcp/initialize     - Initialize MCP session
POST /mcp/tools/list     - List available tools
POST /mcp/tools/call     - Call a tool
POST /mcp/resources/list - List available resources
POST /mcp/resources/read - Read a resource
POST /mcp/prompts/list   - List available prompts
POST /mcp/prompts/get    - Get a prompt
GET  /mcp/health         - Health check
GET  /mcp/capabilities   - Server capabilities

Custom Endpoints

Add custom HTTP endpoints alongside MCP:

// main mounts the MCP StreamableHTTP handler at /mcp next to several custom
// REST endpoints on one mux, wraps the mux in middleware, and serves it on
// :8080. It exits the process if the listener fails.
func main() {
    s := server.NewMCPServer("Custom StreamableHTTP Server", "1.0.0")

    // Create HTTP server with custom routes
    mux := http.NewServeMux()

    // Add MCP endpoints
    mux.Handle("/mcp", server.NewStreamableHTTPServer(s))

    // Add custom endpoints
    mux.HandleFunc("/api/status", handleStatus)
    mux.HandleFunc("/api/metrics", handleMetrics)
    mux.HandleFunc("/api/users", handleUsersAPI)
    mux.HandleFunc("/api/upload", handleFileUpload)

    // Add middleware
    handler := addMiddleware(mux)

    // Use an explicit http.Server so slow clients cannot hold connections
    // open indefinitely while sending headers. No WriteTimeout is set because
    // it would cut off long-lived streaming responses on /mcp.
    srv := &http.Server{
        Addr:              ":8080",
        Handler:           handler,
        ReadHeaderTimeout: 10 * time.Second,
    }

    log.Println("Starting custom StreamableHTTP server on :8080")
    if err := srv.ListenAndServe(); err != nil {
        log.Fatal(err)
    }
}
 
// handleStatus writes a JSON health document containing a fixed status and
// version, the current Unix timestamp, and the uptime since startTime.
func handleStatus(w http.ResponseWriter, r *http.Request) {
    status := map[string]any{
        "status":    "healthy",
        "timestamp": time.Now().Unix(),
        "version":   "1.0.0",
        "uptime":    time.Since(startTime).String(),
    }

    w.Header().Set("Content-Type", "application/json")
    // The status line may already be on the wire if encoding fails midway,
    // so the failure is logged rather than turned into an HTTP error.
    if err := json.NewEncoder(w).Encode(status); err != nil {
        log.Printf("writing status response: %v", err)
    }
}
 
// handleMetrics writes the value returned by collectMetrics as a JSON
// response.
func handleMetrics(w http.ResponseWriter, r *http.Request) {
    metrics := collectMetrics()

    w.Header().Set("Content-Type", "application/json")
    // The status line may already be on the wire if encoding fails midway,
    // so the failure is logged rather than turned into an HTTP error.
    if err := json.NewEncoder(w).Encode(metrics); err != nil {
        log.Printf("writing metrics response: %v", err)
    }
}
 
// handleUsersAPI dispatches /api/users requests by HTTP method to the
// list, create, update and delete handlers. Any other method receives
// 405 Method Not Allowed.
func handleUsersAPI(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case http.MethodGet:
        handleListUsers(w, r)
    case http.MethodPost:
        handleCreateUserAPI(w, r)
    case http.MethodPut:
        handleUpdateUser(w, r)
    case http.MethodDelete:
        handleDeleteUser(w, r)
    default:
        // A 405 response is required to advertise the supported methods.
        w.Header().Set("Allow", "GET, POST, PUT, DELETE")
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

Request/Response Patterns

Standard MCP Request

POST /mcp/tools/call
Content-Type: application/json
 
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "search_users",
    "arguments": {
      "query": "john",
      "limit": 10,
      "offset": 0
    }
  }
}

Standard MCP Response

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"users\":[...],\"total\":25,\"limit\":10,\"offset\":0}"
      }
    ]
  }
}

Error Response

{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32602,
    "message": "Invalid params",
    "data": {
      "details": "user_id is required"
    }
  }
}

Session Management

Session Idle TTL

When clients disconnect without sending a DELETE request, per-session transport state (tools, resources, log levels, etc.) can leak memory. Use WithSessionIdleTTL to automatically sweep idle sessions:

// Enable the background sweeper for per-session transport state.
httpServer := server.NewStreamableHTTPServer(s,
    server.WithSessionIdleTTL(10*time.Minute), // Sweep sessions idle for 10+ minutes
)

The sweeper runs in the background and removes per-session state for sessions that haven't received any requests within the TTL. It also invalidates swept session IDs so they cannot be reused. A zero or negative TTL disables the sweeper (default behavior). The sweeper is automatically stopped on Shutdown().

Stateful vs Stateless

Stateless Design (Recommended)

// handleStatelessTool shows a handler that keeps no server-side state:
// the caller's identity comes from the request context and the inputs from
// the request arguments, so each request is independent.
func handleStatelessTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // Everything the handler needs is derived from ctx and req.
    caller := extractUserFromToken(ctx)
    args := req.Params.Arguments

    // Process without relying on server state
    outcome, err := processRequest(caller, args)
    if err == nil {
        return mcp.NewToolResultJSON(outcome)
    }
    return nil, err
}
 
// getUserPreferences loads a user's preferences from external storage
// under the key "user:<userID>:prefs".
func getUserPreferences(userID string) (map[string]interface{}, error) {
    // Load from database, cache, etc.
    key := fmt.Sprintf("user:%s:prefs", userID)
    return loadFromRedis(key)
}

Stateful Design (When Needed)

// HTTPSessionManager is an in-memory session store keyed by session ID.
// Values must not be copied after first use because of the embedded mutex.
type HTTPSessionManager struct {
    sessions map[string]*HTTPSession // live sessions by ID
    mutex    sync.RWMutex            // guards sessions
    cleanup  *time.Ticker            // drives the periodic sweep of expired sessions
}
 
// HTTPSession is the per-client state tracked by HTTPSessionManager.
type HTTPSession struct {
    ID          string
    UserID      string
    CreatedAt   time.Time
    LastAccess  time.Time              // refreshed on each successful lookup
    Data        map[string]interface{} // arbitrary per-session values
    ExpiresAt   time.Time              // sessions past this instant are rejected and swept
}
 
// NewHTTPSessionManager returns an empty session manager and starts its
// background sweeper goroutine, which is driven by a one-minute ticker.
func NewHTTPSessionManager() *HTTPSessionManager {
    manager := new(HTTPSessionManager)
    manager.sessions = make(map[string]*HTTPSession)
    manager.cleanup = time.NewTicker(1 * time.Minute)

    go manager.cleanupExpiredSessions()
    return manager
}
 
// CreateSession registers and returns a new session for userID with a
// freshly generated ID, empty Data, and an expiry 30 minutes from now.
func (sm *HTTPSessionManager) CreateSession(userID string) *HTTPSession {
    sm.mutex.Lock()
    defer sm.mutex.Unlock()

    created := &HTTPSession{ID: generateSessionID(), UserID: userID}
    created.CreatedAt = time.Now()
    created.LastAccess = time.Now()
    created.Data = make(map[string]interface{})
    created.ExpiresAt = time.Now().Add(30 * time.Minute)

    sm.sessions[created.ID] = created
    return created
}
 
// GetSession returns the live session with the given ID and true, or
// (nil, false) when the ID is unknown or the session has expired. A
// successful lookup refreshes LastAccess and extends ExpiresAt by 30 minutes.
func (sm *HTTPSessionManager) GetSession(sessionID string) (*HTTPSession, bool) {
    // A write lock is required: a successful lookup mutates the session, and
    // doing that under RLock races with concurrent lookups and the sweeper's
    // reads of ExpiresAt.
    sm.mutex.Lock()
    defer sm.mutex.Unlock()

    now := time.Now()
    session, exists := sm.sessions[sessionID]
    if !exists || now.After(session.ExpiresAt) {
        return nil, false
    }

    // Update last access, using one timestamp so both fields agree.
    session.LastAccess = now
    session.ExpiresAt = now.Add(30 * time.Minute)

    return session, true
}
 
// cleanupExpiredSessions runs for as long as the cleanup ticker delivers
// ticks. On each tick it removes every session whose ExpiresAt has passed,
// holding the write lock for the duration of the sweep.
func (sm *HTTPSessionManager) cleanupExpiredSessions() {
    sweep := func() {
        sm.mutex.Lock()
        defer sm.mutex.Unlock()

        cutoff := time.Now()
        for key, candidate := range sm.sessions {
            if candidate.ExpiresAt.Before(cutoff) {
                delete(sm.sessions, key)
            }
        }
    }

    for range sm.cleanup.C {
        sweep()
    }
}

Authentication and Authorization

// AuthMiddleware authenticates HTTP requests that carry a JWT bearer token.
type AuthMiddleware struct {
    jwtSecret []byte    // key returned to the JWT parser for verification
    userStore UserStore // resolves the token's user ID to a user record
}
 
// NewAuthMiddleware returns an AuthMiddleware that verifies tokens with
// secret and loads users from store.
func NewAuthMiddleware(secret []byte, store UserStore) *AuthMiddleware {
    mw := new(AuthMiddleware)
    mw.jwtSecret = secret
    mw.userStore = store
    return mw
}
 
// Middleware wraps next with bearer-token authentication. Requests without
// a "Bearer " Authorization header, with a token that fails validation, or
// whose user cannot be loaded are answered with 401. Otherwise the user is
// stored in the request context under the key "user" and next is invoked.
func (m *AuthMiddleware) Middleware(next http.Handler) http.Handler {
    const bearerPrefix = "Bearer "

    authenticate := func(w http.ResponseWriter, r *http.Request) {
        deny := func(reason string) {
            http.Error(w, reason, http.StatusUnauthorized)
        }

        // Extract token from Authorization header
        header := r.Header.Get("Authorization")
        if !strings.HasPrefix(header, bearerPrefix) {
            deny("Missing or invalid authorization header")
            return
        }

        // Validate JWT token
        claims, err := m.validateJWT(strings.TrimPrefix(header, bearerPrefix))
        if err != nil {
            deny("Invalid token")
            return
        }

        // Load user information
        account, err := m.userStore.GetUser(claims.UserID)
        if err != nil {
            deny("User not found")
            return
        }

        // Add user to request context
        authed := r.WithContext(context.WithValue(r.Context(), "user", account))
        next.ServeHTTP(w, authed)
    }

    return http.HandlerFunc(authenticate)
}
 
// validateJWT parses tokenString into Claims and verifies it against the
// middleware's secret. It returns the claims when the token is valid, and an
// error when parsing fails, the signing algorithm is not HMAC, or the token
// is otherwise invalid.
func (m *AuthMiddleware) validateJWT(tokenString string) (*Claims, error) {
    // Note: This example uses a hypothetical JWT library
    // In practice, you would use a real JWT library like github.com/golang-jwt/jwt
    token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
        // The algorithm comes from the token header, which the client
        // controls. Only hand out the HMAC secret when the token really
        // claims an HMAC algorithm.
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return m.jwtSecret, nil
    })

    if err != nil {
        return nil, err
    }

    if claims, ok := token.Claims.(*Claims); ok && token.Valid {
        return claims, nil
    }

    return nil, fmt.Errorf("invalid token")
}
 
// Claims is the JWT payload the auth middleware expects: the application's
// user ID and role plus the embedded standard claims.
// NOTE(review): newer golang-jwt releases deprecate StandardClaims in favour
// of RegisteredClaims; confirm against the JWT library version in use.
type Claims struct {
    UserID string `json:"user_id"`
    Role   string `json:"role"`
    jwt.StandardClaims
}

OAuth Protected Resource Metadata (RFC 9728)

When your MCP server requires OAuth, clients need a way to discover which authorization servers, scopes, and bearer methods are supported. The MCP authorization spec points clients at an RFC 9728 /.well-known/oauth-protected-resource endpoint to do exactly that.

mcp-go provides two ways to serve this metadata.

Option A: Auto-mounted via a server option

The simplest approach — the well-known endpoint is registered automatically by Start() and is also dispatched when the server is mounted as an http.Handler via ServeHTTP.

mcpServer := server.NewMCPServer("my-server", "1.0.0")

// Attach the RFC 9728 metadata so the well-known endpoint is served
// alongside the MCP endpoint.
httpServer := server.NewStreamableHTTPServer(mcpServer,
    server.WithProtectedResourceMetadata(server.ProtectedResourceMetadataConfig{
        Resource:               "https://my-mcp-server.com",
        AuthorizationServers:   []string{"https://auth.example.com"},
        ScopesSupported:        []string{"mcp:read", "mcp:write"},
        BearerMethodsSupported: []string{"header"},
        ResourceName:           "My MCP Server",
    }),
)

// Start returns an error when the listener cannot be started; do not drop it.
if err := httpServer.Start(":8080"); err != nil {
    log.Fatal(err)
}
// GET http://localhost:8080/.well-known/oauth-protected-resource
//   -> 200 application/json with the metadata above

For SSE servers, use the equivalent WithSSEProtectedResourceMetadata:

// SSE transport equivalent of the StreamableHTTP auto-mount option.
sseServer := server.NewSSEServer(mcpServer,
    server.WithSSEProtectedResourceMetadata(server.ProtectedResourceMetadataConfig{
        Resource:             "https://my-mcp-server.com",
        AuthorizationServers: []string{"https://auth.example.com"},
    }),
)

Option B: Standalone handler for custom routing

If you build your own http.ServeMux (or use a router like chi / gin), use NewProtectedResourceMetadataHandler directly:

mux := http.NewServeMux()
mux.Handle("/mcp", mcpHandler)
// Serve the RFC 9728 metadata from the standalone handler on the same mux.
mux.Handle("/.well-known/oauth-protected-resource",
    server.NewProtectedResourceMetadataHandler(server.ProtectedResourceMetadataConfig{
        Resource:             "https://my-mcp-server.com",
        AuthorizationServers: []string{"https://auth.example.com"},
        ScopesSupported:      []string{"mcp:read", "mcp:write"},
    }))
// ListenAndServe always returns a non-nil error; surface it instead of
// discarding it.
log.Fatal(http.ListenAndServe(":8080", mux))

Path-qualified resources (RFC 9728 §3.1)

When your resource identifier has a path (e.g. https://example.com/mcp), the well-known URL must include that path suffix: /.well-known/oauth-protected-resource/mcp. Use server.ProtectedResourceMetadataPath to derive the correct path:

// The resource identifier has a path component ("/mcp"), so the well-known
// route must carry the same suffix.
cfg := server.ProtectedResourceMetadataConfig{
    Resource:             "https://example.com/mcp",
    AuthorizationServers: []string{"https://auth.example.com"},
}
// Derive the mount path from the resource identifier rather than hard-coding it.
mux.Handle(server.ProtectedResourceMetadataPath(cfg.Resource),
    server.NewProtectedResourceMetadataHandler(cfg))
// -> mounts at /.well-known/oauth-protected-resource/mcp

The auto-mount options (WithProtectedResourceMetadata / WithSSEProtectedResourceMetadata) apply this rule automatically.

Handler behavior

| Method | Response |
| --- | --- |
| GET | 200 with application/json body, Cache-Control: no-store |
| HEAD | 200 headers, no body |
| OPTIONS | 204 (CORS preflight), permissive Access-Control-Allow-* headers |
| other | 405 with Allow: GET, HEAD, OPTIONS |

CORS is enabled with Access-Control-Allow-Origin: * so browser-based MCP clients can perform discovery cross-origin.

Cross-Origin Resource Sharing (CORS)

The MCP endpoints themselves are not CORS-enabled by default — every deployment has different origin requirements, and emitting permissive headers unconditionally would be a security regression. Browser-based MCP clients hosted on a different origin than the server will be blocked unless you opt in.

Use server.WithStreamableHTTPCORS (or server.WithSSECORS for the SSE transport) to enable CORS handling. The transport then answers preflight (OPTIONS) requests directly and decorates simple cross-origin responses with the appropriate Access-Control-* headers, including a Vary: Origin so downstream caches behave correctly.

// Opt in to CORS for two explicit origins, allow credentials, and set the
// preflight max age to 300 seconds.
httpServer := server.NewStreamableHTTPServer(mcpServer,
    server.WithEndpointPath("/mcp"),
    server.WithStreamableHTTPCORS(
        server.WithCORSAllowedOrigins("https://my-ai-app.com", "http://localhost:3000"),
        server.WithCORSAllowCredentials(),
        server.WithCORSMaxAge(300),
    ),
)

Available CORSOption helpers:

| Option | Purpose |
| --- | --- |
| WithCORSAllowedOrigins(...) | Origins permitted to access the server. "*" allows any origin. |
| WithCORSAllowedMethods(...) | Methods advertised in preflight (defaults to GET, POST, DELETE, OPTIONS). |
| WithCORSAllowedHeaders(...) | Request headers advertised in preflight (defaults to Content-Type, Mcp-Session-Id, Last-Event-ID, Authorization). |
| WithCORSExposedHeaders(...) | Response headers exposed to JavaScript (defaults to Mcp-Session-Id). |
| WithCORSAllowCredentials() | Sends Access-Control-Allow-Credentials: true. |
| WithCORSMaxAge(seconds) | Sets Access-Control-Max-Age for preflight caching. |

When WithCORSAllowedOrigins("*") is combined with WithCORSAllowCredentials(), the server echoes the request's Origin header instead of * to remain compliant with the CORS specification.

Request Headers

The StreamableHTTP transport now passes HTTP request headers to MCP handlers. This allows you to access the original HTTP headers that were sent with the request in your tool and resource handlers.

Accessing Headers in Handlers

Headers are available in all MCP request objects:

// handleGetUser demonstrates reading the original HTTP request headers from
// inside a tool handler. It rejects calls without an Authorization header
// and logs the request ID and user agent for tracing.
func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // Access request headers
    headers := req.Header

    // Use headers for authentication, tracing, etc.
    authToken := headers.Get("Authorization")
    if authToken == "" {
        return nil, fmt.Errorf("authentication required")
    }

    // Access other headers. They are used here so the example compiles; Go
    // rejects locals that are declared and never used.
    requestID := headers.Get("X-Request-ID")
    userAgent := headers.Get("User-Agent")
    log.Printf("get_user request_id=%q user_agent=%q", requestID, userAgent)

    // Rest of your handler code...
    // Placeholder result so the function has the return it requires.
    return mcp.NewToolResultText(fmt.Sprintf("handled request %q", requestID)), nil
}

This works for all MCP request types including:

  • CallToolRequest
  • ReadResourceRequest
  • ListToolsRequest
  • ListResourcesRequest
  • InitializeRequest
  • And other MCP request types

The headers are automatically populated by the transport layer and are available in your handlers without any additional configuration.

Sampling Support

StreamableHTTP transport now supports bidirectional sampling, allowing servers to request LLM completions from clients. This enables advanced scenarios where servers can leverage client-side LLM capabilities.

Requirements for Sampling

To enable sampling with StreamableHTTP transport, the client must use the WithContinuousListening() option:

// Client setup with sampling support
httpTransport, err := transport.NewStreamableHTTP(
    serverURL,
    transport.WithContinuousListening(), // Required for sampling
)
// The constructor can fail; check before handing the transport to the client.
if err != nil {
    log.Fatalf("creating StreamableHTTP transport: %v", err)
}

// Create client with sampling handler
mcpClient := client.NewClient(httpTransport,
    client.WithSamplingHandler(samplingHandler))

Without WithContinuousListening(), the client won't maintain a persistent connection to receive sampling requests from the server.

Server-Side Implementation

Enable sampling in your StreamableHTTP server:

// Build the server and turn on sampling before registering tools that use it.
mcpServer := server.NewMCPServer("HTTP Sampling Server", "1.0.0")
mcpServer.EnableSampling()
 
// Add a tool that uses sampling
mcpServer.AddTool(mcp.Tool{
    Name:        "ask-llm",
    Description: "Ask the LLM a question",
    // Input schema: a single required string property, "question".
    InputSchema: mcp.ToolInputSchema{
        Type: "object",
        Properties: map[string]any{
            "question": map[string]any{
                "type":        "string",
                "description": "Question to ask",
            },
        },
        Required: []string{"question"},
    },
}, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // Falls back to "" when the argument is absent.
    question := mcp.ParseString(req, "question", "")
    
    // Request sampling from client
    samplingRequest := mcp.CreateMessageRequest{
        Request: mcp.Request{
            Method: string(mcp.MethodSamplingCreateMessage),
        },
        CreateMessageParams: mcp.CreateMessageParams{
            // One user message carrying the question verbatim.
            Messages: []mcp.SamplingMessage{
                {
                    Role: mcp.RoleUser,
                    Content: mcp.TextContent{
                        Type: "text",
                        Text: question,
                    },
                },
            },
            MaxTokens: 1000,
        },
    }
    
    result, err := mcpServer.RequestSampling(ctx, samplingRequest)
    if err != nil {
        // Report the failure as a tool-level error result rather than a
        // handler error.
        return mcp.NewToolResultError(fmt.Sprintf("Sampling failed: %v", err)), nil
    }
    
    return mcp.NewToolResultText(mcp.GetTextFromContent(result.Content)), nil
})

How It Works

  1. Persistent Connection: When WithContinuousListening() is enabled, the client maintains a persistent SSE connection to the server
  2. Bidirectional Communication: The server can send sampling requests through the SSE stream
  3. Response Channel: The client responds to sampling requests via HTTP POST to the same endpoint
  4. Session Correlation: Responses are correlated using session IDs to ensure they reach the correct handler

Limitations

  • Sampling requires WithContinuousListening() to maintain the SSE connection
  • Without continuous listening, the transport operates in stateless request/response mode only
  • Network interruptions may require reconnection and re-establishment of the sampling channel

Example with Approval Flow

Here's a reference implementation showing proper human-in-the-loop approval:

// ApprovalSamplingHandler is a sampling handler that asks a human to
// approve both the outgoing request and the LLM's response.
type ApprovalSamplingHandler struct {
    llmClient LLMClient // Your actual LLM client
    ui        UserInterface // Your UI for presenting requests to users
}
 
// CreateMessage runs a sampling request through a two-stage human approval
// flow. The user reviews (and may modify) the request, the approved request
// goes to the LLM, and the user then reviews (and may modify) the response.
// A UI error, an LLM error, or a rejection at either stage returns an error.
func (h *ApprovalSamplingHandler) CreateMessage(ctx context.Context, request mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
    // Step 1: Present the sampling request to the user for review
    accepted, reviewedRequest, err := h.ui.PresentSamplingRequest(ctx, request)
    switch {
    case err != nil:
        return nil, fmt.Errorf("failed to get user approval: %w", err)
    case !accepted:
        return nil, fmt.Errorf("user rejected sampling request")
    }

    // Step 2: Send the approved/modified request to the LLM
    completion, err := h.llmClient.CreateCompletion(ctx, reviewedRequest)
    if err != nil {
        return nil, fmt.Errorf("LLM request failed: %w", err)
    }

    // Step 3: Present the response to the user for final approval
    accepted, reviewedResponse, err := h.ui.PresentSamplingResponse(ctx, completion)
    switch {
    case err != nil:
        return nil, fmt.Errorf("failed to get response approval: %w", err)
    case !accepted:
        return nil, fmt.Errorf("user rejected sampling response")
    }

    // Step 4: Return the approved response to the server
    return reviewedResponse, nil
}
Key Points:
  • Users must explicitly approve both the request (before sending to LLM) and the response (before returning to server)
  • Users can modify prompts or responses before approval
  • Rejection at any stage returns an error to the server
  • The UI should clearly display what the server is requesting and why

Embedding in Non-net/http Frameworks

StreamableHTTPServer exposes two equivalent entry points:

  • ServeHTTP(w http.ResponseWriter, r *http.Request) — the standard net/http handler used by the examples above.
  • Handle(w HTTPResponseWriter, r *HTTPRequest) — a transport-agnostic entry point that lets you embed MCP into HTTP frameworks that do not go through net/http (e.g. fasthttp or fiber) without round-tripping the response through a buffering adaptor.

Both methods drive the same internal state machine; ServeHTTP is implemented as a thin wrapper around Handle.

When to Use Which

| Situation | Use |
| --- | --- |
| Standalone server, mounted in http.ServeMux, chi, gorilla/mux, etc. | ServeHTTP (or srv.Start(":8080")) |
| Mounted in a fiber/echo/gin app via adaptor.HTTPHandler and you don't need streaming | ServeHTTP |
| Mounted in fasthttp/fiber and you want native chunked SSE without an adaptor in the path | Handle |
| Custom transport (test harness, in-memory dispatcher, alternative wire format on top of HTTP semantics) | Handle |

API Surface

// HTTPRequest is a transport-agnostic view of an incoming MCP HTTP request.
// Adapters for non-net/http frameworks populate it and pass it to Handle.
type HTTPRequest struct {
    Method  string          // "POST", "GET", "DELETE"
    URL     *url.URL        // optional; only Path is consulted
    Header  http.Header     // required; plain map[string][]string
    Body    []byte          // already buffered; may be nil for GET/DELETE
    Context context.Context // request context; cancellation is honored
}
 
// HTTPResponseWriter is the minimum surface mcp-go needs to write a
// streamable HTTP response.
type HTTPResponseWriter interface {
    Header() http.Header
    WriteHeader(statusCode int)
    Write(p []byte) (int, error)
    // Flush MUST forward bytes to the client immediately when CanStream
    // returns true, and SHOULD be a no-op when it returns false.
    Flush()
    // CanStream reports whether the underlying transport can deliver
    // chunked responses; the server degrades to buffered replies when not.
    CanStream() bool
}
 
func (s *StreamableHTTPServer) Handle(w HTTPResponseWriter, r *HTTPRequest)

Streaming-Capability Contract

The CanStream() bit on HTTPResponseWriter lets the server gracefully degrade when the underlying transport cannot deliver chunked responses:

  • CanStream() returns true: Flush() MUST forward bytes to the client immediately. POST responses can upgrade to text/event-stream when notifications fire mid-flight, and GET (the SSE listening channel) is accepted.
  • CanStream() returns false: Flush() SHOULD be a no-op. POST responses stay as a single buffered application/json reply (any notifications produced during the request are dropped, since they cannot be delivered without SSE). GET requests are rejected with 405 Method Not Allowed.

Adapter Example: fasthttp

import (
    "net/http"
 
    "github.com/valyala/fasthttp"
    "github.com/mark3labs/mcp-go/server"
)
 
// fasthttpResponseWriter adapts a *fasthttp.RequestCtx to
// server.HTTPResponseWriter. Headers are staged in a net/http-style map and
// copied onto the fasthttp response when the status line is committed.
// Streaming is possible via SetBodyStreamWriter, but this simple version
// only uses the buffered path.
type fasthttpResponseWriter struct {
    ctx     *fasthttp.RequestCtx
    headers http.Header
    status  int
    written bool
}

// newFasthttpResponseWriter returns a writer bound to ctx with empty
// headers and a default status of 200.
func newFasthttpResponseWriter(ctx *fasthttp.RequestCtx) *fasthttpResponseWriter {
    rw := new(fasthttpResponseWriter)
    rw.ctx = ctx
    rw.headers = http.Header{}
    rw.status = http.StatusOK
    return rw
}

// Header returns the staged header map; changes made after WriteHeader are
// not copied to the response.
func (w *fasthttpResponseWriter) Header() http.Header {
    return w.headers
}

// WriteHeader commits the staged headers and the status code to the
// fasthttp response. Only the first call has any effect.
func (w *fasthttpResponseWriter) WriteHeader(code int) {
    if w.written {
        return
    }
    w.status = code
    respHeader := &w.ctx.Response.Header
    for name, values := range w.headers {
        for i := range values {
            respHeader.Add(name, values[i])
        }
    }
    w.ctx.SetStatusCode(code)
    w.written = true
}

// Write appends p to the response body, committing a 200 status first if
// WriteHeader has not been called yet.
func (w *fasthttpResponseWriter) Write(p []byte) (int, error) {
    if !w.written {
        w.WriteHeader(http.StatusOK)
    }
    n, err := w.ctx.Write(p)
    return n, err
}

// Flush is a no-op: fasthttp buffers the response body in-memory by default,
// so flushing has no effect unless the request is handed off to
// SetBodyStreamWriter.
func (w *fasthttpResponseWriter) Flush() {
}

// CanStream reports false because the buffered path above does not deliver
// bytes mid-request. Wire it to true if you switch to SetBodyStreamWriter
// for SSE.
func (w *fasthttpResponseWriter) CanStream() bool {
    return false
}
 
// fromFasthttpHeader copies every header visited on h into a new
// net/http http.Header.
func fromFasthttpHeader(h *fasthttp.RequestHeader) http.Header {
    converted := http.Header{}
    copyPair := func(name, val []byte) {
        converted.Add(string(name), string(val))
    }
    h.VisitAll(copyPair)
    return converted
}
 
// mcpServer is the shared StreamableHTTP server that mcpHandler dispatches
// every fasthttp request to.
var mcpServer = server.NewStreamableHTTPServer(
    server.NewMCPServer("my-server", "1.0.0"),
)
 
// mcpHandler is the fasthttp request handler. It translates the fasthttp
// request into a server.HTTPRequest, wraps ctx in the response-writer
// adapter, and passes both to the MCP server's Handle entry point.
func mcpHandler(ctx *fasthttp.RequestCtx) {
    respWriter := newFasthttpResponseWriter(ctx)

    incoming := new(server.HTTPRequest)
    incoming.Method = string(ctx.Method())
    incoming.Header = fromFasthttpHeader(&ctx.Request.Header)
    incoming.Body = ctx.PostBody()
    incoming.Context = ctx // the fasthttp ctx is passed as the request context

    mcpServer.Handle(respWriter, incoming)
}

Behavior Notes

  • WithStreamableHTTPCORS is not applied when entering through Handle. Use the framework's native CORS middleware.
  • WithProtectedResourceMetadata is not applied when entering through Handle. Mount the metadata route separately using NewProtectedResourceMetadataHandler. See OAuth Protected Resource Metadata above.
  • WithHTTPContextFunc is honored. When the request entered through ServeHTTP it receives the original *http.Request; when entered through Handle it receives a synthetic *http.Request derived from the HTTPRequest. Callers can also pre-decorate r.Context directly, which is the recommended pattern for non-net/http integrations.
  • SessionIdManagerResolver still receives a *http.Request for backwards compatibility; the synthetic request carries the headers and URL needed by all built-in resolvers.
  • Backwards compatibility: every existing public API (NewStreamableHTTPServer, ServeHTTP, Start, Shutdown, every option, every session interface) is unchanged.

Next Steps