Skip to content

SSE Transport

Server-Sent Events (SSE) transport enables real-time, web-friendly communication between MCP clients and servers. It is well suited to web applications that need live updates and multi-client support.

Use Cases

SSE transport is ideal for:

  • Web applications: Browser-based LLM interfaces
  • Real-time dashboards: Live data monitoring and visualization
  • Collaborative tools: Multi-user environments with shared state
  • Streaming responses: Long-running operations with progress updates
  • Event-driven systems: Applications that need server-initiated communication
Example applications:
  • Web-based chat interfaces with LLMs
  • Real-time analytics dashboards
  • Collaborative document editing
  • Live system monitoring tools
  • Streaming data processing interfaces

Implementation

Basic SSE Server

package main
 
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)
 
// main assembles the basic SSE example: it creates the MCP server, registers
// two tools and one resource, and then serves everything over SSE on :8080.
func main() {
    s := server.NewMCPServer("SSE Server", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithResourceCapabilities(true, true),
    )

    // Real-time tools: describe each tool first, then bind it to its handler.
    streamTool := mcp.NewTool("stream_data",
        mcp.WithDescription("Stream data with real-time updates"),
        mcp.WithString("source", mcp.Required()),
        mcp.WithNumber("count", mcp.DefaultNumber(10)),
    )
    s.AddTool(streamTool, handleStreamData)

    monitorTool := mcp.NewTool("monitor_system",
        mcp.WithDescription("Monitor system metrics in real-time"),
        mcp.WithNumber("duration", mcp.DefaultNumber(60)),
    )
    s.AddTool(monitorTool, handleSystemMonitor)

    // Dynamic resource exposing the current metrics snapshot.
    metricsResource := mcp.NewResource(
        "metrics://current",
        "Current System Metrics",
        mcp.WithResourceDescription("Real-time system metrics"),
        mcp.WithMIMEType("application/json"),
    )
    s.AddResource(metricsResource, handleCurrentMetrics)

    // Serve over SSE; Start only returns on failure, which is fatal here.
    log.Println("Starting SSE server on :8080")
    err := server.NewSSEServer(s).Start(":8080")
    if err != nil {
        log.Fatal(err)
    }
}
 
// handleStreamData implements the "stream_data" tool. It generates "count"
// items for "source", sends a progress notification after each item, and
// returns the collected items as a JSON document.
//
// The request must carry a non-empty Authorization header; otherwise an error
// is returned. If ctx is cancelled while streaming, the handler stops and
// returns ctx.Err().
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // Access request headers
    headers := req.Header

    // Use headers for authentication, tracing, etc.
    if headers.Get("Authorization") == "" {
        return nil, fmt.Errorf("authentication required")
    }

    // Log the tracing headers: Go refuses to compile locals that are declared
    // but never used, so they must feed into something.
    requestID := headers.Get("X-Request-ID")
    userAgent := headers.Get("User-Agent")
    log.Printf("stream_data request: id=%q user-agent=%q", requestID, userAgent)

    source := req.GetString("source", "")
    count := req.GetInt("count", 10)

    // Get server from context for notifications
    mcpServer := server.ServerFromContext(ctx)

    // Start from an empty, non-nil slice so zero items encode as [] not null.
    results := []map[string]interface{}{}
    for i := 0; i < count; i++ {
        // Check for cancellation before doing any work for this item.
        if err := ctx.Err(); err != nil {
            return nil, err
        }

        // Simulate data processing
        results = append(results, generateData(source, i))

        // Send progress notification; a failed notification is logged but
        // does not abort the stream.
        if mcpServer != nil {
            err := mcpServer.SendNotificationToClient(ctx, "notifications/progress", map[string]interface{}{
                "progress": i + 1,
                "total":    count,
                "message":  fmt.Sprintf("Processed %d/%d items from %s", i+1, count, source),
            })
            if err != nil {
                log.Printf("Failed to send notification: %v", err)
            }
        }

        // Pace the stream, but wake up immediately if the request is cancelled.
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(100 * time.Millisecond):
        }
    }

    // Encode with encoding/json: formatting maps with %v yields Go syntax
    // (map[index:0 ...]), not JSON, and %s would not escape the source string.
    payload, err := json.Marshal(struct {
        Source  string                   `json:"source"`
        Results []map[string]interface{} `json:"results"`
        Count   int                      `json:"count"`
    }{
        Source:  source,
        Results: results,
        Count:   len(results),
    })
    if err != nil {
        return nil, fmt.Errorf("encoding stream results: %w", err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// Helper functions for the examples
// generateData builds one simulated record for the given source. The record
// holds the source name, the item index, and a "data_<index>" value string.
func generateData(source string, index int) map[string]interface{} {
    record := make(map[string]interface{}, 3)
    record["source"] = source
    record["index"] = index
    record["value"] = fmt.Sprintf("data_%d", index)
    return record
}
 
// handleSystemMonitor implements the "monitor_system" tool. For "duration"
// seconds (default 60) it samples collectSystemMetrics every 5 seconds, pushes
// each sample to the client as a "system_metrics" notification, and finally
// returns all samples as a JSON document.
//
// If ctx is cancelled before the duration elapses, it returns ctx.Err().
func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    duration := req.GetInt("duration", 60)

    mcpServer := server.ServerFromContext(ctx)

    // Monitor system for specified duration
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    timeout := time.After(time.Duration(duration) * time.Second)

    // Start from an empty, non-nil slice so zero samples encode as [] not null.
    metrics := []map[string]interface{}{}

    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-timeout:
            // Encode with encoding/json: formatting the samples with %v
            // produces Go map syntax, which is not valid JSON.
            payload, err := json.Marshal(struct {
                Duration int                      `json:"duration"`
                Metrics  []map[string]interface{} `json:"metrics"`
                Samples  int                      `json:"samples"`
            }{
                Duration: duration,
                Metrics:  metrics,
                Samples:  len(metrics),
            })
            if err != nil {
                return nil, fmt.Errorf("encoding system metrics: %w", err)
            }
            return mcp.NewToolResultText(string(payload)), nil
        case <-ticker.C:
            // Collect current metrics
            currentMetrics := collectSystemMetrics()
            metrics = append(metrics, currentMetrics)

            // Send real-time update; a failed notification is logged but does
            // not stop the monitoring loop.
            if mcpServer != nil {
                err := mcpServer.SendNotificationToClient(ctx, "system_metrics", currentMetrics)
                if err != nil {
                    log.Printf("Failed to send system metrics notification: %v", err)
                }
            }
        }
    }
}
 
// collectSystemMetrics returns a metrics snapshot keyed by "cpu", "memory"
// and "disk". This is a placeholder: the values are fixed sample numbers,
// not real measurements.
func collectSystemMetrics() map[string]interface{} {
    snapshot := make(map[string]interface{}, 3)
    snapshot["cpu"] = 50.5
    snapshot["memory"] = 75.2
    snapshot["disk"] = 30.1
    return snapshot
}
 
// handleCurrentMetrics serves the metrics resource. It takes a snapshot from
// collectSystemMetrics and returns it as a single JSON text content entry
// whose URI echoes the requested URI.
func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    snapshot := collectSystemMetrics()

    // Each value is rendered with one decimal place.
    body := fmt.Sprintf(`{"cpu":%.1f,"memory":%.1f,"disk":%.1f}`,
        snapshot["cpu"], snapshot["memory"], snapshot["disk"])

    contents := mcp.TextResourceContents{
        URI:      req.Params.URI,
        MIMEType: "application/json",
        Text:     body,
    }
    return []mcp.ResourceContents{contents}, nil
}

Advanced SSE Configuration

// main assembles the advanced SSE example: an MCP server with resource,
// prompt, tool and logging options, the collaborative tools and real-time
// resources registered on it, served over SSE on :8080 with a static base
// path, a keep-alive interval and an explicit base URL.
func main() {
    mcpServer := server.NewMCPServer("Advanced SSE Server", "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // Register everything before the transport starts accepting connections.
    addCollaborativeTools(mcpServer)
    addRealTimeResources(mcpServer)

    log.Println("Starting advanced SSE server on :8080")
    transport := server.NewSSEServer(mcpServer,
        server.WithStaticBasePath("/mcp"),
        server.WithKeepAliveInterval(30*time.Second),
        server.WithBaseURL("http://localhost:8080"),
    )

    // Start only returns on failure, which is fatal here.
    err := transport.Start(":8080")
    if err != nil {
        log.Fatal(err)
    }
}
 
// Helper functions for the advanced example
// addRealTimeResources is the hook where the advanced example would register
// its real-time resources on s. It is currently a no-op placeholder.
func addRealTimeResources(s *server.MCPServer) {
    // Placeholder implementation - would add real-time resources
}
 
// addCollaborativeTools registers the three collaboration tools of the
// advanced example on s: "edit_document", "send_message" and
// "subscribe_updates", each bound to its own handler.
func addCollaborativeTools(s *server.MCPServer) {
    // Shared document editing: all three arguments are required.
    editDocument := mcp.NewTool("edit_document",
        mcp.WithDescription("Edit a shared document"),
        mcp.WithString("doc_id", mcp.Required()),
        mcp.WithString("operation", mcp.Required()),
        mcp.WithObject("data", mcp.Required()),
    )
    s.AddTool(editDocument, handleDocumentEdit)

    // Real-time chat: the channel defaults to "general".
    sendMessage := mcp.NewTool("send_message",
        mcp.WithDescription("Send a message to all connected clients"),
        mcp.WithString("message", mcp.Required()),
        mcp.WithString("channel", mcp.DefaultString("general")),
    )
    s.AddTool(sendMessage, handleSendMessage)

    // Live data updates: a required topic plus an optional filter list.
    subscribeUpdates := mcp.NewTool("subscribe_updates",
        mcp.WithDescription("Subscribe to real-time data updates"),
        mcp.WithString("topic", mcp.Required()),
        mcp.WithArray("filters", mcp.Description("Optional filters")),
    )
    s.AddTool(subscribeUpdates, handleSubscribeUpdates)
}

Configuration

SSE Server Options

The SSE server can be configured with various options:

// Build the SSE transport for the MCP server s, customizing where and how it
// is served.
sseServer := server.NewSSEServer(s,
    // Set the base path for SSE endpoints
    server.WithStaticBasePath("/api/mcp"),
    
    // Configure keep-alive interval
    server.WithKeepAliveInterval(30*time.Second),
    
    // Set base URL for client connections
    server.WithBaseURL("http://localhost:8080"),
    
    // Configure SSE and message endpoints
    server.WithSSEEndpoint("/sse"),
    server.WithMessageEndpoint("/message"),
    
    // Add context function for request processing.
    // The callback receives the incoming *http.Request, so this snippet needs
    // the net/http package imported. As written it returns ctx unchanged.
    server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context {
        // Add custom context values from headers
        return ctx
    }),
)
Resulting endpoints:
  • SSE stream: http://localhost:8080/api/mcp/sse
  • Message endpoint: http://localhost:8080/api/mcp/message

Real-Time Notifications

SSE transport enables real-time server-to-client communication through notifications. Use the server context to send notifications:

// handleRealtimeTool demonstrates pushing a notification from inside a tool
// handler. When an MCP server is present in ctx, it sends a "custom_event"
// notification with a message and the current Unix timestamp. A send failure
// is only logged; the tool result is the same either way.
func handleRealtimeTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // The server is looked up from the context; skip the push if it is absent.
    if srv := server.ServerFromContext(ctx); srv != nil {
        event := map[string]interface{}{
            "message":   "Real-time update",
            "timestamp": time.Now().Unix(),
        }
        if err := srv.SendNotificationToClient(ctx, "custom_event", event); err != nil {
            log.Printf("Failed to send notification: %v", err)
        }
    }

    return mcp.NewToolResultText(`{"status":"notification_sent"}`), nil
}

Session Management

The SSE server automatically handles session management. From inside a handler, you can send a notification to the client session that issued the current request using the server's notification methods:

// Send notification to current client session
mcpServer.SendNotificationToClient(ctx, "progress_update", progressData)
 
// Send notification to all connected clients (if supported)
// Note: Check the server implementation for broadcast capabilities

Request Headers

Like the StreamableHTTP transport, the SSE transport passes HTTP request headers to MCP handlers. This allows you to access the original HTTP headers that were sent with the SSE connection in your tool and resource handlers.

Accessing Headers in Handlers

Headers from the SSE connection are available in all MCP request objects:

// handleStreamData (excerpt) shows how a tool handler reads the HTTP headers
// carried on the request. The remainder of the handler, including its return
// statement, is elided here.
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // Access request headers
    headers := req.Header
 
    // Use headers for authentication, tracing, etc.
    // A missing or empty Authorization header rejects the call with an error.
    authToken := headers.Get("Authorization")
    if authToken == "" {
        return nil, fmt.Errorf("authentication required")
    }
    
    // Access other headers
    // NOTE: Go rejects unused locals, so the elided code must actually use
    // requestID and userAgent (for example in logging) for this to compile.
    requestID := headers.Get("X-Request-ID")
    userAgent := headers.Get("User-Agent")
    
    // Rest of your handler code...
    mcpServer := server.ServerFromContext(ctx)
    // ...
}

This works for all MCP request types including:

  • CallToolRequest
  • ReadResourceRequest
  • ListToolsRequest
  • ListResourcesRequest
  • InitializeRequest
  • And other MCP request types

The headers are automatically populated by the SSE transport layer from the initial SSE connection and are available in your handlers without any additional configuration.

Note: Since SSE maintains a persistent connection, the headers are captured when the connection is established and remain the same for all requests during that connection's lifetime.

Next Steps