Skip to content

StreamableHTTP Transport

The StreamableHTTP transport provides traditional HTTP request/response communication for MCP servers. It is well suited to REST-like interactions, stateless clients, and integration with existing web infrastructure.

Use Cases

StreamableHTTP transport excels in scenarios requiring:

  • Web services: Traditional REST API patterns
  • Stateless interactions: Each request is independent
  • Load balancing: Distribute requests across multiple servers
  • Caching: Leverage HTTP caching mechanisms
  • Integration: Work with existing HTTP infrastructure
  • Public APIs: Expose MCP functionality as web APIs
Example applications:
  • Microservice architectures
  • Public API endpoints
  • Integration with API gateways
  • Cached data services
  • Rate-limited services
  • Multi-tenant applications

Implementation

Basic StreamableHTTP Server

package main
 
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strings"
    "time"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)
 
// main builds an MCP server exposing three user-management tools and a
// templated user-profile resource, then serves it over StreamableHTTP on
// port 8080. It exits via log.Fatal if the HTTP server stops with an error.
func main() {
    s := server.NewMCPServer("StreamableHTTP API Server", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithResourceCapabilities(true, true),
    )

    // Add RESTful tools
    s.AddTool(
        mcp.NewTool("get_user",
            mcp.WithDescription("Get user information"),
            mcp.WithString("user_id", mcp.Required()),
        ),
        handleGetUser,
    )

    s.AddTool(
        mcp.NewTool("create_user",
            mcp.WithDescription("Create a new user"),
            mcp.WithString("name", mcp.Required()),
            mcp.WithString("email", mcp.Required()),
            mcp.WithNumber("age", mcp.Min(0)),
        ),
        handleCreateUser,
    )

    s.AddTool(
        mcp.NewTool("search_users",
            mcp.WithDescription("Search users with filters"),
            mcp.WithString("query", mcp.Description("Search query")),
            mcp.WithNumber("limit", mcp.DefaultNumber(10), mcp.Max(100)),
            mcp.WithNumber("offset", mcp.DefaultNumber(0), mcp.Min(0)),
        ),
        handleSearchUsers,
    )

    // Add resources.
    // "users://{user_id}" is a URI template, so it is registered as a resource
    // template; a static resource would only match that literal string and
    // requests such as "users://123" would never reach the handler.
    s.AddResourceTemplate(
        mcp.NewResourceTemplate(
            "users://{user_id}",
            "User Profile",
            mcp.WithTemplateDescription("User profile data"),
            mcp.WithTemplateMIMEType("application/json"),
        ),
        handleUserResource,
    )

    // Start StreamableHTTP server
    log.Println("Starting StreamableHTTP server on :8080")
    httpServer := server.NewStreamableHTTPServer(s)
    if err := httpServer.Start(":8080"); err != nil {
        log.Fatal(err)
    }
}
 
// handleGetUser implements the "get_user" tool.
//
// It reads the required "user_id" string argument, looks the user up with
// getUserFromDB, and returns a text result containing a JSON object with the
// user's id, name, email and age. It returns an error when user_id is missing
// or the lookup fails; the lookup error is wrapped so callers can inspect it.
func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    userID := req.GetString("user_id", "")
    if userID == "" {
        return nil, fmt.Errorf("user_id is required")
    }

    // Simulate database lookup
    user, err := getUserFromDB(userID)
    if err != nil {
        return nil, fmt.Errorf("user not found: %s: %w", userID, err)
    }

    // Marshal instead of formatting by hand so that quotes, backslashes and
    // control characters in the stored values cannot produce invalid JSON.
    payload, err := json.Marshal(struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }{
        ID:    user.ID,
        Name:  user.Name,
        Email: user.Email,
        Age:   user.Age,
    })
    if err != nil {
        return nil, fmt.Errorf("encoding user %s: %w", userID, err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// handleCreateUser implements the "create_user" tool.
//
// It reads the "name" and "email" string arguments (both required) and the
// optional integer "age" (default 0), validates the email with isValidEmail,
// stores a new User via saveUserToDB, and returns a text result containing a
// JSON object with the new id, a confirmation message and the created user.
// It returns an error when a required argument is missing, the email is
// rejected, or the save fails.
func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    name := req.GetString("name", "")
    email := req.GetString("email", "")
    age := req.GetInt("age", 0)

    if name == "" || email == "" {
        return nil, fmt.Errorf("name and email are required")
    }

    // Validate input
    if !isValidEmail(email) {
        return nil, fmt.Errorf("invalid email format: %s", email)
    }

    // Create user
    user := &User{
        ID:        generateID(),
        Name:      name,
        Email:     email,
        Age:       age,
        CreatedAt: time.Now(),
    }

    if err := saveUserToDB(user); err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }

    // name and email come straight from the request, so the response is
    // marshalled rather than string-formatted to keep it valid JSON.
    type userJSON struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }
    payload, err := json.Marshal(struct {
        ID      string   `json:"id"`
        Message string   `json:"message"`
        User    userJSON `json:"user"`
    }{
        ID:      user.ID,
        Message: "User created successfully",
        User: userJSON{
            ID:    user.ID,
            Name:  user.Name,
            Email: user.Email,
            Age:   user.Age,
        },
    })
    if err != nil {
        return nil, fmt.Errorf("encoding created user: %w", err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// Helper functions and types for the examples

// User is the record the example tools and the user resource operate on.
// The struct tags give the field names used when a User is encoded as JSON.
type User struct {
    ID        string    `json:"id"`
    Name      string    `json:"name"`
    Email     string    `json:"email"`
    Age       int       `json:"age"`
    CreatedAt time.Time `json:"created_at"` // set by handleCreateUser at creation time
}
 
// getUserFromDB stands in for a real database lookup. It always succeeds and
// returns a fixed sample profile whose ID is the requested userID.
func getUserFromDB(userID string) (*User, error) {
    // Placeholder implementation
    user := new(User)
    user.ID = userID
    user.Name = "John Doe"
    user.Email = "john@example.com"
    user.Age = 30
    return user, nil
}
 
// isValidEmail performs a lightweight structural check on an email address.
//
// It reports true only when the address has a non-empty local part, an "@",
// and a domain containing a dot that is neither its first nor its last
// character, and when the address contains no whitespace. This is a sanity
// check for the example, not full RFC 5322 validation.
func isValidEmail(email string) bool {
    if strings.ContainsAny(email, " \t\r\n") {
        return false
    }

    // Split on the last "@" so the dot check applies to the domain only.
    at := strings.LastIndex(email, "@")
    if at <= 0 || at == len(email)-1 {
        return false
    }

    domain := email[at+1:]
    dot := strings.LastIndex(domain, ".")
    return dot > 0 && dot < len(domain)-1
}
 
// generateID returns a placeholder user ID made of the prefix "user_"
// followed by the current Unix time in nanoseconds.
func generateID() string {
    // Placeholder implementation
    nanos := time.Now().UnixNano()
    return "user_" + fmt.Sprint(nanos)
}
 
// saveUserToDB stands in for a real persistence layer. It ignores the user
// it is given and always reports success.
func saveUserToDB(user *User) error {
    // Placeholder implementation
    return nil
}
 
// handleSearchUsers implements the "search_users" tool.
//
// It reads the optional "query" string, "limit" (default 10) and "offset"
// (default 0) arguments, runs searchUsersInDB, and returns a text result
// containing a JSON object with the matching users, the total count, and the
// limit, offset and query that were used. It returns an error when the search
// or the JSON encoding fails.
func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    query := req.GetString("query", "")
    limit := req.GetInt("limit", 10)
    offset := req.GetInt("offset", 0)

    // Search users with pagination
    users, total, err := searchUsersInDB(query, limit, offset)
    if err != nil {
        return nil, fmt.Errorf("search failed: %w", err)
    }

    // Report the users the search actually returned, projected onto the four
    // fields this tool exposes.
    type userJSON struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }
    items := make([]userJSON, 0, len(users))
    for _, u := range users {
        if u == nil {
            continue
        }
        items = append(items, userJSON{ID: u.ID, Name: u.Name, Email: u.Email, Age: u.Age})
    }

    // query is request-supplied; marshalling keeps the response valid JSON
    // whatever characters it contains.
    payload, err := json.Marshal(struct {
        Users  []userJSON `json:"users"`
        Total  int        `json:"total"`
        Limit  int        `json:"limit"`
        Offset int        `json:"offset"`
        Query  string     `json:"query"`
    }{
        Users:  items,
        Total:  total,
        Limit:  limit,
        Offset: offset,
        Query:  query,
    })
    if err != nil {
        return nil, fmt.Errorf("encoding search results: %w", err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// handleUserResource serves reads of "users://<id>" resources.
//
// It extracts the user ID from the requested URI, looks the user up with
// getUserFromDB, and returns a single JSON text content holding the user's
// id, name, email and age. It returns an error when the lookup or the JSON
// encoding fails; the underlying error is wrapped.
func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    userID := extractUserIDFromURI(req.Params.URI)

    user, err := getUserFromDB(userID)
    if err != nil {
        return nil, fmt.Errorf("user not found: %s: %w", userID, err)
    }

    // Marshal rather than format by hand so special characters in the stored
    // values are escaped correctly.
    payload, err := json.Marshal(struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
        Age   int    `json:"age"`
    }{
        ID:    user.ID,
        Name:  user.Name,
        Email: user.Email,
        Age:   user.Age,
    })
    if err != nil {
        return nil, fmt.Errorf("encoding user %s: %w", userID, err)
    }

    return []mcp.ResourceContents{
        mcp.TextResourceContents{
            URI:      req.Params.URI,
            MIMEType: "application/json",
            Text:     string(payload),
        },
    }, nil
}
 
// Additional helper functions
 
// searchUsersInDB stands in for a real user search. It ignores query, limit
// and offset and always returns the same two sample users, with the total
// equal to the number of users returned and a nil error.
func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) {
    // Placeholder implementation
    john := &User{ID: "1", Name: "John Doe", Email: "john@example.com", Age: 30}
    jane := &User{ID: "2", Name: "Jane Smith", Email: "jane@example.com", Age: 25}

    results := []*User{john, jane}
    return results, len(results), nil
}
 
// extractUserIDFromURI returns the user ID carried by a URI of the form
// "users://<id>", e.g. "123" for "users://123".
//
// A bare "users://" yields the empty string, so callers can detect a missing
// ID instead of receiving the scheme itself. A URI without the "users://"
// prefix is returned unchanged.
func extractUserIDFromURI(uri string) string {
    return strings.TrimPrefix(uri, "users://")
}

Advanced StreamableHTTP Configuration

// main builds an MCP server with resource, prompt, tool and logging
// capabilities, registers the example tool groups, and serves it over
// StreamableHTTP on port 8080 at /api/v1/mcp with a 30-second heartbeat and
// stateless mode disabled. It exits via log.Fatal if the server stops with
// an error.
func main() {
    mcpServer := server.NewMCPServer(
        "Advanced StreamableHTTP Server",
        "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // Register the tool groups before the transport starts serving.
    addCRUDTools(mcpServer)
    addBatchTools(mcpServer)
    addAnalyticsTools(mcpServer)

    const listenAddr = ":8080"

    log.Println("Starting advanced StreamableHTTP server on :8080")
    transport := server.NewStreamableHTTPServer(
        mcpServer,
        server.WithEndpointPath("/api/v1/mcp"),
        server.WithHeartbeatInterval(30*time.Second),
        server.WithStateLess(false),
    )

    err := transport.Start(listenAddr)
    if err != nil {
        log.Fatal(err)
    }
}
 
// Helper functions for the advanced example

// addCRUDTools is where create/read/update/delete tools would be registered
// on s. The example leaves it empty.
func addCRUDTools(s *server.MCPServer) {
    // Placeholder implementation - would add CRUD tools
}
 
// addBatchTools is where batch-processing tools would be registered on s.
// The example leaves it empty.
func addBatchTools(s *server.MCPServer) {
    // Placeholder implementation - would add batch processing tools
}
 
// addAnalyticsTools is where analytics tools would be registered on s.
// The example leaves it empty.
func addAnalyticsTools(s *server.MCPServer) {
    // Placeholder implementation - would add analytics tools
}
 
// logToolCall writes one log line describing the outcome of a tool call:
// the error when err is non-nil, otherwise the elapsed duration.
// sessionID is accepted but not included in the output.
func logToolCall(sessionID, toolName string, duration time.Duration, err error) {
    // Placeholder implementation
    if err == nil {
        log.Printf("Tool %s completed in %v", toolName, duration)
        return
    }
    log.Printf("Tool %s failed: %v", toolName, err)
}
 
// logResourceRead writes one log line describing the outcome of a resource
// read: the error when err is non-nil, otherwise the elapsed duration.
// sessionID is accepted but not included in the output.
func logResourceRead(sessionID, uri string, duration time.Duration, err error) {
    // Placeholder implementation
    if err == nil {
        log.Printf("Resource read %s completed in %v", uri, duration)
        return
    }
    log.Printf("Resource read %s failed: %v", uri, err)
}
 
// addStreamableHTTPMiddleware registers three middlewares on s, in this
// order: tool authentication, tool rate limiting, and resource-read caching.
func addStreamableHTTPMiddleware(s *server.MCPServer) {
    // requireAuth rejects tool calls that carry no token or an invalid one,
    // and stores the validated user in the context under the key "user".
    // NOTE(review): a plain string context key can collide with other
    // packages; changing it would also require updating every reader.
    requireAuth := func(next server.ToolHandler) server.ToolHandler {
        return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            token := extractAuthToken(ctx)
            if token == "" {
                return nil, fmt.Errorf("authentication required")
            }

            user, err := validateToken(token)
            if err != nil {
                return nil, fmt.Errorf("invalid token: %w", err)
            }

            return next(context.WithValue(ctx, "user", user), req)
        }
    }

    // limitRate rejects tool calls from clients the rate limiter refuses.
    limitRate := func(next server.ToolHandler) server.ToolHandler {
        return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
            if ip := getClientIP(ctx); !rateLimiter.Allow(ip) {
                return nil, fmt.Errorf("rate limit exceeded")
            }
            return next(ctx, req)
        }
    }

    // cacheReads answers resource reads from the cache when possible and
    // caches successful reads for five minutes, keyed by the resource URI.
    cacheReads := func(next server.ResourceHandler) server.ResourceHandler {
        return func(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
            uri := req.Params.URI
            if hit := getFromCache(uri); hit != nil {
                return hit, nil
            }

            result, err := next(ctx, req)
            if err != nil {
                // Failed reads are passed through and never cached.
                return result, err
            }

            setCache(uri, result, 5*time.Minute)
            return result, nil
        }
    }

    s.AddToolMiddleware(requireAuth)
    s.AddToolMiddleware(limitRate)
    s.AddResourceMiddleware(cacheReads)
}

Endpoints

Standard MCP Endpoints

When you start a StreamableHTTP MCP server, it automatically creates these endpoints:

POST /mcp/initialize     - Initialize MCP session
POST /mcp/tools/list     - List available tools
POST /mcp/tools/call     - Call a tool
POST /mcp/resources/list - List available resources
POST /mcp/resources/read - Read a resource
POST /mcp/prompts/list   - List available prompts
POST /mcp/prompts/get    - Get a prompt
GET  /mcp/health         - Health check
GET  /mcp/capabilities   - Server capabilities

Custom Endpoints

You can serve your own HTTP endpoints alongside the MCP endpoints on the same server:

// main serves the MCP routes and several custom API endpoints from a single
// HTTP server on port 8080. It exits via log.Fatal if the server stops with
// an error.
func main() {
    s := server.NewMCPServer("Custom StreamableHTTP Server", "1.0.0")

    // Create HTTP server with custom routes
    mux := http.NewServeMux()

    // Add MCP endpoints
    server.AddMCPRoutes(mux, s, "/mcp")

    // Add custom endpoints
    mux.HandleFunc("/api/status", handleStatus)
    mux.HandleFunc("/api/metrics", handleMetrics)
    mux.HandleFunc("/api/users", handleUsersAPI)
    mux.HandleFunc("/api/upload", handleFileUpload)

    // Add middleware
    handler := addMiddleware(mux)

    // Use an explicit http.Server so a header-read timeout can be set;
    // http.ListenAndServe has none, which lets slow clients hold connections
    // open indefinitely. No write timeout is set, so long-lived streaming
    // responses are not cut off.
    srv := &http.Server{
        Addr:              ":8080",
        Handler:           handler,
        ReadHeaderTimeout: 10 * time.Second,
    }

    log.Println("Starting custom StreamableHTTP server on :8080")
    if err := srv.ListenAndServe(); err != nil {
        log.Fatal(err)
    }
}
 
// handleStatus writes a JSON status document containing a fixed "healthy"
// status, the current Unix timestamp, the version string and the uptime
// measured from startTime. A failure to write the response is logged.
func handleStatus(w http.ResponseWriter, r *http.Request) {
    status := map[string]interface{}{
        "status":    "healthy",
        "timestamp": time.Now().Unix(),
        "version":   "1.0.0",
        "uptime":    time.Since(startTime).String(),
    }

    w.Header().Set("Content-Type", "application/json")
    // The status line may already be sent by the time Encode fails, so the
    // error can only be logged, not reported to the client.
    if err := json.NewEncoder(w).Encode(status); err != nil {
        log.Printf("writing status response: %v", err)
    }
}
 
// handleMetrics writes the value returned by collectMetrics as a JSON
// response. A failure to write the response is logged.
func handleMetrics(w http.ResponseWriter, r *http.Request) {
    metrics := collectMetrics()

    w.Header().Set("Content-Type", "application/json")
    // The status line may already be sent by the time Encode fails, so the
    // error can only be logged, not reported to the client.
    if err := json.NewEncoder(w).Encode(metrics); err != nil {
        log.Printf("writing metrics response: %v", err)
    }
}
 
// handleUsersAPI dispatches /api/users requests by HTTP method: GET lists
// users, POST creates one, PUT updates one and DELETE removes one. Any other
// method gets a 405 response with an Allow header naming the supported
// methods.
func handleUsersAPI(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case http.MethodGet:
        handleListUsers(w, r)
    case http.MethodPost:
        handleCreateUserAPI(w, r)
    case http.MethodPut:
        handleUpdateUser(w, r)
    case http.MethodDelete:
        handleDeleteUser(w, r)
    default:
        // A 405 response is expected to tell the client what is allowed.
        w.Header().Set("Allow", "GET, POST, PUT, DELETE")
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

Request/Response Patterns

Standard MCP Request

POST /mcp/tools/call
Content-Type: application/json
 
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "search_users",
    "arguments": {
      "query": "john",
      "limit": 10,
      "offset": 0
    }
  }
}

Standard MCP Response

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"users\":[...],\"total\":25,\"limit\":10,\"offset\":0}"
      }
    ]
  }
}

Error Response

{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32602,
    "message": "Invalid params",
    "data": {
      "details": "user_id is required"
    }
  }
}

Session Management

Stateful vs Stateless

Stateless Design (Recommended)

// handleStatelessTool illustrates the stateless pattern: each request is
// independent, and everything needed to serve it is taken from the context
// and the request arguments rather than from state kept on the server.
func handleStatelessTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // Identify the caller from the request context.
    caller := extractUserFromToken(ctx)

    // Process using only the caller and the call arguments.
    output, err := processRequest(caller, req.Params.Arguments)
    if err != nil {
        return nil, err
    }

    return mcp.NewToolResultJSON(output), nil
}
 
// getUserPreferences loads a user's preferences from external storage
// instead of server memory, using the key "user:<userID>:prefs".
func getUserPreferences(userID string) (map[string]interface{}, error) {
    // Load from database, cache, etc.
    key := fmt.Sprintf("user:%s:prefs", userID)
    return loadFromRedis(key)
}

Stateful Design (When Needed)

// HTTPSessionManager keeps HTTP sessions in memory and periodically removes
// the expired ones.
type HTTPSessionManager struct {
    sessions map[string]*HTTPSession // live sessions, keyed by session ID
    mutex    sync.RWMutex            // guards sessions
    cleanup  *time.Ticker            // each tick triggers one pass of cleanupExpiredSessions
}
 
// HTTPSession is the server-side state kept for one client session.
type HTTPSession struct {
    ID          string                 // session identifier; the key in HTTPSessionManager.sessions
    UserID      string                 // user the session was created for
    CreatedAt   time.Time              // when CreateSession made the session
    LastAccess  time.Time              // refreshed on every successful GetSession
    Data        map[string]interface{} // arbitrary per-session values
    ExpiresAt   time.Time              // after this instant the session is treated as gone
}
 
// NewHTTPSessionManager returns an empty session manager and starts a
// background goroutine that removes expired sessions once a minute.
//
// NOTE(review): nothing visible here stops the ticker or the goroutine, so
// both appear to live for the rest of the process.
func NewHTTPSessionManager() *HTTPSessionManager {
    manager := new(HTTPSessionManager)
    manager.sessions = map[string]*HTTPSession{}
    manager.cleanup = time.NewTicker(time.Minute)

    go manager.cleanupExpiredSessions()
    return manager
}
 
// CreateSession registers and returns a new session for userID that expires
// 30 minutes after creation.
//
// All timestamps derive from a single clock reading, so CreatedAt and
// LastAccess are equal and ExpiresAt is exactly CreatedAt plus the TTL.
func (sm *HTTPSessionManager) CreateSession(userID string) *HTTPSession {
    const sessionTTL = 30 * time.Minute

    sm.mutex.Lock()
    defer sm.mutex.Unlock()

    now := time.Now()
    session := &HTTPSession{
        ID:         generateSessionID(),
        UserID:     userID,
        CreatedAt:  now,
        LastAccess: now,
        Data:       make(map[string]interface{}),
        ExpiresAt:  now.Add(sessionTTL),
    }

    sm.sessions[session.ID] = session
    return session
}
 
// GetSession returns the session with the given ID and true, or nil and
// false when the ID is unknown or the session has expired.
//
// A successful lookup counts as activity: it sets LastAccess to the current
// time and moves ExpiresAt to 30 minutes from now.
func (sm *HTTPSessionManager) GetSession(sessionID string) (*HTTPSession, bool) {
    // The exclusive lock is required because a successful lookup writes to
    // the session; doing so under RLock would race with concurrent lookups
    // of the same session.
    sm.mutex.Lock()
    defer sm.mutex.Unlock()

    now := time.Now()
    session, exists := sm.sessions[sessionID]
    if !exists || now.After(session.ExpiresAt) {
        return nil, false
    }

    // Update last access and slide the expiry window.
    session.LastAccess = now
    session.ExpiresAt = now.Add(30 * time.Minute)

    return session, true
}
 
// cleanupExpiredSessions blocks on the cleanup ticker and, on every tick,
// deletes each session whose ExpiresAt lies in the past. It is started as a
// goroutine by NewHTTPSessionManager.
func (sm *HTTPSessionManager) cleanupExpiredSessions() {
    // purge performs one sweep while holding the write lock.
    purge := func() {
        sm.mutex.Lock()
        defer sm.mutex.Unlock()

        cutoff := time.Now()
        for id, s := range sm.sessions {
            if s.ExpiresAt.Before(cutoff) {
                delete(sm.sessions, id)
            }
        }
    }

    for range sm.cleanup.C {
        purge()
    }
}

Authentication and Authorization

// AuthMiddleware authenticates HTTP requests that carry a JWT bearer token.
type AuthMiddleware struct {
    jwtSecret []byte    // key handed to the JWT library when verifying tokens
    userStore UserStore // resolves the user ID from the token claims to a user
}
 
// NewAuthMiddleware returns an AuthMiddleware that verifies tokens with
// secret and loads users from store.
func NewAuthMiddleware(secret []byte, store UserStore) *AuthMiddleware {
    m := new(AuthMiddleware)
    m.jwtSecret = secret
    m.userStore = store
    return m
}
 
// Middleware wraps next so that it only runs for authenticated requests.
//
// A request must carry an "Authorization: Bearer <token>" header with a
// token accepted by validateJWT, and the user named in the claims must exist
// in the user store. Any failure produces a 401 response and next is not
// called. On success the user is stored in the request context under the
// key "user".
func (m *AuthMiddleware) Middleware(next http.Handler) http.Handler {
    const bearerPrefix = "Bearer "

    authenticate := func(w http.ResponseWriter, r *http.Request) {
        // Extract token from Authorization header
        header := r.Header.Get("Authorization")
        if !strings.HasPrefix(header, bearerPrefix) {
            http.Error(w, "Missing or invalid authorization header", http.StatusUnauthorized)
            return
        }

        // Validate JWT token
        claims, err := m.validateJWT(strings.TrimPrefix(header, bearerPrefix))
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }

        // Load user information
        user, err := m.userStore.GetUser(claims.UserID)
        if err != nil {
            http.Error(w, "User not found", http.StatusUnauthorized)
            return
        }

        // Add user to request context
        authed := r.WithContext(context.WithValue(r.Context(), "user", user))
        next.ServeHTTP(w, authed)
    }

    return http.HandlerFunc(authenticate)
}
 
// validateJWT parses tokenString, verifies it against the middleware's
// secret, and returns its claims.
//
// It returns an error when the token cannot be parsed or verified, when it
// was not signed with an HMAC algorithm, or when the parsed token is not
// valid.
func (m *AuthMiddleware) validateJWT(tokenString string) (*Claims, error) {
    // Note: This example uses a hypothetical JWT library
    // In practice, you would use a real JWT library like github.com/golang-jwt/jwt
    token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
        // The secret is a shared HMAC key, so only HMAC-signed tokens are
        // accepted. Without this check the token's own "alg" header would
        // decide how the key is used.
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return m.jwtSecret, nil
    })

    if err != nil {
        return nil, err
    }

    if claims, ok := token.Claims.(*Claims); ok && token.Valid {
        return claims, nil
    }

    return nil, fmt.Errorf("invalid token")
}
 
// Claims is the JWT payload the auth middleware expects.
type Claims struct {
    UserID string `json:"user_id"` // used by Middleware to load the user from the store
    Role   string `json:"role"`
    // NOTE(review): StandardClaims is presumably the library's registered
    // claim set (exp, iat, ...); newer golang-jwt releases replace it with
    // RegisteredClaims - confirm against the library version in use.
    jwt.StandardClaims
}

Next Steps