SSE Transport
Server-Sent Events (SSE) transport enables real-time, web-friendly communication between MCP clients and servers. Perfect for web applications that need live updates and multi-client support.
Use Cases
SSE transport is ideal for:
- Web applications: Browser-based LLM interfaces
- Real-time dashboards: Live data monitoring and visualization
- Collaborative tools: Multi-user environments with shared state
- Streaming responses: Long-running operations with progress updates
- Event-driven systems: Applications that need server-initiated communication
- Web-based chat interfaces with LLMs
- Real-time analytics dashboards
- Collaborative document editing
- Live system monitoring tools
- Streaming data processing interfaces
Implementation
Basic SSE Server
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer("SSE Server", "1.0.0",
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
)
// Add real-time tools
s.AddTool(
mcp.NewTool("stream_data",
mcp.WithDescription("Stream data with real-time updates"),
mcp.WithString("source", mcp.Required()),
mcp.WithNumber("count", mcp.DefaultNumber(10)),
),
handleStreamData,
)
s.AddTool(
mcp.NewTool("monitor_system",
mcp.WithDescription("Monitor system metrics in real-time"),
mcp.WithNumber("duration", mcp.DefaultNumber(60)),
),
handleSystemMonitor,
)
// Add dynamic resources
s.AddResource(
mcp.NewResource(
"metrics://current",
"Current System Metrics",
mcp.WithResourceDescription("Real-time system metrics"),
mcp.WithMIMEType("application/json"),
),
handleCurrentMetrics,
)
// Start SSE server
log.Println("Starting SSE server on :8080")
sseServer := server.NewSSEServer(s)
if err := sseServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
source := req.GetString("source", "")
count := req.GetInt("count", 10)
// Get server from context for notifications
mcpServer := server.ServerFromContext(ctx)
// Stream data with progress updates
var results []map[string]interface{}
for i := 0; i < count; i++ {
// Check for cancellation
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Simulate data processing
data := generateData(source, i)
results = append(results, data)
// Send progress notification
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "notifications/progress", map[string]interface{}{
"progress": i + 1,
"total": count,
"message": fmt.Sprintf("Processed %d/%d items from %s", i+1, count, source),
})
if err != nil {
log.Printf("Failed to send notification: %v", err)
}
}
time.Sleep(100 * time.Millisecond)
}
return mcp.NewToolResultText(fmt.Sprintf(`{"source":"%s","results":%v,"count":%d}`,
source, results, len(results))), nil
}
// Helper functions for the examples
func generateData(source string, index int) map[string]interface{} {
return map[string]interface{}{
"source": source,
"index": index,
"value": fmt.Sprintf("data_%d", index),
}
}
func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
duration := req.GetInt("duration", 60)
mcpServer := server.ServerFromContext(ctx)
// Monitor system for specified duration
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
timeout := time.After(time.Duration(duration) * time.Second)
var metrics []map[string]interface{}
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timeout:
return mcp.NewToolResultText(fmt.Sprintf(`{"duration":%d,"metrics":%v,"samples":%d}`,
duration, metrics, len(metrics))), nil
case <-ticker.C:
// Collect current metrics
currentMetrics := collectSystemMetrics()
metrics = append(metrics, currentMetrics)
// Send real-time update
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "system_metrics", currentMetrics)
if err != nil {
log.Printf("Failed to send system metrics notification: %v", err)
}
}
}
}
}
func collectSystemMetrics() map[string]interface{} {
// Placeholder implementation
return map[string]interface{}{
"cpu": 50.5,
"memory": 75.2,
"disk": 30.1,
}
}
func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
metrics := collectSystemMetrics()
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: req.Params.URI,
MIMEType: "application/json",
Text: fmt.Sprintf(`{"cpu":%.1f,"memory":%.1f,"disk":%.1f}`, metrics["cpu"], metrics["memory"], metrics["disk"]),
},
}, nil
}
Advanced SSE Configuration
func main() {
s := server.NewMCPServer("Advanced SSE Server", "1.0.0",
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithToolCapabilities(true),
server.WithLogging(),
)
// Add collaborative tools
addCollaborativeTools(s)
addRealTimeResources(s)
log.Println("Starting advanced SSE server on :8080")
sseServer := server.NewSSEServer(s,
server.WithStaticBasePath("/mcp"),
server.WithKeepAliveInterval(30*time.Second),
server.WithBaseURL("http://localhost:8080"),
)
if err := sseServer.Start(":8080"); err != nil {
log.Fatal(err)
}
}
// Helper functions for the advanced example
func addRealTimeResources(s *server.MCPServer) {
// Placeholder implementation - would add real-time resources
}
func addCollaborativeTools(s *server.MCPServer) {
// Shared document editing
s.AddTool(
mcp.NewTool("edit_document",
mcp.WithDescription("Edit a shared document"),
mcp.WithString("doc_id", mcp.Required()),
mcp.WithString("operation", mcp.Required()),
mcp.WithObject("data", mcp.Required()),
),
handleDocumentEdit,
)
// Real-time chat
s.AddTool(
mcp.NewTool("send_message",
mcp.WithDescription("Send a message to all connected clients"),
mcp.WithString("message", mcp.Required()),
mcp.WithString("channel", mcp.DefaultString("general")),
),
handleSendMessage,
)
// Live data updates
s.AddTool(
mcp.NewTool("subscribe_updates",
mcp.WithDescription("Subscribe to real-time data updates"),
mcp.WithString("topic", mcp.Required()),
mcp.WithArray("filters", mcp.Description("Optional filters")),
),
handleSubscribeUpdates,
)
}
Configuration
SSE Server Options
The SSE server can be configured with various options:
sseServer := server.NewSSEServer(s,
// Set the base path for SSE endpoints
server.WithStaticBasePath("/api/mcp"),
// Configure keep-alive interval
server.WithKeepAliveInterval(30*time.Second),
// Set base URL for client connections
server.WithBaseURL("http://localhost:8080"),
// Configure SSE and message endpoints
server.WithSSEEndpoint("/sse"),
server.WithMessageEndpoint("/message"),
// Add context function for request processing
server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context {
// Add custom context values from headers
return ctx
}),
)
- SSE stream:
http://localhost:8080/api/mcp/sse
- Message endpoint:
http://localhost:8080/api/mcp/message
Real-Time Notifications
SSE transport enables real-time server-to-client communication through notifications. Use the server context to send notifications:
func handleRealtimeTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get the MCP server from context
mcpServer := server.ServerFromContext(ctx)
// Send a notification to the client
if mcpServer != nil {
err := mcpServer.SendNotificationToClient(ctx, "custom_event", map[string]interface{}{
"message": "Real-time update",
"timestamp": time.Now().Unix(),
})
if err != nil {
log.Printf("Failed to send notification: %v", err)
}
}
return mcp.NewToolResultText(`{"status":"notification_sent"}`), nil
}
Session Management
The SSE server automatically handles session management. You can send events to specific sessions using the server's notification methods:
// Send notification to current client session
mcpServer.SendNotificationToClient(ctx, "progress_update", progressData)
// Send notification to all connected clients (if supported)
// Note: Check the server implementation for broadcast capabilities
Next Steps
- HTTP Transport - Learn about traditional web service patterns
- In-Process Transport - Explore embedded scenarios
- Client Development - Build MCP clients for different transports