SSE Transport
Server-Sent Events (SSE) transport enables real-time, web-friendly communication between MCP clients and servers. Perfect for web applications that need live updates and multi-client support.
Use Cases
SSE transport is ideal for:
- Web applications: Browser-based LLM interfaces
- Real-time dashboards: Live data monitoring and visualization
- Collaborative tools: Multi-user environments with shared state
- Streaming responses: Long-running operations with progress updates
- Event-driven systems: Applications that need server-initiated communication
- Web-based chat interfaces with LLMs
- Real-time analytics dashboards
- Collaborative document editing
- Live system monitoring tools
- Streaming data processing interfaces
Implementation
Basic SSE Server
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer("SSE Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
)
// Add real-time tools
s.AddTool(
mcp.NewTool("stream_data",
mcp.WithDescription("Stream data with real-time updates"),
mcp.WithString("source", mcp.Required()),
mcp.WithNumber("count", mcp.DefaultNumber(10)),
),
handleStreamData,
)
s.AddTool(
mcp.NewTool("monitor_system",
mcp.WithDescription("Monitor system metrics in real-time"),
mcp.WithNumber("duration", mcp.DefaultNumber(60)),
),
handleSystemMonitor,
)
// Add dynamic resources
s.AddResource(
mcp.NewResource(
"metrics://current",
"Current System Metrics",
mcp.WithResourceDescription("Real-time system metrics"),
mcp.WithMIMEType("application/json"),
),
handleCurrentMetrics,
)
// Start SSE server
log.Println("Starting SSE server on :8080")
sseServer := server.NewSSEServer(s)
if err := sseServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Access request headers
headers := req.Header
// Use headers for authentication, tracing, etc.
authToken := headers.Get("Authorization")
if authToken == "" {
return nil, fmt.Errorf("authentication required")
}
// Access other headers
requestID := headers.Get("X-Request-ID")
userAgent := headers.Get("User-Agent")
source := req.GetString("source", "")
count := req.GetInt("count", 10)
// Get server from context for notifications
mcpServer := server.ServerFromContext(ctx)
// Stream data with progress updates
var results []map[string]interface{}
for i := 0; i < count; i++ {
// Check for cancellation
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Simulate data processing
data := generateData(source, i)
results = append(results, data)
// Send progress notification
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "notifications/progress", map[string]interface{}{
"progress": i + 1,
"total": count,
"message": fmt.Sprintf("Processed %d/%d items from %s", i+1, count, source),
})
if err != nil {
log.Printf("Failed to send notification: %v", err)
}
}
time.Sleep(100 * time.Millisecond)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"source":"%s","results":%v,"count":%d}`,
source, results, len(results))), nil
}
// Helper functions for the examples
func generateData(source string, index int) map[string]interface{} {
return map[string]interface{}{
"source": source,
"index": index,
"value": fmt.Sprintf("data_%d", index),
}
}
func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
duration := req.GetInt("duration", 60)
mcpServer := server.ServerFromContext(ctx)
// Monitor system for specified duration
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
timeout := time.After(time.Duration(duration) * time.Second)
var metrics []map[string]interface{}
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timeout:
return mcp.NewToolResultText(fmt.Sprintf(`{"duration":%d,"metrics":%v,"samples":%d}`,
duration, metrics, len(metrics))), nil
case <-ticker.C:
// Collect current metrics
currentMetrics := collectSystemMetrics()
metrics = append(metrics, currentMetrics)
// Send real-time update
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "system_metrics", currentMetrics)
if err != nil {
log.Printf("Failed to send system metrics notification: %v", err)
}
}
}
}
}
func collectSystemMetrics() map[string]interface{} {
// Placeholder implementation
return map[string]interface{}{
"cpu": 50.5,
"memory": 75.2,
"disk": 30.1,
}
}
func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
metrics := collectSystemMetrics()
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: fmt.Sprintf(`{"cpu":%.1f,"memory":%.1f,"disk":%.1f}`, metrics["cpu"], metrics["memory"], metrics["disk"]),
},
}, nil
}
Advanced SSE Configuration
func main() {
s := server.NewMCPServer("Advanced SSE Server", "1.0.0",
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithToolCapabilities(true),
server.WithLogging(),
)
// Add collaborative tools
addCollaborativeTools(s)
addRealTimeResources(s)
log.Println("Starting advanced SSE server on :8080")
sseServer := server.NewSSEServer(s,
server.WithStaticBasePath("/mcp"),
server.WithKeepAliveInterval(30*time.Second),
server.WithBaseURL("http://localhost:8080"),
)
if err := sseServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
// Helper functions for the advanced example
func addRealTimeResources(s *server.MCPServer) {
// Placeholder implementation - would add real-time resources
}
func addCollaborativeTools(s *server.MCPServer) {
// Shared document editing
s.AddTool(
mcp.NewTool("edit_document",
mcp.WithDescription("Edit a shared document"),
mcp.WithString("doc_id", mcp.Required()),
mcp.WithString("operation", mcp.Required()),
mcp.WithObject("data", mcp.Required()),
),
handleDocumentEdit,
)
// Real-time chat
s.AddTool(
mcp.NewTool("send_message",
mcp.WithDescription("Send a message to all connected clients"),
mcp.WithString("message", mcp.Required()),
mcp.WithString("channel", mcp.DefaultString("general")),
),
handleSendMessage,
)
// Live data updates
s.AddTool(
mcp.NewTool("subscribe_updates",
mcp.WithDescription("Subscribe to real-time data updates"),
mcp.WithString("topic", mcp.Required()),
mcp.WithArray("filters", mcp.Description("Optional filters")),
),
handleSubscribeUpdates,
)
}
Configuration
SSE Server Options
The SSE server can be configured with various options:
sseServer := server.NewSSEServer(s,
// Set the base path for SSE endpoints
server.WithStaticBasePath("/api/mcp"),
// Configure keep-alive interval
server.WithKeepAliveInterval(30*time.Second),
// Set base URL for client connections
server.WithBaseURL("http://localhost:8080"),
// Configure SSE and message endpoints
server.WithSSEEndpoint("/sse"),
server.WithMessageEndpoint("/message"),
// Add context function for request processing
server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context {
// Add custom context values from headers
return ctx
}),
)
- SSE stream:
http://localhost:8080/api/mcp/sse
- Message endpoint:
http://localhost:8080/api/mcp/message
Real-Time Notifications
SSE transport enables real-time server-to-client communication through notifications. Use the server context to send notifications:
func handleRealtimeTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get the MCP server from context
mcpServer := server.ServerFromContext(ctx)
// Send a notification to the client
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "custom_event", map[string]interface{}{
"message": "Real-time update",
"timestamp": time.Now().Unix(),
})
if err != nil {
log.Printf("Failed to send notification: %v", err)
}
}
return mcp.NewToolResultText(`{"status":"notification_sent"}`), nil
}
Session Management
The SSE server automatically handles session management. You can send events to specific sessions using the server's notification methods:
// Send notification to current client session
mcpServer.SendNotificationToClient(ctx, "progress_update", progressData)
// Send notification to all connected clients (if supported)
// Note: Check the server implementation for broadcast capabilities
Request Headers
Like the StreamableHTTP transport, the SSE transport passes HTTP request headers to MCP handlers. This allows you to access the original HTTP headers that were sent with the SSE connection in your tool and resource handlers.
Accessing Headers in Handlers
Headers from the SSE connection are available in all MCP request objects:
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Access request headers
headers := req.Header
// Use headers for authentication, tracing, etc.
authToken := headers.Get("Authorization")
if authToken == "" {
return nil, fmt.Errorf("authentication required")
}
// Access other headers
requestID := headers.Get("X-Request-ID")
userAgent := headers.Get("User-Agent")
// Rest of your handler code...
mcpServer := server.ServerFromContext(ctx)
// ...
}
This works for all MCP request types including:
CallToolRequest
ReadResourceRequest
ListToolsRequest
ListResourcesRequest
InitializeRequest
- And other MCP request types
The headers are automatically populated by the SSE transport layer from the initial SSE connection and are available in your handlers without any additional configuration.
Note: Since SSE maintains a persistent connection, the headers are captured when the connection is established and remain the same for all requests during that connection's lifetime.
Next Steps
- HTTP Transport - Learn about traditional web service patterns
- In-Process Transport - Explore embedded scenarios
- Client Development - Build MCP clients for different transports