Skip to content

SSE Transport

Server-Sent Events (SSE) transport enables real-time, web-friendly communication between MCP clients and servers. It is well suited to web applications that need live updates and multi-client support.

Use Cases

SSE transport is ideal for:

  • Web applications: Browser-based LLM interfaces
  • Real-time dashboards: Live data monitoring and visualization
  • Collaborative tools: Multi-user environments with shared state
  • Streaming responses: Long-running operations with progress updates
  • Event-driven systems: Applications that need server-initiated communication
Example applications:
  • Web-based chat interfaces with LLMs
  • Real-time analytics dashboards
  • Collaborative document editing
  • Live system monitoring tools
  • Streaming data processing interfaces

Implementation

Basic SSE Server

package main
 
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"
 
    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)
 
// main wires up the basic SSE example: it creates an MCP server with tool and
// resource capabilities enabled, registers two tools and one resource, and
// then serves everything over SSE on port 8080.
func main() {
    mcpServer := server.NewMCPServer("SSE Server", "1.0.0",
        server.WithToolCapabilities(true),
        server.WithResourceCapabilities(true, true),
    )

    // Tool: stream_data (required "source", optional "count" defaulting to 10).
    streamTool := mcp.NewTool("stream_data",
        mcp.WithDescription("Stream data with real-time updates"),
        mcp.WithString("source", mcp.Required()),
        mcp.WithNumber("count", mcp.DefaultNumber(10)),
    )
    mcpServer.AddTool(streamTool, handleStreamData)

    // Tool: monitor_system (optional "duration" defaulting to 60).
    monitorTool := mcp.NewTool("monitor_system",
        mcp.WithDescription("Monitor system metrics in real-time"),
        mcp.WithNumber("duration", mcp.DefaultNumber(60)),
    )
    mcpServer.AddTool(monitorTool, handleSystemMonitor)

    // Resource: the current metrics snapshot, advertised as JSON.
    metricsResource := mcp.NewResource(
        "metrics://current",
        "Current System Metrics",
        mcp.WithResourceDescription("Real-time system metrics"),
        mcp.WithMIMEType("application/json"),
    )
    mcpServer.AddResource(metricsResource, handleCurrentMetrics)

    // Serve over SSE; a non-nil error from Start is fatal.
    log.Println("Starting SSE server on :8080")
    transport := server.NewSSEServer(mcpServer)
    err := transport.Start(":8080")
    if err != nil {
        log.Fatal(err)
    }
}
 
// handleStreamData implements the "stream_data" tool.
//
// It reads the "source" string argument (default "") and the "count" integer
// argument (default 10), generates count items with generateData, and after
// each item sends a "notifications/progress" notification to the calling
// client when a server is available in ctx. Notification failures are logged
// and do not abort the run.
//
// The result is a text payload containing a JSON object with the keys
// "source", "results" and "count". If ctx is cancelled, the handler returns
// ctx.Err() and no result.
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    source := req.GetString("source", "")
    count := req.GetInt("count", 10)

    // Get server from context for notifications
    mcpServer := server.ServerFromContext(ctx)

    // Start from an empty, non-nil slice so that a zero or negative count is
    // encoded as [] rather than null.
    results := []map[string]interface{}{}
    for i := 0; i < count; i++ {
        // Check for cancellation
        if err := ctx.Err(); err != nil {
            return nil, err
        }

        // Simulate data processing
        results = append(results, generateData(source, i))

        // Send progress notification
        // NOTE(review): the MCP spec ties progress notifications to a
        // progressToken supplied by the request; this example does not send
        // one — confirm whether your clients require it.
        if mcpServer != nil {
            err := mcpServer.SendNotificationToClient(ctx, "notifications/progress", map[string]interface{}{
                "progress": i + 1,
                "total":    count,
                "message":  fmt.Sprintf("Processed %d/%d items from %s", i+1, count, source),
            })
            if err != nil {
                log.Printf("Failed to send notification: %v", err)
            }
        }

        // Pace the stream, but wake up immediately if the request is
        // cancelled instead of sleeping through the cancellation.
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(100 * time.Millisecond):
        }
    }

    // Encode with encoding/json so that the payload is valid JSON: strings
    // are quoted and escaped, and the result maps are rendered as JSON
    // objects instead of Go's %v map syntax. A struct keeps the key order
    // stable.
    payload, err := json.Marshal(struct {
        Source  string                   `json:"source"`
        Results []map[string]interface{} `json:"results"`
        Count   int                      `json:"count"`
    }{
        Source:  source,
        Results: results,
        Count:   len(results),
    })
    if err != nil {
        return nil, fmt.Errorf("encoding stream_data result: %w", err)
    }

    return mcp.NewToolResultText(string(payload)), nil
}
 
// Helper functions for the examples
// generateData builds one simulated record for the given source and index.
// The returned map has three keys: "source" (the source string), "index"
// (the index as an int) and "value" (the string "data_<index>").
func generateData(source string, index int) map[string]interface{} {
    record := make(map[string]interface{}, 3)
    record["source"] = source
    record["index"] = index
    record["value"] = fmt.Sprintf("data_%d", index)
    return record
}
 
// handleSystemMonitor implements the "monitor_system" tool.
//
// It reads the "duration" integer argument (seconds, default 60) and, until
// that much time has elapsed, samples collectSystemMetrics every 5 seconds.
// Each sample is stored and, when a server is available in ctx, pushed to the
// calling client as a "system_metrics" notification. Notification failures
// are logged and do not abort the run.
//
// When the duration elapses the handler returns a text payload containing a
// JSON object with the keys "duration", "metrics" and "samples". If ctx is
// cancelled first, it returns ctx.Err() and no result.
func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    duration := req.GetInt("duration", 60)

    mcpServer := server.ServerFromContext(ctx)

    // Monitor system for specified duration
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    // Use an explicit timer so it is released promptly when the request is
    // cancelled before the deadline.
    deadline := time.NewTimer(time.Duration(duration) * time.Second)
    defer deadline.Stop()

    // Start from an empty, non-nil slice so that a run with no samples is
    // encoded as [] rather than null.
    metrics := []map[string]interface{}{}

    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-deadline.C:
            // Encode with encoding/json so that the payload is valid JSON;
            // formatting the samples with %v would emit Go map syntax.
            payload, err := json.Marshal(struct {
                Duration int                      `json:"duration"`
                Metrics  []map[string]interface{} `json:"metrics"`
                Samples  int                      `json:"samples"`
            }{
                Duration: duration,
                Metrics:  metrics,
                Samples:  len(metrics),
            })
            if err != nil {
                return nil, fmt.Errorf("encoding monitor_system result: %w", err)
            }
            return mcp.NewToolResultText(string(payload)), nil
        case <-ticker.C:
            // Collect current metrics
            currentMetrics := collectSystemMetrics()
            metrics = append(metrics, currentMetrics)

            // Send real-time update
            if mcpServer != nil {
                err := mcpServer.SendNotificationToClient(ctx, "system_metrics", currentMetrics)
                if err != nil {
                    log.Printf("Failed to send system metrics notification: %v", err)
                }
            }
        }
    }
}
 
// collectSystemMetrics returns a metrics sample keyed by "cpu", "memory" and
// "disk". This is a placeholder: the values are fixed constants, not real
// measurements.
func collectSystemMetrics() map[string]interface{} {
    sample := make(map[string]interface{}, 3)
    sample["cpu"] = 50.5
    sample["memory"] = 75.2
    sample["disk"] = 30.1
    return sample
}
 
// handleCurrentMetrics serves the metrics resource. It takes one sample from
// collectSystemMetrics, renders the "cpu", "memory" and "disk" values with
// one decimal place into a JSON object, and returns that as a single text
// resource whose URI echoes the requested URI.
func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    snapshot := collectSystemMetrics()
    body := fmt.Sprintf(`{"cpu":%.1f,"memory":%.1f,"disk":%.1f}`,
        snapshot["cpu"], snapshot["memory"], snapshot["disk"])

    contents := mcp.TextResourceContents{
        URI:      req.Params.URI,
        MIMEType: "application/json",
        Text:     body,
    }
    return []mcp.ResourceContents{contents}, nil
}

Advanced SSE Configuration

// main wires up the advanced SSE example: it creates an MCP server with
// resource, prompt and tool capabilities plus logging, registers the
// collaborative tools and real-time resources, and serves over SSE on port
// 8080 with a static base path, a keep-alive interval and an explicit base URL.
func main() {
    mcpServer := server.NewMCPServer("Advanced SSE Server", "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
        server.WithLogging(),
    )

    // Register the example's tools first, then its resources.
    addCollaborativeTools(mcpServer)
    addRealTimeResources(mcpServer)

    log.Println("Starting advanced SSE server on :8080")
    transport := server.NewSSEServer(mcpServer,
        server.WithStaticBasePath("/mcp"),
        server.WithKeepAliveInterval(30*time.Second),
        server.WithBaseURL("http://localhost:8080"),
    )

    // A non-nil error from Start is fatal.
    err := transport.Start(":8080")
    if err != nil {
        log.Fatal(err)
    }
}
 
// Helper functions for the advanced example
// addRealTimeResources is a stub in this example: it accepts the MCP server s
// but currently registers nothing on it.
func addRealTimeResources(s *server.MCPServer) {
    // Placeholder implementation - would add real-time resources
}
 
// addCollaborativeTools registers the three collaboration tools on s, in
// order: "edit_document", "send_message" and "subscribe_updates". Each tool
// is paired with its handler function.
func addCollaborativeTools(s *server.MCPServer) {
    // edit_document: shared document editing; all three arguments are required.
    editTool := mcp.NewTool("edit_document",
        mcp.WithDescription("Edit a shared document"),
        mcp.WithString("doc_id", mcp.Required()),
        mcp.WithString("operation", mcp.Required()),
        mcp.WithObject("data", mcp.Required()),
    )
    s.AddTool(editTool, handleDocumentEdit)

    // send_message: real-time chat; "channel" defaults to "general".
    chatTool := mcp.NewTool("send_message",
        mcp.WithDescription("Send a message to all connected clients"),
        mcp.WithString("message", mcp.Required()),
        mcp.WithString("channel", mcp.DefaultString("general")),
    )
    s.AddTool(chatTool, handleSendMessage)

    // subscribe_updates: live data updates for a required topic, with an
    // optional "filters" array.
    subscribeTool := mcp.NewTool("subscribe_updates",
        mcp.WithDescription("Subscribe to real-time data updates"),
        mcp.WithString("topic", mcp.Required()),
        mcp.WithArray("filters", mcp.Description("Optional filters")),
    )
    s.AddTool(subscribeTool, handleSubscribeUpdates)
}

Configuration

SSE Server Options

The SSE server can be configured with various options:

// Build the SSE transport around the MCP server s; every setting below is
// passed as an option to NewSSEServer.
sseServer := server.NewSSEServer(s,
    // Set the base path for SSE endpoints; the SSE and message endpoint
    // paths configured below are served underneath it
    server.WithStaticBasePath("/api/mcp"),
    
    // Configure keep-alive interval (30 seconds in this example)
    server.WithKeepAliveInterval(30*time.Second),
    
    // Set base URL for client connections
    server.WithBaseURL("http://localhost:8080"),
    
    // Configure SSE and message endpoints
    server.WithSSEEndpoint("/sse"),
    server.WithMessageEndpoint("/message"),
    
    // Add context function for request processing: it receives the context
    // and the incoming *http.Request and returns the context to use
    server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context {
        // Add custom context values from headers
        // (this example returns ctx unchanged)
        return ctx
    }),
)
Resulting endpoints:
  • SSE stream: http://localhost:8080/api/mcp/sse
  • Message endpoint: http://localhost:8080/api/mcp/message

Real-Time Notifications

SSE transport enables real-time server-to-client communication through notifications. Use the server context to send notifications:

// handleRealtimeTool shows how a tool handler pushes a notification to the
// calling client. When a server is available in ctx, it sends a
// "custom_event" notification carrying a message and the current Unix
// timestamp; a send failure is logged and otherwise ignored. The handler
// always returns the same fixed status payload.
func handleRealtimeTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // The server may be absent from the context, in which case nothing is sent.
    if srv := server.ServerFromContext(ctx); srv != nil {
        event := map[string]interface{}{
            "message":   "Real-time update",
            "timestamp": time.Now().Unix(),
        }
        if err := srv.SendNotificationToClient(ctx, "custom_event", event); err != nil {
            log.Printf("Failed to send notification: %v", err)
        }
    }

    return mcp.NewToolResultText(`{"status":"notification_sent"}`), nil
}

Session Management

The SSE server automatically handles session management. From within a request handler, you can send events to the session that issued the current request using the server's notification methods:

// Send notification to current client session
mcpServer.SendNotificationToClient(ctx, "progress_update", progressData)
 
// Send notification to all connected clients (if supported)
// Note: Check the server implementation for broadcast capabilities

Next Steps