StreamableHTTP Transport
StreamableHTTP transport provides traditional request/response communication for MCP servers, perfect for REST-like interactions, stateless clients, and integration with existing web infrastructure.
Use Cases
StreamableHTTP transport excels in scenarios requiring:
- Web services: Traditional REST API patterns
- Stateless interactions: Each request is independent
- Load balancing: Distribute requests across multiple servers
- Caching: Leverage HTTP caching mechanisms
- Integration: Work with existing HTTP infrastructure
- Public APIs: Expose MCP functionality as web APIs
Typical deployments include:
- Microservice architectures
- Public API endpoints
- Integration with API gateways
- Cached data services
- Rate-limited services
- Multi-tenant applications
Implementation
Basic StreamableHTTP Server
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"time"

	"github.com/mark3labs/mcp-go/mcp"
	"github.com/mark3labs/mcp-go/server"
)
// main builds an example MCP server that exposes three user-management tools
// (get_user, create_user, search_users) and a user-profile resource template,
// then serves it over the StreamableHTTP transport on :8080.
func main() {
	s := server.NewMCPServer("StreamableHTTP API Server", "1.0.0",
		server.WithToolCapabilities(true),
		server.WithResourceCapabilities(true, true),
	)

	// Add RESTful tools
	s.AddTool(
		mcp.NewTool("get_user",
			mcp.WithDescription("Get user information"),
			mcp.WithString("user_id", mcp.Required()),
		),
		handleGetUser,
	)
	s.AddTool(
		mcp.NewTool("create_user",
			mcp.WithDescription("Create a new user"),
			mcp.WithString("name", mcp.Required()),
			mcp.WithString("email", mcp.Required()),
			mcp.WithNumber("age", mcp.Min(0)),
		),
		handleCreateUser,
	)
	s.AddTool(
		mcp.NewTool("search_users",
			mcp.WithDescription("Search users with filters"),
			mcp.WithString("query", mcp.Description("Search query")),
			mcp.WithNumber("limit", mcp.DefaultNumber(10), mcp.Max(100)),
			mcp.WithNumber("offset", mcp.DefaultNumber(0), mcp.Min(0)),
		),
		handleSearchUsers,
	)

	// Add resources. The URI carries a {user_id} placeholder, so it has to be
	// registered as a resource template: a plain resource is matched only by
	// its literal URI and would never be selected for "users://123".
	s.AddResourceTemplate(
		mcp.NewResourceTemplate(
			"users://{user_id}",
			"User Profile",
			mcp.WithTemplateDescription("User profile data"),
			mcp.WithTemplateMIMEType("application/json"),
		),
		handleUserResource,
	)

	// Start StreamableHTTP server
	log.Println("Starting StreamableHTTP server on :8080")
	httpServer := server.NewStreamableHTTPServer(s)
	if err := httpServer.Start(":8080"); err != nil {
		log.Fatal(err)
	}
}
// handleGetUser implements the "get_user" tool. It reads the "user_id"
// argument, looks the user up with getUserFromDB, and returns the user's id,
// name, email and age as a JSON document in a text result.
//
// It returns an error when user_id is empty, when the lookup fails, or when
// the response cannot be encoded.
func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	userID := req.GetString("user_id", "")
	if userID == "" {
		return nil, fmt.Errorf("user_id is required")
	}

	// Simulate database lookup
	user, err := getUserFromDB(userID)
	if err != nil {
		return nil, fmt.Errorf("user not found: %s", userID)
	}

	// Encode with encoding/json instead of string formatting so that quotes,
	// backslashes or control characters in stored values cannot break the
	// document or inject extra fields. Field order matches the previous output.
	payload, err := json.Marshal(struct {
		ID    string `json:"id"`
		Name  string `json:"name"`
		Email string `json:"email"`
		Age   int    `json:"age"`
	}{
		ID:    user.ID,
		Name:  user.Name,
		Email: user.Email,
		Age:   user.Age,
	})
	if err != nil {
		return nil, fmt.Errorf("encoding user %s: %w", userID, err)
	}
	return mcp.NewToolResultText(string(payload)), nil
}
// handleCreateUser implements the "create_user" tool. It reads the "name",
// "email" and optional "age" arguments, validates them, stores a new User and
// returns a JSON text result containing the new id, a confirmation message
// and the created user's fields.
//
// It returns an error when name or email is empty, when the email fails
// isValidEmail, when saving fails, or when the response cannot be encoded.
func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	name := req.GetString("name", "")
	email := req.GetString("email", "")
	age := req.GetInt("age", 0)
	if name == "" || email == "" {
		return nil, fmt.Errorf("name and email are required")
	}

	// Validate input
	if !isValidEmail(email) {
		return nil, fmt.Errorf("invalid email format: %s", email)
	}

	// Create user
	user := &User{
		ID:        generateID(),
		Name:      name,
		Email:     email,
		Age:       age,
		CreatedAt: time.Now(),
	}
	if err := saveUserToDB(user); err != nil {
		return nil, fmt.Errorf("failed to create user: %w", err)
	}

	// name and email come straight from the caller, so the response is built
	// with encoding/json rather than string formatting: embedded quotes or
	// backslashes are escaped instead of corrupting the document. The shape
	// and field order match the previous hand-written output.
	type createdUser struct {
		ID    string `json:"id"`
		Name  string `json:"name"`
		Email string `json:"email"`
		Age   int    `json:"age"`
	}
	payload, err := json.Marshal(struct {
		ID      string      `json:"id"`
		Message string      `json:"message"`
		User    createdUser `json:"user"`
	}{
		ID:      user.ID,
		Message: "User created successfully",
		User: createdUser{
			ID:    user.ID,
			Name:  user.Name,
			Email: user.Email,
			Age:   user.Age,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("encoding created user %s: %w", user.ID, err)
	}
	return mcp.NewToolResultText(string(payload)), nil
}
// Helper functions and types for the examples

// User is the user record shared by the example tool and resource handlers.
// The struct tags give the field names used when a User is encoded with
// encoding/json.
type User struct {
// ID identifies the user; new users get it from generateID.
ID string `json:"id"`
// Name is the user's display name.
Name string `json:"name"`
// Email is the user's email address; handleCreateUser checks it with isValidEmail.
Email string `json:"email"`
// Age is the user's age; handleCreateUser defaults it to 0 when not supplied.
Age int `json:"age"`
// CreatedAt is set to time.Now() by handleCreateUser when the user is created.
CreatedAt time.Time `json:"created_at"`
}
// getUserFromDB stands in for a real data-store query. It never fails and
// returns a fixed sample profile whose ID is the requested userID.
func getUserFromDB(userID string) (*User, error) {
	// Placeholder implementation
	sample := User{
		Name:  "John Doe",
		Email: "john@example.com",
		Age:   30,
	}
	sample.ID = userID
	return &sample, nil
}
// isValidEmail reports whether email contains both an "@" and a "."
// somewhere in the string. It is a coarse sanity check only: position and
// order of the two characters are not examined.
func isValidEmail(email string) bool {
	if strings.IndexByte(email, '@') < 0 {
		return false
	}
	return strings.IndexByte(email, '.') >= 0
}
// generateID returns a new user identifier made of the prefix "user_"
// followed by the current Unix time in nanoseconds.
func generateID() string {
	// Placeholder implementation
	nanos := time.Now().UnixNano()
	return "user_" + fmt.Sprint(nanos)
}
// saveUserToDB stands in for persisting user to a data store. This
// placeholder stores nothing and always reports success.
func saveUserToDB(user *User) error {
// Placeholder implementation
return nil
}
// handleSearchUsers implements the "search_users" tool. It reads the "query",
// "limit" (default 10) and "offset" (default 0) arguments, runs
// searchUsersInDB and returns a JSON text result with the matching users, the
// total reported by the search, and the echoed limit, offset and query.
//
// Each user is encoded through User's JSON tags. It returns an error when the
// search fails or when the response cannot be encoded.
func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	query := req.GetString("query", "")
	limit := req.GetInt("limit", 10)
	offset := req.GetInt("offset", 0)

	// Search users with pagination
	users, total, err := searchUsersInDB(query, limit, offset)
	if err != nil {
		return nil, fmt.Errorf("search failed: %w", err)
	}
	if users == nil {
		// Keep "users" a JSON array rather than null when nothing matched.
		users = []*User{}
	}

	// Return the users the search actually produced (previously they were
	// discarded in favour of a hard-coded list) and let encoding/json escape
	// the caller-supplied query.
	payload, err := json.Marshal(struct {
		Users  []*User `json:"users"`
		Total  int     `json:"total"`
		Limit  int     `json:"limit"`
		Offset int     `json:"offset"`
		Query  string  `json:"query"`
	}{
		Users:  users,
		Total:  total,
		Limit:  limit,
		Offset: offset,
		Query:  query,
	})
	if err != nil {
		return nil, fmt.Errorf("encoding search results: %w", err)
	}
	return mcp.NewToolResultText(string(payload)), nil
}
// handleUserResource serves reads of user-profile resources. It derives the
// user ID from the requested URI with extractUserIDFromURI, loads the user and
// returns a single application/json text content holding the user's id, name,
// email and age.
//
// It returns an error when the lookup fails or when the profile cannot be
// encoded.
func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
	userID := extractUserIDFromURI(req.Params.URI)
	user, err := getUserFromDB(userID)
	if err != nil {
		return nil, fmt.Errorf("user not found: %s", userID)
	}

	// Encode with encoding/json so special characters in the stored values
	// are escaped; field order matches the previous hand-written output.
	payload, err := json.Marshal(struct {
		ID    string `json:"id"`
		Name  string `json:"name"`
		Email string `json:"email"`
		Age   int    `json:"age"`
	}{
		ID:    user.ID,
		Name:  user.Name,
		Email: user.Email,
		Age:   user.Age,
	})
	if err != nil {
		return nil, fmt.Errorf("encoding user %s: %w", userID, err)
	}

	return []mcp.ResourceContents{
		mcp.TextResourceContents{
			URI:      req.Params.URI,
			MIMEType: "application/json",
			Text:     string(payload),
		},
	}, nil
}
// Additional helper functions

// searchUsersInDB stands in for a real user search. The placeholder ignores
// query, limit and offset and always returns the same two sample users, with
// the total equal to the number of users returned.
func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) {
	// Placeholder implementation
	john := &User{ID: "1", Name: "John Doe", Email: "john@example.com", Age: 30}
	jane := &User{ID: "2", Name: "Jane Smith", Email: "jane@example.com", Age: 25}

	matches := []*User{john, jane}
	return matches, len(matches), nil
}
// extractUserIDFromURI returns the user ID encoded in a "users://<id>" URI,
// i.e. everything after the "users://" scheme prefix. A URI without that
// prefix is returned unchanged, and a bare "users://" yields the empty string
// rather than being mistaken for an ID.
func extractUserIDFromURI(uri string) string {
	return strings.TrimPrefix(uri, "users://")
}
Advanced StreamableHTTP Configuration
// main builds the "advanced" example server with resource, prompt, tool and
// logging capabilities enabled, registers the example tool groups, and serves
// it over StreamableHTTP on :8080 at /api/v1/mcp with a 30-second heartbeat
// and stateless mode turned off.
func main() {
	mcpServer := server.NewMCPServer(
		"Advanced StreamableHTTP Server",
		"1.0.0",
		server.WithResourceCapabilities(true, true),
		server.WithPromptCapabilities(true),
		server.WithToolCapabilities(true),
		server.WithLogging(),
	)

	// Add comprehensive tools and resources
	addCRUDTools(mcpServer)
	addBatchTools(mcpServer)
	addAnalyticsTools(mcpServer)

	log.Println("Starting advanced StreamableHTTP server on :8080")
	transport := server.NewStreamableHTTPServer(
		mcpServer,
		server.WithEndpointPath("/api/v1/mcp"),
		server.WithHeartbeatInterval(30*time.Second),
		server.WithStateLess(false),
	)

	err := transport.Start(":8080")
	if err != nil {
		log.Fatal(err)
	}
}
// Helper functions for the advanced example

// addCRUDTools is the hook where CRUD tools would be registered on s.
// The body is intentionally empty in this example, so nothing is registered.
func addCRUDTools(s *server.MCPServer) {
// Placeholder implementation - would add CRUD tools
}
// addBatchTools is the hook where batch-processing tools would be registered
// on s. The body is intentionally empty in this example.
func addBatchTools(s *server.MCPServer) {
// Placeholder implementation - would add batch processing tools
}
// addAnalyticsTools is the hook where analytics tools would be registered on
// s. The body is intentionally empty in this example.
func addAnalyticsTools(s *server.MCPServer) {
// Placeholder implementation - would add analytics tools
}
// logToolCall writes one line to the standard logger describing the outcome
// of a tool call: the duration on success, or the error on failure.
// sessionID is accepted but not used by this placeholder.
func logToolCall(sessionID, toolName string, duration time.Duration, err error) {
	// Placeholder implementation
	if err == nil {
		log.Printf("Tool %s completed in %v", toolName, duration)
		return
	}
	log.Printf("Tool %s failed: %v", toolName, err)
}
// logResourceRead writes one line to the standard logger describing the
// outcome of a resource read: the duration on success, or the error on
// failure. sessionID is accepted but not used by this placeholder.
func logResourceRead(sessionID, uri string, duration time.Duration, err error) {
	// Placeholder implementation
	if err == nil {
		log.Printf("Resource read %s completed in %v", uri, duration)
		return
	}
	log.Printf("Resource read %s failed: %v", uri, err)
}
// addStreamableHTTPMiddleware installs three middlewares on s, in this order:
// token authentication for tool calls, per-client rate limiting for tool
// calls, and a five-minute read-through cache for resource reads.
func addStreamableHTTPMiddleware(s *server.MCPServer) {
	// Authentication: reject calls without a valid token, otherwise expose
	// the resolved user to downstream handlers under the "user" context key.
	requireAuth := func(next server.ToolHandler) server.ToolHandler {
		return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
			token := extractAuthToken(ctx)
			if token == "" {
				return nil, fmt.Errorf("authentication required")
			}

			user, err := validateToken(token)
			if err != nil {
				return nil, fmt.Errorf("invalid token: %w", err)
			}

			return next(context.WithValue(ctx, "user", user), req)
		}
	}

	// Rate limiting: one budget per client IP.
	limitRate := func(next server.ToolHandler) server.ToolHandler {
		return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
			if ip := getClientIP(ctx); !rateLimiter.Allow(ip) {
				return nil, fmt.Errorf("rate limit exceeded")
			}
			return next(ctx, req)
		}
	}

	// Caching: serve from cache when present; only successful reads are stored.
	cacheReads := func(next server.ResourceHandler) server.ResourceHandler {
		return func(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
			if cached := getFromCache(req.Params.URI); cached != nil {
				return cached, nil
			}

			result, err := next(ctx, req)
			if err != nil {
				return result, err
			}
			setCache(req.Params.URI, result, 5*time.Minute)
			return result, err
		}
	}

	s.AddToolMiddleware(requireAuth)
	s.AddToolMiddleware(limitRate)
	s.AddResourceMiddleware(cacheReads)
}
Endpoints
Standard MCP Endpoints
When you start a StreamableHTTP MCP server, it automatically creates these endpoints:
POST /mcp/initialize - Initialize MCP session
POST /mcp/tools/list - List available tools
POST /mcp/tools/call - Call a tool
POST /mcp/resources/list - List available resources
POST /mcp/resources/read - Read a resource
POST /mcp/prompts/list - List available prompts
POST /mcp/prompts/get - Get a prompt
GET /mcp/health - Health check
GET /mcp/capabilities - Server capabilities
Custom Endpoints
Register custom HTTP endpoints alongside the MCP routes on the same mux:
// main mounts the MCP routes under /mcp on a standard ServeMux, adds the
// custom /api/* endpoints next to them, wraps the mux with addMiddleware and
// serves the result on :8080.
func main() {
	s := server.NewMCPServer("Custom StreamableHTTP Server", "1.0.0")

	// Create HTTP server with custom routes
	mux := http.NewServeMux()

	// Add MCP endpoints
	server.AddMCPRoutes(mux, s, "/mcp")

	// Add custom endpoints
	customRoutes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/api/status", handleStatus},
		{"/api/metrics", handleMetrics},
		{"/api/users", handleUsersAPI},
		{"/api/upload", handleFileUpload},
	}
	for _, route := range customRoutes {
		mux.HandleFunc(route.pattern, route.handler)
	}

	// Add middleware
	handler := addMiddleware(mux)

	log.Println("Starting custom StreamableHTTP server on :8080")
	err := http.ListenAndServe(":8080", handler)
	if err != nil {
		log.Fatal(err)
	}
}
// handleStatus serves a small JSON health document containing a fixed
// "healthy" status, the current Unix timestamp, the version string and the
// time elapsed since startTime.
func handleStatus(w http.ResponseWriter, r *http.Request) {
	status := map[string]interface{}{
		"status":    "healthy",
		"timestamp": time.Now().Unix(),
		"version":   "1.0.0",
		"uptime":    time.Since(startTime).String(),
	}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(status); err != nil {
		// The encoder writes straight to the response, so part of the body
		// may already be on the wire; record the failure instead of dropping it.
		log.Printf("handleStatus: encoding response: %v", err)
	}
}
// handleMetrics serves the value returned by collectMetrics as a JSON
// document.
func handleMetrics(w http.ResponseWriter, r *http.Request) {
	metrics := collectMetrics()
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(metrics); err != nil {
		// The encoder writes straight to the response, so part of the body
		// may already be on the wire; record the failure instead of dropping it.
		log.Printf("handleMetrics: encoding response: %v", err)
	}
}
// handleUsersAPI dispatches /api/users requests by HTTP method: GET lists
// users, POST creates one, PUT updates one and DELETE removes one. Any other
// method is answered with 405 Method Not Allowed.
func handleUsersAPI(w http.ResponseWriter, r *http.Request) {
	method := r.Method
	switch {
	case method == http.MethodGet:
		handleListUsers(w, r)
	case method == http.MethodPost:
		handleCreateUserAPI(w, r)
	case method == http.MethodPut:
		handleUpdateUser(w, r)
	case method == http.MethodDelete:
		handleDeleteUser(w, r)
	default:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}
Request/Response Patterns
Standard MCP Request
POST /mcp/tools/call
Content-Type: application/json
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "search_users",
"arguments": {
"query": "john",
"limit": 10,
"offset": 0
}
}
}
Standard MCP Response
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "{\"users\":[...],\"total\":25,\"limit\":10,\"offset\":0}"
}
]
}
}
Error Response
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32602,
"message": "Invalid params",
"data": {
"details": "user_id is required"
}
}
}
Session Management
Stateful vs Stateless
Stateless Design (Recommended)
// Each request is independent

// handleStatelessTool shows a tool handler that keeps no server-side state:
// the caller's identity comes from the request context and the inputs from
// the request arguments, and both are handed to processRequest. A processing
// error is returned unchanged.
func handleStatelessTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	// Extract all needed information from request
	caller := extractUserFromToken(ctx)
	args := req.Params.Arguments

	// Process without relying on server state
	output, err := processRequest(caller, args)
	if err == nil {
		return mcp.NewToolResultJSON(output), nil
	}
	return nil, err
}
// Use external storage for persistence

// getUserPreferences fetches the preferences for userID through
// loadFromRedis, using the key "user:<userID>:prefs".
func getUserPreferences(userID string) (map[string]interface{}, error) {
	// Load from database, cache, etc.
	key := fmt.Sprintf("user:%s:prefs", userID)
	return loadFromRedis(key)
}
Stateful Design (When Needed)
// HTTPSessionManager keeps HTTP sessions in memory and periodically removes
// the ones that have expired.
type HTTPSessionManager struct {
// sessions maps a session ID to its session.
sessions map[string]*HTTPSession
// mutex is held by the manager's methods while they access sessions.
mutex sync.RWMutex
// cleanup ticks to trigger each pass of cleanupExpiredSessions.
cleanup *time.Ticker
}
// HTTPSession is the per-session record stored by HTTPSessionManager.
type HTTPSession struct {
// ID is the session identifier and the key under which the manager stores it.
ID string
// UserID is the user the session was created for.
UserID string
// CreatedAt is when CreateSession built the session.
CreatedAt time.Time
// LastAccess is refreshed on every successful GetSession.
LastAccess time.Time
// Data holds arbitrary per-session values; it starts as an empty map.
Data map[string]interface{}
// ExpiresAt is the instant after which the session is treated as gone;
// it is set 30 minutes ahead on creation and on each successful GetSession.
ExpiresAt time.Time
}
// NewHTTPSessionManager returns a manager with an empty session table and
// starts a background goroutine that purges expired sessions once a minute.
func NewHTTPSessionManager() *HTTPSessionManager {
	manager := new(HTTPSessionManager)
	manager.sessions = make(map[string]*HTTPSession)
	manager.cleanup = time.NewTicker(time.Minute)

	go manager.cleanupExpiredSessions()
	return manager
}
// CreateSession registers and returns a new session for userID. The session
// gets a fresh ID, an empty Data map and an expiry 30 minutes from now.
func (sm *HTTPSessionManager) CreateSession(userID string) *HTTPSession {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	created := new(HTTPSession)
	created.ID = generateSessionID()
	created.UserID = userID
	created.CreatedAt = time.Now()
	created.LastAccess = time.Now()
	created.Data = make(map[string]interface{})
	created.ExpiresAt = time.Now().Add(30 * time.Minute)

	sm.sessions[created.ID] = created
	return created
}
// GetSession looks up the session with the given ID. It returns the session
// and true when it exists and has not expired; otherwise it returns nil and
// false. A successful lookup counts as activity: LastAccess is set to now and
// the expiry is pushed 30 minutes into the future.
func (sm *HTTPSessionManager) GetSession(sessionID string) (*HTTPSession, bool) {
	// A successful lookup writes to the session, so the exclusive lock is
	// required; doing these writes under RLock would let concurrent callers
	// race on LastAccess and ExpiresAt.
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	session, exists := sm.sessions[sessionID]
	if !exists || time.Now().After(session.ExpiresAt) {
		return nil, false
	}

	// Update last access
	session.LastAccess = time.Now()
	session.ExpiresAt = time.Now().Add(30 * time.Minute)
	return session, true
}
// cleanupExpiredSessions runs for as long as the cleanup ticker delivers
// ticks. On each tick it takes the write lock and deletes every session whose
// expiry lies before the current time.
func (sm *HTTPSessionManager) cleanupExpiredSessions() {
	for range sm.cleanup.C {
		func() {
			sm.mutex.Lock()
			defer sm.mutex.Unlock()

			cutoff := time.Now()
			for id, candidate := range sm.sessions {
				if candidate.ExpiresAt.Before(cutoff) {
					delete(sm.sessions, id)
				}
			}
		}()
	}
}
Authentication and Authorization
// AuthMiddleware authenticates HTTP requests that carry a JWT bearer token
// and resolves the token's user before passing the request on.
type AuthMiddleware struct {
// jwtSecret is the key handed to the JWT parser when verifying tokens.
jwtSecret []byte
// userStore is queried with the user ID from the token's claims.
userStore UserStore
}
// NewAuthMiddleware returns an AuthMiddleware that verifies tokens with
// secret and loads users from store.
func NewAuthMiddleware(secret []byte, store UserStore) *AuthMiddleware {
	m := new(AuthMiddleware)
	m.jwtSecret = secret
	m.userStore = store
	return m
}
// Middleware wraps next with bearer-token authentication. A request is
// answered with 401 Unauthorized when the Authorization header lacks the
// "Bearer " prefix, when the token fails validation, or when the user named
// in the claims cannot be loaded. Otherwise the user is stored in the request
// context under the "user" key and next is invoked.
func (m *AuthMiddleware) Middleware(next http.Handler) http.Handler {
	const bearerPrefix = "Bearer "

	authenticate := func(w http.ResponseWriter, r *http.Request) {
		// Extract token from Authorization header
		header := r.Header.Get("Authorization")
		if !strings.HasPrefix(header, bearerPrefix) {
			http.Error(w, "Missing or invalid authorization header", http.StatusUnauthorized)
			return
		}

		// Validate JWT token
		claims, err := m.validateJWT(strings.TrimPrefix(header, bearerPrefix))
		if err != nil {
			http.Error(w, "Invalid token", http.StatusUnauthorized)
			return
		}

		// Load user information
		user, err := m.userStore.GetUser(claims.UserID)
		if err != nil {
			http.Error(w, "User not found", http.StatusUnauthorized)
			return
		}

		// Add user to request context
		authed := r.WithContext(context.WithValue(r.Context(), "user", user))
		next.ServeHTTP(w, authed)
	}

	return http.HandlerFunc(authenticate)
}
// validateJWT parses tokenString into Claims and verifies it against the
// middleware's secret. It returns the claims when the token is valid, and an
// error when parsing or verification fails, when the token uses a non-HMAC
// signing method, or when the claims are not of the expected type.
func (m *AuthMiddleware) validateJWT(tokenString string) (*Claims, error) {
	// Note: This example uses a hypothetical JWT library
	// In practice, you would use a real JWT library like github.com/golang-jwt/jwt
	token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
		// jwtSecret is a shared byte-slice key, so only HMAC-signed tokens are
		// acceptable. Checking the method before returning the key keeps a
		// token that declares a different algorithm from being verified
		// against this secret.
		if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
		}
		return m.jwtSecret, nil
	})
	if err != nil {
		return nil, err
	}
	if claims, ok := token.Claims.(*Claims); ok && token.Valid {
		return claims, nil
	}
	return nil, fmt.Errorf("invalid token")
}
// Claims is the JWT payload expected by AuthMiddleware: two custom fields
// plus the embedded jwt.StandardClaims.
type Claims struct {
// UserID is the ID the middleware passes to the user store.
UserID string `json:"user_id"`
// Role is carried in the token; the code shown here does not read it.
Role string `json:"role"`
jwt.StandardClaims
}
Next Steps
- In-Process Transport - Learn about embedded scenarios
- Client Development - Build MCP clients for HTTP transport
- Server Basics - Review fundamental server concepts