StreamableHTTP Transport
StreamableHTTP transport provides traditional request/response communication for MCP servers, perfect for REST-like interactions, stateless clients, and integration with existing web infrastructure.
Use Cases
StreamableHTTP transport excels in scenarios requiring:
- Web services: Traditional REST API patterns
- Stateless interactions: Each request is independent
- Load balancing: Distribute requests across multiple servers
- Caching: Leverage HTTP caching mechanisms
- Integration: Work with existing HTTP infrastructure
- Public APIs: Expose MCP functionality as web APIs
- Microservice architectures
- Public API endpoints
- Integration with API gateways
- Cached data services
- Rate-limited services
- Multi-tenant applications
Implementation
Basic StreamableHTTP Server
package main
import (
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer("StreamableHTTP API Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
)
// Add RESTful tools
s.AddTool(
mcp.NewTool("get_user",
mcp.WithDescription("Get user information"),
mcp.WithString("user_id", mcp.Required()),
),
handleGetUser,
)
s.AddTool(
mcp.NewTool("create_user",
mcp.WithDescription("Create a new user"),
mcp.WithString("name", mcp.Required()),
mcp.WithString("email", mcp.Required()),
mcp.WithNumber("age", mcp.Min(0)),
),
handleCreateUser,
)
s.AddTool(
mcp.NewTool("search_users",
mcp.WithDescription("Search users with filters"),
mcp.WithString("query", mcp.Description("Search query")),
mcp.WithNumber("limit", mcp.DefaultNumber(10), mcp.Max(100)),
mcp.WithNumber("offset", mcp.DefaultNumber(0), mcp.Min(0)),
),
handleSearchUsers,
)
// Add resources
s.AddResource(
mcp.NewResource(
"users://{user_id}",
"User Profile",
mcp.WithResourceDescription("User profile data"),
mcp.WithMIMEType("application/json"),
),
handleUserResource,
)
// Start StreamableHTTP server
log.Println("Starting StreamableHTTP server on :8080")
httpServer := server.NewStreamableHTTPServer(s)
if err := httpServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
userID := req.GetString("user_id", "")
if userID == "" {
return nil, fmt.Errorf("user_id is required")
}
// Simulate database lookup
user, err := getUserFromDB(userID)
if err != nil {
return nil, fmt.Errorf("user not found: %s", userID)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`,
user.ID, user.Name, user.Email, user.Age)), nil
}
func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
name := req.GetString("name", "")
email := req.GetString("email", "")
age := req.GetInt("age", 0)
if name == "" || email == "" {
return nil, fmt.Errorf("name and email are required")
}
// Validate input
if !isValidEmail(email) {
return nil, fmt.Errorf("invalid email format: %s", email)
}
// Create user
user := &User{
ID: generateID(),
Name: name,
Email: email,
Age: age,
CreatedAt: time.Now(),
}
if err := saveUserToDB(user); err != nil {
return nil, fmt.Errorf("failed to create user: %w", err)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","message":"User created successfully","user":{"id":"%s","name":"%s","email":"%s","age":%d}}`,
user.ID, user.ID, user.Name, user.Email, user.Age)), nil
}
// Helper functions and types for the examples
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
Age int `json:"age"`
CreatedAt time.Time `json:"created_at"`
}
func getUserFromDB(userID string) (*User, error) {
// Placeholder implementation
return &User{
ID: userID,
Name: "John Doe",
Email: "john@example.com",
Age: 30,
}, nil
}
func isValidEmail(email string) bool {
return strings.Contains(email, "@") && strings.Contains(email, ".")
}
func generateID() string {
// Placeholder implementation
return fmt.Sprintf("user_%d", time.Now().UnixNano())
}
func saveUserToDB(user *User) error {
// Placeholder implementation
return nil
}
func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
query := req.GetString("query", "")
limit := req.GetInt("limit", 10)
offset := req.GetInt("offset", 0)
// Search users with pagination
users, total, err := searchUsersInDB(query, limit, offset)
if err != nil {
return nil, fmt.Errorf("search failed: %w", err)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"users":[{"id":"1","name":"John Doe","email":"john@example.com","age":30},{"id":"2","name":"Jane Smith","email":"jane@example.com","age":25}],"total":%d,"limit":%d,"offset":%d,"query":"%s"}`,
total, limit, offset, query)), nil
}
func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
userID := extractUserIDFromURI(req.Params.URI)
user, err := getUserFromDB(userID)
if err != nil {
return nil, fmt.Errorf("user not found: %s", userID)
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`, user.ID, user.Name, user.Email, user.Age),
},
}, nil
}
// Additional helper functions
func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) {
// Placeholder implementation
users := []*User{
{ID: "1", Name: "John Doe", Email: "john@example.com", Age: 30},
{ID: "2", Name: "Jane Smith", Email: "jane@example.com", Age: 25},
}
return users, len(users), nil
}
func extractUserIDFromURI(uri string) string {
// Extract user ID from URI like "users://123"
if len(uri) > 8 && uri[:8] == "users://" {
return uri[8:]
}
return uri
}
Advanced StreamableHTTP Configuration
func main() {
s := server.NewMCPServer("Advanced StreamableHTTP Server", "1.0.0",
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithToolCapabilities(true),
server.WithLogging(),
)
// Add comprehensive tools and resources
addCRUDTools(s)
addBatchTools(s)
addAnalyticsTools(s)
log.Println("Starting advanced StreamableHTTP server on :8080")
httpServer := server.NewStreamableHTTPServer(s,
server.WithEndpointPath("/api/v1/mcp"),
server.WithHeartbeatInterval(30*time.Second),
server.WithStateLess(false),
)
if err := httpServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
// Helper functions for the advanced example
func addCRUDTools(s *server.MCPServer) {
// Placeholder implementation - would add CRUD tools
}
func addBatchTools(s *server.MCPServer) {
// Placeholder implementation - would add batch processing tools
}
func addAnalyticsTools(s *server.MCPServer) {
// Placeholder implementation - would add analytics tools
}
func logToolCall(sessionID, toolName string, duration time.Duration, err error) {
// Placeholder implementation
if err != nil {
log.Printf("Tool %s failed: %v", toolName, err)
} else {
log.Printf("Tool %s completed in %v", toolName, duration)
}
}
func logResourceRead(sessionID, uri string, duration time.Duration, err error) {
// Placeholder implementation
if err != nil {
log.Printf("Resource read %s failed: %v", uri, err)
} else {
log.Printf("Resource read %s completed in %v", uri, duration)
}
}
func addStreamableHTTPMiddleware(s *server.MCPServer) {
// Authentication middleware
s.AddToolMiddleware(func(next server.ToolHandler) server.ToolHandler {
return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Extract and validate auth token
token := extractAuthToken(ctx)
if token == "" {
return nil, fmt.Errorf("authentication required")
}
user, err := validateToken(token)
if err != nil {
return nil, fmt.Errorf("invalid token: %w", err)
}
// Add user to context
ctx = context.WithValue(ctx, "user", user)
return next(ctx, req)
}
})
// Rate limiting middleware
s.AddToolMiddleware(func(next server.ToolHandler) server.ToolHandler {
return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
clientIP := getClientIP(ctx)
if !rateLimiter.Allow(clientIP) {
return nil, fmt.Errorf("rate limit exceeded")
}
return next(ctx, req)
}
})
// Caching middleware
s.AddResourceMiddleware(func(next server.ResourceHandler) server.ResourceHandler {
return func(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
// Check cache first
if cached := getFromCache(req.Params.URI); cached != nil {
return cached, nil
}
result, err := next(ctx, req)
if err == nil {
// Cache successful results
setCache(req.Params.URI, result, 5*time.Minute)
}
return result, err
}
})
}
Endpoints
Standard MCP Endpoints
When you start a StreamableHTTP MCP server, it automatically creates these endpoints:
POST /mcp/initialize - Initialize MCP session
POST /mcp/tools/list - List available tools
POST /mcp/tools/call - Call a tool
POST /mcp/resources/list - List available resources
POST /mcp/resources/read - Read a resource
POST /mcp/prompts/list - List available prompts
POST /mcp/prompts/get - Get a prompt
GET /mcp/health - Health check
GET /mcp/capabilities - Server capabilities
Custom Endpoints
Add custom HTTP endpoints alongside MCP:
func main() {
s := server.NewMCPServer("Custom StreamableHTTP Server", "1.0.0")
// Create HTTP server with custom routes
mux := http.NewServeMux()
// Add MCP endpoints
server.AddMCPRoutes(mux, s, "/mcp")
// Add custom endpoints
mux.HandleFunc("/api/status", handleStatus)
mux.HandleFunc("/api/metrics", handleMetrics)
mux.HandleFunc("/api/users", handleUsersAPI)
mux.HandleFunc("/api/upload", handleFileUpload)
// Add middleware
handler := addMiddleware(mux)
log.Println("Starting custom StreamableHTTP server on :8080")
if err := http.ListenAndServe(":8080", handler); err != nil {
log.Fatal(err)
}
}
func handleStatus(w http.ResponseWriter, r *http.Request) {
status := map[string]interface{}{
"status": "healthy",
"timestamp": time.Now().Unix(),
"version": "1.0.0",
"uptime": time.Since(startTime).String(),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(status)
}
func handleMetrics(w http.ResponseWriter, r *http.Request) {
metrics := collectMetrics()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(metrics)
}
func handleUsersAPI(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
handleListUsers(w, r)
case "POST":
handleCreateUserAPI(w, r)
case "PUT":
handleUpdateUser(w, r)
case "DELETE":
handleDeleteUser(w, r)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
Request/Response Patterns
Standard MCP Request
POST /mcp/tools/call
Content-Type: application/json
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "search_users",
"arguments": {
"query": "john",
"limit": 10,
"offset": 0
}
}
}
Standard MCP Response
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "{\"users\":[...],\"total\":25,\"limit\":10,\"offset\":0}"
}
]
}
}
Error Response
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32602,
"message": "Invalid params",
"data": {
"details": "user_id is required"
}
}
}
Session Management
Stateful vs Stateless
Stateless Design (Recommended)
// Each request is independent
func handleStatelessTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Extract all needed information from request
userID := extractUserFromToken(ctx)
params := req.Params.Arguments
// Process without relying on server state
result, err := processRequest(userID, params)
if err != nil {
return nil, err
}
return mcp.NewToolResultJSON(result), nil
}
// Use external storage for persistence
func getUserPreferences(userID string) (map[string]interface{}, error) {
// Load from database, cache, etc.
return loadFromRedis(fmt.Sprintf("user:%s:prefs", userID))
}
Stateful Design (When Needed)
type HTTPSessionManager struct {
sessions map[string]*HTTPSession
mutex sync.RWMutex
cleanup *time.Ticker
}
type HTTPSession struct {
ID string
UserID string
CreatedAt time.Time
LastAccess time.Time
Data map[string]interface{}
ExpiresAt time.Time
}
func NewHTTPSessionManager() *HTTPSessionManager {
sm := &HTTPSessionManager{
sessions: make(map[string]*HTTPSession),
cleanup: time.NewTicker(1 * time.Minute),
}
go sm.cleanupExpiredSessions()
return sm
}
func (sm *HTTPSessionManager) CreateSession(userID string) *HTTPSession {
sm.mutex.Lock()
defer sm.mutex.Unlock()
session := &HTTPSession{
ID: generateSessionID(),
UserID: userID,
CreatedAt: time.Now(),
LastAccess: time.Now(),
Data: make(map[string]interface{}),
ExpiresAt: time.Now().Add(30 * time.Minute),
}
sm.sessions[session.ID] = session
return session
}
func (sm *HTTPSessionManager) GetSession(sessionID string) (*HTTPSession, bool) {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
session, exists := sm.sessions[sessionID]
if !exists || time.Now().After(session.ExpiresAt) {
return nil, false
}
// Update last access
session.LastAccess = time.Now()
session.ExpiresAt = time.Now().Add(30 * time.Minute)
return session, true
}
func (sm *HTTPSessionManager) cleanupExpiredSessions() {
for range sm.cleanup.C {
sm.mutex.Lock()
now := time.Now()
for id, session := range sm.sessions {
if now.After(session.ExpiresAt) {
delete(sm.sessions, id)
}
}
sm.mutex.Unlock()
}
}
Authentication and Authorization
type AuthMiddleware struct {
jwtSecret []byte
userStore UserStore
}
func NewAuthMiddleware(secret []byte, store UserStore) *AuthMiddleware {
return &AuthMiddleware{
jwtSecret: secret,
userStore: store,
}
}
func (m *AuthMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract token from Authorization header
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, "Bearer ") {
http.Error(w, "Missing or invalid authorization header", http.StatusUnauthorized)
return
}
token := strings.TrimPrefix(authHeader, "Bearer ")
// Validate JWT token
claims, err := m.validateJWT(token)
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return
}
// Load user information
user, err := m.userStore.GetUser(claims.UserID)
if err != nil {
http.Error(w, "User not found", http.StatusUnauthorized)
return
}
// Add user to request context
ctx := context.WithValue(r.Context(), "user", user)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func (m *AuthMiddleware) validateJWT(tokenString string) (*Claims, error) {
// Note: This example uses a hypothetical JWT library
// In practice, you would use a real JWT library like github.com/golang-jwt/jwt
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
return m.jwtSecret, nil
})
if err != nil {
return nil, err
}
if claims, ok := token.Claims.(*Claims); ok && token.Valid {
return claims, nil
}
return nil, fmt.Errorf("invalid token")
}
type Claims struct {
UserID string `json:"user_id"`
Role string `json:"role"`
jwt.StandardClaims
}
Request Headers
The StreamableHTTP transport now passes HTTP request headers to MCP handlers. This allows you to access the original HTTP headers that were sent with the request in your tool and resource handlers.
Accessing Headers in Handlers
Headers are available in all MCP request objects:
func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Access request headers
headers := req.Header
// Use headers for authentication, tracing, etc.
authToken := headers.Get("Authorization")
if authToken == "" {
return nil, fmt.Errorf("authentication required")
}
// Access other headers
requestID := headers.Get("X-Request-ID")
userAgent := headers.Get("User-Agent")
// Rest of your handler code...
}
This works for all MCP request types including:
CallToolRequest
ReadResourceRequest
ListToolsRequest
ListResourcesRequest
InitializeRequest
- And other MCP request types
The headers are automatically populated by the transport layer and are available in your handlers without any additional configuration.
Next Steps
- In-Process Transport - Learn about embedded scenarios
- Client Development - Build MCP clients for HTTP transport
- Server Basics - Review fundamental server concepts