Introduction to Server-Sent Events
Server-Sent Events (SSE) is a server-push technology that lets a client receive automatic updates from a server over a single long-lived HTTP connection. Unlike WebSockets, SSE is unidirectional (server to client only) and works over standard HTTP.
1. Basic SSE Implementation
JAX-RS SSE Endpoint
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.core.Context;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Path("/events")
public class SseResource {
private Sse sse;
private SseBroadcaster broadcaster;
private final ConcurrentHashMap<String, SseEventSink> clientSinks = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Context
public void setSse(Sse sse) {
this.sse = sse;
this.broadcaster = sse.newBroadcaster();
// Setup broadcaster error handling
this.broadcaster.onClose((sseEventSink) -> {
System.out.println("Client disconnected from broadcaster");
});
this.broadcaster.onError((sseEventSink, throwable) -> {
System.err.println("Broadcaster error: " + throwable.getMessage());
});
}
/**
* Basic SSE endpoint - clients connect here to receive events
*/
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink) {
String clientId = "client-" + System.currentTimeMillis();
clientSinks.put(clientId, eventSink);
// Send welcome message
eventSink.send(sse.newEventBuilder()
.name("welcome")
.data("Connected to SSE server. Client ID: " + clientId)
.build());
// Setup client disconnect handling
eventSink.onClose(() -> {
clientSinks.remove(clientId);
System.out.println("Client disconnected: " + clientId);
});
eventSink.onError((throwable) -> {
clientSinks.remove(clientId);
System.err.println("Client error: " + throwable.getMessage());
});
System.out.println("New client connected: " + clientId);
}
/**
* Send message to all connected clients via broadcaster
*/
@POST
@Path("/broadcast")
@Consumes(MediaType.TEXT_PLAIN)
public void broadcastMessage(String message) {
if (broadcaster != null) {
broadcaster.broadcast(sse.newEvent(message));
System.out.println("Broadcasted message: " + message);
}
}
/**
* Send structured event to all clients
*/
@POST
@Path("/event")
@Consumes(MediaType.APPLICATION_JSON)
public void sendStructuredEvent(EventData eventData) {
if (broadcaster != null) {
broadcaster.broadcast(sse.newEventBuilder()
.name(eventData.getType())
.data(eventData.getClass(), eventData)
.id(String.valueOf(System.currentTimeMillis()))
.build());
System.out.println("Sent structured event: " + eventData);
}
}
/**
* Start periodic updates (e.g., stock prices, metrics)
*/
@POST
@Path("/start-updates")
public String startPeriodicUpdates() {
scheduler.scheduleAtFixedRate(this::sendPeriodicUpdate, 0, 5, TimeUnit.SECONDS);
return "Periodic updates started";
}
private void sendPeriodicUpdate() {
if (broadcaster != null) {
String update = "Server time: " + System.currentTimeMillis();
broadcaster.broadcast(sse.newEventBuilder()
.name("periodic-update")
.data(update)
.id(String.valueOf(System.currentTimeMillis()))
.build());
}
}
@PreDestroy
public void cleanup() {
scheduler.shutdown();
if (broadcaster != null) {
broadcaster.close();
}
}
// Data class for structured events
public static class EventData {
private String type;
private String message;
private long timestamp;
// Constructors, getters, setters
public EventData() {}
public EventData(String type, String message) {
this.type = type;
this.message = message;
this.timestamp = System.currentTimeMillis();
}
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
@Override
public String toString() {
return String.format("EventData{type='%s', message='%s', timestamp=%d}",
type, message, timestamp);
}
}
}
Spring SSE Implementation (Spring MVC SseEmitter with a Reactor Flux endpoint)
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/sse")
public class SpringSseController {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* SSE endpoint using Spring's SseEmitter
*/
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEvents() {
SseEmitter emitter = new SseEmitter(60_000L); // 60 seconds timeout
String clientId = "client-" + System.currentTimeMillis();
emitters.put(clientId, emitter);
// Send initial connection event
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("connected")
.data("Client connected: " + clientId)
.id(clientId);
emitter.send(event);
} catch (IOException e) {
emitter.completeWithError(e);
}
// Handle client disconnect
emitter.onCompletion(() -> {
emitters.remove(clientId);
System.out.println("Client completed: " + clientId);
});
emitter.onTimeout(() -> {
emitters.remove(clientId);
System.out.println("Client timeout: " + clientId);
});
emitter.onError((ex) -> {
emitters.remove(clientId);
System.out.println("Client error: " + clientId + " - " + ex.getMessage());
});
System.out.println("New SSE client connected: " + clientId);
return emitter;
}
/**
* Reactive SSE endpoint using WebFlux
*/
@GetMapping(path = "/reactive-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> reactiveStream() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Event #" + sequence + " - " + System.currentTimeMillis())
.doOnNext(event -> System.out.println("Sending: " + event));
}
/**
* Broadcast message to all connected clients
*/
@PostMapping("/broadcast")
public String broadcastMessage(@RequestBody String message) {
int successCount = 0;
int failureCount = 0;
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("broadcast")
.data(message)
.id(String.valueOf(System.currentTimeMillis()));
entry.getValue().send(event);
successCount++;
} catch (IOException e) {
failureCount++;
System.err.println("Failed to send to client " + entry.getKey() + ": " + e.getMessage());
emitters.remove(entry.getKey());
}
}
return String.format("Broadcast completed. Success: %d, Failed: %d", successCount, failureCount);
}
/**
* Send event to specific client
*/
@PostMapping("/send/{clientId}")
public String sendToClient(@PathVariable String clientId, @RequestBody String message) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("direct-message")
.data(message)
.id(String.valueOf(System.currentTimeMillis()));
emitter.send(event);
return "Message sent to client: " + clientId;
} catch (IOException e) {
emitters.remove(clientId);
return "Failed to send to client (disconnected): " + clientId;
}
}
return "Client not found: " + clientId;
}
/**
* Start periodic server-generated events
*/
@PostMapping("/start-server-events")
public String startServerEvents() {
scheduler.scheduleAtFixedRate(() -> {
String event = "Server event at " + System.currentTimeMillis();
broadcastMessage(event);
}, 0, 3, TimeUnit.SECONDS);
return "Server events started";
}
@PreDestroy
public void cleanup() {
scheduler.shutdown();
// Complete all emitters
for (SseEmitter emitter : emitters.values()) {
emitter.complete();
}
emitters.clear();
}
}
2. Advanced SSE Features
Event Reconnection and IDs
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseEventSource;
import javax.ws.rs.core.Context;
import java.util.concurrent.atomic.AtomicLong;
@Path("/advanced-events")
public class AdvancedSseResource {
private Sse sse;
private final AtomicLong eventId = new AtomicLong(0);
@Context
public void setSse(Sse sse) {
this.sse = sse;
}
/**
* SSE with proper event IDs for client reconnection
*/
@GET
@Path("/reliable-stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getReliableStream(@Context SseEventSink eventSink,
@QueryParam("lastEventId") @DefaultValue("0") long lastEventId) {
System.out.println("Client connected. Last event ID: " + lastEventId);
// Send missed events if client reconnected
if (lastEventId > 0) {
long currentId = eventId.get();
for (long i = lastEventId + 1; i <= currentId; i++) {
eventSink.send(createEvent("catchup", "Missed event: " + i, i));
}
}
// Start sending new events
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
long id = eventId.incrementAndGet();
eventSink.send(createEvent("message", "Event " + id, id));
Thread.sleep(2000);
}
// Send completion event
eventSink.send(sse.newEventBuilder()
.name("complete")
.data("Stream completed")
.id(String.valueOf(eventId.incrementAndGet()))
.build());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
System.err.println("Error sending events: " + e.getMessage());
}
}).start();
eventSink.onClose(() -> {
System.out.println("Reliable stream client disconnected");
});
}
/**
* SSE with retry configuration
*/
@GET
@Path("/configurable-stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getConfigurableStream(@Context SseEventSink eventSink,
@QueryParam("retry") @DefaultValue("5000") long retryMs) {
// Send retry configuration to client
eventSink.send(sse.newEventBuilder()
.comment("retry: " + retryMs)
.data("Stream configured with retry: " + retryMs + "ms")
.build());
// Send events with potential errors to test reconnection
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
eventSink.send(sse.newEventBuilder()
.name("data")
.data("Message " + i)
.id(String.valueOf(i))
.build());
// Simulate server error on 3rd message to test reconnection
if (i == 3) {
Thread.sleep(1000);
throw new RuntimeException("Simulated server error");
}
Thread.sleep(2000);
}
} catch (Exception e) {
System.err.println("Stream interrupted: " + e.getMessage());
}
}).start();
}
private javax.ws.rs.sse.OutboundSseEvent createEvent(String name, String data, long id) {
return sse.newEventBuilder()
.name(name)
.data(data)
.id(String.valueOf(id))
.build();
}
/**
* Server-side event source (consuming external SSE)
*/
@GET
@Path("/proxy")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void proxyExternalEvents(@Context SseEventSink eventSink) {
// This would typically connect to an external SSE source
javax.ws.rs.client.Client client = javax.ws.rs.client.ClientBuilder.newClient();
SseEventSource eventSource = SseEventSource.target(
client.target("http://external-service.com/events")
).build();
eventSource.register(
event -> {
// Forward external events to our client
eventSink.send(sse.newEventBuilder()
.name("proxy")
.data(event.readData())
.id(event.getId())
.build());
},
throwable -> {
System.err.println("Error in proxy: " + throwable.getMessage());
eventSink.send(sse.newEvent("error", "Proxy error: " + throwable.getMessage()));
},
() -> {
System.out.println("Proxy connection closed");
eventSink.send(sse.newEvent("complete", "Proxy stream ended"));
}
);
eventSource.open();
eventSink.onClose(() -> {
eventSource.close();
client.close();
});
}
}
SSE with Authentication and Filtering
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Context;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * JAX-RS resource demonstrating authenticated SSE streams with room-based
 * and private messaging.
 *
 * <p>NOTE(review): sessions and rooms live in instance fields, so this class
 * presumably must be deployed as a singleton - confirm the application config.
 * The demo authentication uses the bearer token itself as the user ID, which
 * means the log statements below print the token; replace both before real use.
 */
@Path("/secure-events")
public class SecureSseResource {

    private static final String BEARER_PREFIX = "Bearer ";

    private Sse sse;
    private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
    private final Map<String, SseBroadcaster> roomBroadcasters = new ConcurrentHashMap<>();

    /**
     * Receives the injected {@link Sse} entry point.
     *
     * @param sse the JAX-RS SSE factory
     */
    @Context
    public void setSse(Sse sse) {
        this.sse = sse;
    }

    /**
     * SSE endpoint with authentication. A user who connects again replaces,
     * and closes, their previous stream.
     *
     * @param eventSink the sink bound to the connecting client
     * @param authHeader the Authorization header, expected as "Bearer &lt;token&gt;"
     */
    @GET
    @Path("/user-stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getUserStream(@Context SseEventSink eventSink,
            @HeaderParam("Authorization") String authHeader) {
        // Simple authentication (in real app, use proper auth)
        String userId = authenticateUser(authHeader);
        if (userId == null) {
            // send() is asynchronous: close only once the error event was handed off.
            eventSink.send(sse.newEvent("error", "Authentication failed"))
                    .whenComplete((ignored, failure) -> closeQuietly(eventSink));
            return;
        }

        UserSession session = new UserSession(userId, eventSink);
        UserSession previous = userSessions.put(userId, session);
        if (previous != null) {
            closeQuietly(previous.getEventSink()); // do not leak the superseded connection
        }

        // Send personalized welcome
        eventSink.send(sse.newEventBuilder()
                .name("welcome")
                .data("Welcome, user " + userId)
                .build())
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        userSessions.remove(userId, session);
                        System.err.println("User error: " + userId + " - " + failure.getMessage());
                    }
                });
        System.out.println("User connected: " + userId);
    }

    /**
     * Join a room/channel for group messaging. Joining a room twice is a no-op.
     *
     * @param authHeader the Authorization header
     * @param roomId the room to join
     * @return a human-readable status message
     */
    @POST
    @Path("/join-room/{roomId}")
    public String joinRoom(@HeaderParam("Authorization") String authHeader,
            @PathParam("roomId") String roomId) {
        String userId = authenticateUser(authHeader);
        if (userId == null) {
            return "Authentication failed";
        }
        UserSession session = liveSession(userId);
        if (session == null) {
            return "User not connected via SSE";
        }
        SseBroadcaster roomBroadcaster = roomBroadcasters.computeIfAbsent(roomId,
                id -> sse.newBroadcaster());
        // Register only on the first join; a second registration of the same
        // sink could deliver every room event twice.
        if (session.joinRoom(roomId)) {
            roomBroadcaster.register(session.getEventSink());
            // Notify room
            roomBroadcaster.broadcast(sse.newEventBuilder()
                    .name("user-joined")
                    .data("User " + userId + " joined room " + roomId)
                    .build());
        }
        return "Joined room: " + roomId;
    }

    /**
     * Send message to a specific room. The sender must have joined the room.
     *
     * @param authHeader the Authorization header
     * @param roomId the target room
     * @param message the text to send
     * @return a human-readable status message
     */
    @POST
    @Path("/room/{roomId}/message")
    @Consumes(MediaType.TEXT_PLAIN)
    public String sendRoomMessage(@HeaderParam("Authorization") String authHeader,
            @PathParam("roomId") String roomId,
            String message) {
        String userId = authenticateUser(authHeader);
        if (userId == null) {
            return "Authentication failed";
        }
        SseBroadcaster roomBroadcaster = roomBroadcasters.get(roomId);
        if (roomBroadcaster == null) {
            return "Room not found: " + roomId;
        }
        UserSession sender = liveSession(userId);
        if (sender == null || !sender.getJoinedRooms().contains(roomId)) {
            return "Not a member of room: " + roomId;
        }
        roomBroadcaster.broadcast(sse.newEventBuilder()
                .name("room-message")
                .data(userId + ": " + message)
                .build());
        return "Message sent to room: " + roomId;
    }

    /**
     * Send private message to specific user.
     *
     * @param authHeader the Authorization header of the sender
     * @param targetUserId the recipient's user ID
     * @param message the text to send
     * @return a human-readable status message
     */
    @POST
    @Path("/private-message/{targetUserId}")
    @Consumes(MediaType.TEXT_PLAIN)
    public String sendPrivateMessage(@HeaderParam("Authorization") String authHeader,
            @PathParam("targetUserId") String targetUserId,
            String message) {
        String senderId = authenticateUser(authHeader);
        if (senderId == null) {
            return "Authentication failed";
        }
        UserSession targetSession = liveSession(targetUserId);
        if (targetSession == null) {
            return "User not connected: " + targetUserId;
        }
        try {
            targetSession.getEventSink().send(sse.newEventBuilder()
                    .name("private-message")
                    .data("From " + senderId + ": " + message)
                    .build())
                    .whenComplete((ignored, failure) -> {
                        if (failure != null) {
                            userSessions.remove(targetUserId, targetSession);
                        }
                    });
            return "Private message sent to: " + targetUserId;
        } catch (RuntimeException e) {
            // The sink may have closed between the liveness check and the send.
            userSessions.remove(targetUserId, targetSession);
            return "User disconnected: " + targetUserId;
        }
    }

    /**
     * Returns the user's session if its sink is still open. A session whose
     * sink has closed is removed here, because SseEventSink provides no
     * disconnect callback to do it eagerly.
     */
    private UserSession liveSession(String userId) {
        UserSession session = userSessions.get(userId);
        if (session != null && session.getEventSink().isClosed()) {
            // Conditional remove: never evict a newer session for the same user.
            userSessions.remove(userId, session);
            System.out.println("User disconnected: " + userId);
            return null;
        }
        return session;
    }

    /**
     * Simplified authentication - in real app, use JWT or other auth mechanism.
     *
     * @return the token as user ID, or {@code null} when the header is missing,
     *     malformed, or carries an empty token
     */
    private String authenticateUser(String authHeader) {
        if (authHeader == null || !authHeader.startsWith(BEARER_PREFIX)) {
            return null;
        }
        String token = authHeader.substring(BEARER_PREFIX.length()).trim();
        return token.isEmpty() ? null : token; // Return token as user ID for demo
    }

    /** Closes the sink, logging rather than propagating any failure. */
    private static void closeQuietly(SseEventSink eventSink) {
        try {
            eventSink.close();
        } catch (RuntimeException e) {
            System.err.println("Failed to close event sink: " + e.getMessage());
        }
    }

    /**
     * User session tracking: the user's sink and the rooms joined through it.
     */
    private static class UserSession {
        private final String userId;
        private final SseEventSink eventSink;
        private final Set<String> joinedRooms = ConcurrentHashMap.newKeySet();

        public UserSession(String userId, SseEventSink eventSink) {
            this.userId = userId;
            this.eventSink = eventSink;
        }

        public String getUserId() { return userId; }
        public SseEventSink getEventSink() { return eventSink; }

        /** @return {@code true} if the room was newly joined */
        public boolean joinRoom(String roomId) {
            return joinedRooms.add(roomId);
        }

        public void leaveRoom(String roomId) {
            joinedRooms.remove(roomId);
        }

        public Set<String> getJoinedRooms() {
            return Collections.unmodifiableSet(joinedRooms);
        }
    }
}
3. Real-World Use Cases
Stock Price Updates
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.core.Context;
import java.util.*;
import java.util.concurrent.*;
@Path("/stocks")
public class StockPriceSseResource {
private Sse sse;
private SseBroadcaster broadcaster;
private final ScheduledExecutorService priceUpdater = Executors.newScheduledThreadPool(1);
private final Random random = new Random();
private final Map<String, Stock> stocks = new ConcurrentHashMap<>();
@Context
public void setSse(Sse sse) {
this.sse = sse;
this.broadcaster = sse.newBroadcaster();
initializeStocks();
startPriceUpdates();
}
/**
* SSE endpoint for real-time stock prices
*/
@GET
@Path("/prices")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getStockPrices(@Context SseEventSink eventSink,
@QueryParam("symbols") String symbolFilter) {
Set<String> symbols = parseSymbols(symbolFilter);
// Send current prices immediately
if (symbols.isEmpty()) {
// Send all stocks
stocks.values().forEach(stock ->
sendStockEvent(eventSink, stock));
} else {
// Send filtered stocks
symbols.forEach(symbol -> {
Stock stock = stocks.get(symbol);
if (stock != null) {
sendStockEvent(eventSink, stock);
}
});
}
// Register for updates
broadcaster.register(eventSink);
eventSink.onClose(() -> {
System.out.println("Stock client disconnected");
});
}
/**
* Get historical data for a stock (non-SSE)
*/
@GET
@Path("/history/{symbol}")
@Produces(MediaType.APPLICATION_JSON)
public List<Stock> getStockHistory(@PathParam("symbol") String symbol,
@QueryParam("limit") @DefaultValue("50") int limit) {
Stock stock = stocks.get(symbol);
if (stock != null) {
return stock.getHistory(limit);
}
return Collections.emptyList();
}
private void initializeStocks() {
stocks.put("AAPL", new Stock("AAPL", "Apple Inc.", 150.0));
stocks.put("GOOGL", new Stock("GOOGL", "Alphabet Inc.", 2800.0));
stocks.put("MSFT", new Stock("MSFT", "Microsoft Corp.", 300.0));
stocks.put("AMZN", new Stock("AMZN", "Amazon.com Inc.", 3400.0));
stocks.put("TSLA", new Stock("TSLA", "Tesla Inc.", 1000.0));
}
private void startPriceUpdates() {
priceUpdater.scheduleAtFixedRate(() -> {
updateStockPrices();
broadcastPriceUpdates();
}, 1, 2, TimeUnit.SECONDS); // Update every 2 seconds
}
private void updateStockPrices() {
stocks.values().forEach(stock -> {
double changePercent = (random.nextDouble() - 0.5) * 2.0; // -1% to +1%
double newPrice = stock.getCurrentPrice() * (1 + changePercent / 100.0);
stock.updatePrice(newPrice);
});
}
private void broadcastPriceUpdates() {
stocks.values().forEach(stock -> {
broadcaster.broadcast(sse.newEventBuilder()
.name("price-update")
.data(stock, Stock.class)
.id(stock.getSymbol() + "-" + System.currentTimeMillis())
.build());
});
}
private void sendStockEvent(SseEventSink eventSink, Stock stock) {
eventSink.send(sse.newEventBuilder()
.name("stock-data")
.data(stock, Stock.class)
.id(stock.getSymbol())
.build());
}
private Set<String> parseSymbols(String symbolFilter) {
if (symbolFilter == null || symbolFilter.trim().isEmpty()) {
return Collections.emptySet();
}
return Set.of(symbolFilter.split(","));
}
@PreDestroy
public void cleanup() {
priceUpdater.shutdown();
if (broadcaster != null) {
broadcaster.close();
}
}
/**
* Stock data model
*/
public static class Stock {
private final String symbol;
private final String name;
private double currentPrice;
private final List<PriceHistory> history = new CopyOnWriteArrayList<>();
public Stock(String symbol, String name, double initialPrice) {
this.symbol = symbol;
this.name = name;
this.currentPrice = initialPrice;
this.history.add(new PriceHistory(initialPrice, System.currentTimeMillis()));
}
public synchronized void updatePrice(double newPrice) {
this.currentPrice = newPrice;
this.history.add(new PriceHistory(newPrice, System.currentTimeMillis()));
// Keep only last 1000 price points
if (history.size() > 1000) {
history.remove(0);
}
}
public List<Stock> getHistory(int limit) {
int start = Math.max(0, history.size() - limit);
return history.subList(start, history.size()).stream()
.map(ph -> new Stock(symbol, name, ph.price))
.toList();
}
// Getters
public String getSymbol() { return symbol; }
public String getName() { return name; }
public double getCurrentPrice() { return currentPrice; }
public List<PriceHistory> getFullHistory() { return new ArrayList<>(history); }
@Override
public String toString() {
return String.format("Stock{symbol='%s', name='%s', price=%.2f}",
symbol, name, currentPrice);
}
private static class PriceHistory {
final double price;
final long timestamp;
PriceHistory(double price, long timestamp) {
this.price = price;
this.timestamp = timestamp;
}
}
}
}
Live Sports Scores
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.core.Context;
import java.util.*;
import java.util.concurrent.*;
@Path("/sports")
public class LiveSportsSseResource {
private Sse sse;
private final Map<String, SseBroadcaster> gameBroadcasters = new ConcurrentHashMap<>();
private final Map<String, Game> liveGames = new ConcurrentHashMap<>();
private final ScheduledExecutorService gameSimulator = Executors.newScheduledThreadPool(1);
private final Random random = new Random();
@Context
public void setSse(Sse sse) {
this.sse = sse;
initializeSampleGames();
startGameSimulation();
}
/**
* Subscribe to live updates for a specific game
*/
@GET
@Path("/game/{gameId}/live")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getGameUpdates(@Context SseEventSink eventSink,
@PathParam("gameId") String gameId) {
Game game = liveGames.get(gameId);
if (game == null) {
eventSink.send(sse.newEvent("error", "Game not found: " + gameId));
eventSink.close();
return;
}
SseBroadcaster broadcaster = gameBroadcasters.computeIfAbsent(gameId,
id -> sse.newBroadcaster());
// Send current game state
eventSink.send(sse.newEventBuilder()
.name("game-state")
.data(game)
.id(gameId + "-state")
.build());
// Send recent events
game.getRecentEvents().forEach(event ->
eventSink.send(sse.newEventBuilder()
.name("game-event")
.data(event)
.id(gameId + "-event-" + event.getTimestamp())
.build())
);
// Register for future updates
broadcaster.register(eventSink);
eventSink.onClose(() -> {
System.out.println("Client disconnected from game: " + gameId);
});
System.out.println("Client subscribed to game: " + gameId);
}
/**
* Get list of live games
*/
@GET
@Path("/games")
@Produces(MediaType.APPLICATION_JSON)
public Collection<Game> getLiveGames() {
return liveGames.values();
}
private void initializeSampleGames() {
liveGames.put("NBA-001", new Game("NBA-001", "Lakers vs Warriors", "Basketball", "Q2 08:30"));
liveGames.put("NBA-002", new Game("NBA-002", "Celtics vs Nets", "Basketball", "Q1 10:15"));
liveGames.put("NFL-001", new Game("NFL-001", "Patriots vs Chiefs", "Football", "Q3 05:20"));
liveGames.put("MLB-001", new Game("MLB-001", "Yankees vs Red Sox", "Baseball", "7th Inning"));
}
private void startGameSimulation() {
gameSimulator.scheduleAtFixedRate(() -> {
// Randomly update games to simulate live action
liveGames.values().forEach(this::simulateGameAction);
}, 2, 5, TimeUnit.SECONDS);
}
private void simulateGameAction(Game game) {
String[] actions = {"score", "foul", "timeout", "quarter_end", "injury"};
String action = actions[random.nextInt(actions.length)];
GameEvent event = game.addEvent(action, generateEventDescription(game, action));
// Broadcast to all subscribers
SseBroadcaster broadcaster = gameBroadcasters.get(game.getId());
if (broadcaster != null) {
broadcaster.broadcast(sse.newEventBuilder()
.name("game-update")
.data(event)
.id(game.getId() + "-" + System.currentTimeMillis())
.build());
}
System.out.println("Game " + game.getId() + " - " + event.getDescription());
}
private String generateEventDescription(Game game, String action) {
switch (action) {
case "score":
return random.nextBoolean() ? "3-point shot made!" : "Layup scored!";
case "foul":
return "Personal foul called";
case "timeout":
return "Team timeout called";
case "quarter_end":
return "End of quarter";
case "injury":
return "Player injury timeout";
default:
return "Game action occurred";
}
}
@PreDestroy
public void cleanup() {
gameSimulator.shutdown();
}
/**
* Game data model
*/
public static class Game {
private final String id;
private final String name;
private final String sport;
private String currentPeriod;
private final List<GameEvent> events = new CopyOnWriteArrayList<>();
public Game(String id, String name, String sport, String currentPeriod) {
this.id = id;
this.name = name;
this.sport = sport;
this.currentPeriod = currentPeriod;
}
public GameEvent addEvent(String type, String description) {
GameEvent event = new GameEvent(type, description, System.currentTimeMillis());
events.add(event);
// Keep only recent events
if (events.size() > 50) {
events.remove(0);
}
return event;
}
public List<GameEvent> getRecentEvents() {
int fromIndex = Math.max(0, events.size() - 10);
return events.subList(fromIndex, events.size());
}
// Getters
public String getId() { return id; }
public String getName() { return name; }
public String getSport() { return sport; }
public String getCurrentPeriod() { return currentPeriod; }
public void setCurrentPeriod(String period) { this.currentPeriod = period; }
public List<GameEvent> getEvents() { return new ArrayList<>(events); }
}
public static class GameEvent {
private final String type;
private final String description;
private final long timestamp;
public GameEvent(String type, String description, long timestamp) {
this.type = type;
this.description = description;
this.timestamp = timestamp;
}
// Getters
public String getType() { return type; }
public String getDescription() { return description; }
public long getTimestamp() { return timestamp; }
}
}
4. Client-Side Implementation
JavaScript SSE Client
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Client Demo</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .container { max-width: 800px; margin: 0 auto; }
        .event-log { border: 1px solid #ccc; padding: 10px; height: 400px; overflow-y: auto; margin: 10px 0; }
        .event { margin: 5px 0; padding: 5px; border-left: 3px solid #007bff; }
        .event.error { border-left-color: #dc3545; background: #f8d7da; }
        .event.success { border-left-color: #28a745; background: #d4edda; }
        .event.warning { border-left-color: #ffc107; background: #fff3cd; }
        .controls { margin: 10px 0; }
        button { padding: 8px 16px; margin: 5px; cursor: pointer; }
    </style>
</head>
<body>
<div class="container">
    <h1>Server-Sent Events Client</h1>
    <div class="controls">
        <button onclick="connect()">Connect</button>
        <button onclick="disconnect()">Disconnect</button>
        <button onclick="clearLog()">Clear Log</button>
        <button onclick="sendMessage()">Send Test Message</button>
    </div>
    <div>
        <label for="endpoint">SSE Endpoint:</label>
        <input type="text" id="endpoint" value="http://localhost:8080/api/events/stream" style="width: 400px;">
    </div>
    <div class="event-log" id="eventLog"></div>
    <div>
        <h3>Send Custom Message:</h3>
        <input type="text" id="messageInput" placeholder="Enter message" style="width: 300px;">
        <button onclick="sendCustomMessage()">Send</button>
    </div>
</div>
<script>
    let eventSource = null;
    const eventLog = document.getElementById('eventLog');
    let reconnectAttempts = 0;
    const maxReconnectAttempts = 5;
    let reconnectTimeout = null;

    // Cancels a scheduled reconnect, if any.
    function cancelPendingReconnect() {
        if (reconnectTimeout) {
            clearTimeout(reconnectTimeout);
            reconnectTimeout = null;
        }
    }

    // Opens the EventSource and wires up all event handlers.
    function connect() {
        const endpoint = document.getElementById('endpoint').value;
        if (eventSource) {
            logEvent('Already connected to SSE', 'warning');
            return;
        }
        // A manual connect supersedes any reconnect that is still waiting.
        cancelPendingReconnect();
        try {
            eventSource = new EventSource(endpoint);
            eventSource.onopen = function() {
                logEvent('Connected to SSE server', 'success');
                reconnectAttempts = 0;
            };
            eventSource.onmessage = function(event) {
                logEvent(`Message: ${event.data}`, 'info');
            };
            eventSource.addEventListener('welcome', function(event) {
                logEvent(`Welcome: ${event.data}`, 'success');
            });
            eventSource.addEventListener('broadcast', function(event) {
                logEvent(`Broadcast: ${event.data}`, 'info');
            });
            eventSource.addEventListener('error', function(event) {
                // A server-sent event that is *named* "error" arrives here as a
                // MessageEvent; it is application data, not a connection failure.
                if (event instanceof MessageEvent) {
                    logEvent(`Server error: ${event.data}`, 'error');
                    return;
                }
                if (!eventSource) {
                    return;
                }
                if (eventSource.readyState === EventSource.CLOSED) {
                    logEvent('Connection closed by server', 'error');
                    // Release the dead source so a later connect() can start fresh.
                    eventSource.close();
                    eventSource = null;
                    attemptReconnect();
                } else if (eventSource.readyState === EventSource.CONNECTING) {
                    logEvent('Reconnecting...', 'warning');
                } else {
                    logEvent('SSE Error occurred', 'error');
                }
            });
            eventSource.addEventListener('periodic-update', function(event) {
                logEvent(`Periodic: ${event.data}`, 'info');
            });
            logEvent('Attempting to connect...', 'info');
        } catch (error) {
            eventSource = null;
            logEvent(`Connection error: ${error.message}`, 'error');
        }
    }

    // Closes the connection and stops any scheduled reconnect.
    function disconnect() {
        const hadPendingReconnect = reconnectTimeout !== null;
        cancelPendingReconnect();
        reconnectAttempts = 0;
        if (eventSource) {
            eventSource.close();
            eventSource = null;
            logEvent('Disconnected from SSE server', 'warning');
        } else if (hadPendingReconnect) {
            logEvent('Reconnect cancelled', 'warning');
        } else {
            logEvent('Not connected to any SSE server', 'warning');
        }
    }

    // Schedules a reconnect with exponential backoff, capped at 30 seconds.
    function attemptReconnect() {
        if (reconnectAttempts < maxReconnectAttempts) {
            reconnectAttempts++;
            const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000); // Exponential backoff
            logEvent(`Attempting reconnect in ${delay/1000} seconds (attempt ${reconnectAttempts}/${maxReconnectAttempts})`, 'warning');
            reconnectTimeout = setTimeout(() => {
                reconnectTimeout = null;
                connect();
            }, delay);
        } else {
            logEvent('Max reconnection attempts reached', 'error');
        }
    }

    // Resolves the broadcast URL as a sibling of the configured stream endpoint,
    // e.g. .../api/events/stream -> .../api/events/broadcast.
    function broadcastUrl() {
        const endpoint = document.getElementById('endpoint').value;
        return new URL('broadcast', new URL(endpoint, window.location.href)).toString();
    }

    // POSTs a plain-text message to the broadcast endpoint; rejects on HTTP errors.
    function postBroadcast(body) {
        return fetch(broadcastUrl(), {
            method: 'POST',
            headers: {
                'Content-Type': 'text/plain',
            },
            body: body
        }).then(response => {
            if (!response.ok) {
                throw new Error(`HTTP ${response.status}`);
            }
            return response.text();
        });
    }

    // Sends a timestamped test message (a separate REST call, not part of the SSE stream).
    function sendMessage() {
        postBroadcast('Test message from client at ' + new Date().toLocaleTimeString())
            .then(result => {
                logEvent(`Server response: ${result}`, 'success');
            })
            .catch(error => {
                logEvent(`Failed to send message: ${error.message}`, 'error');
            });
    }

    // Sends the text typed into the message box.
    function sendCustomMessage() {
        const messageInput = document.getElementById('messageInput');
        const message = messageInput.value.trim();
        if (!message) {
            alert('Please enter a message');
            return;
        }
        postBroadcast(message)
            .then(() => {
                logEvent(`Message sent: ${message}`, 'success');
                messageInput.value = '';
            })
            .catch(error => {
                logEvent(`Failed to send message: ${error.message}`, 'error');
            });
    }

    function clearLog() {
        eventLog.textContent = '';
    }

    // Appends a timestamped entry to the log. Text is inserted via textContent
    // so server- or user-supplied data is never interpreted as HTML.
    function logEvent(message, type = 'info') {
        const eventElement = document.createElement('div');
        eventElement.className = `event ${type}`;
        const stamp = document.createElement('strong');
        stamp.textContent = `[${new Date().toLocaleTimeString()}]`;
        eventElement.appendChild(stamp);
        eventElement.appendChild(document.createTextNode(' ' + message));
        eventLog.appendChild(eventElement);
        eventLog.scrollTop = eventLog.scrollHeight;
    }

    // Auto-connect when page loads
    window.addEventListener('load', function() {
        // connect(); // Uncomment to auto-connect
    });

    // Cleanup on page unload
    window.addEventListener('beforeunload', function() {
        cancelPendingReconnect();
        if (eventSource) {
            eventSource.close();
        }
    });
</script>
</body>
</html>
Java SSE Client
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Command-line SSE client built on the JAX-RS client API. Prints every event
 * it receives and exits on a "complete" event, an error, end of stream, or
 * after a fixed timeout - whichever comes first.
 */
public class JavaSseClient {

    /** Endpoint used when no URL is passed on the command line. */
    private static final String DEFAULT_ENDPOINT = "http://localhost:8080/api/events/stream";

    /** Maximum time to keep listening before giving up. */
    private static final long TIMEOUT_SECONDS = 30;

    /**
     * Connects to the SSE endpoint and prints incoming events.
     *
     * @param args optional: {@code args[0]} is the SSE endpoint URL
     * @throws InterruptedException if interrupted while waiting for events
     */
    public static void main(String[] args) throws InterruptedException {
        String endpoint = args.length > 0 && !args[0].trim().isEmpty() ? args[0].trim() : DEFAULT_ENDPOINT;

        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(endpoint);
        CountDownLatch latch = new CountDownLatch(1);

        try (SseEventSource eventSource = SseEventSource.target(target).build()) {
            eventSource.register(
                    // Event consumer
                    event -> {
                        String data = event.readData();
                        String name = event.getName();
                        String id = event.getId();
                        System.out.printf("Event [%s] - %s: %s%n",
                                name != null ? name : "message",
                                id != null ? "ID=" + id : "",
                                data);
                        // Stop once the server announces the end of the stream
                        if ("complete".equals(name)) {
                            latch.countDown();
                        }
                    },
                    // Error consumer
                    throwable -> {
                        String reason = throwable.getMessage() != null
                                ? throwable.getMessage()
                                : throwable.toString();
                        System.err.println("Error: " + reason);
                        latch.countDown();
                    },
                    // Completion consumer
                    () -> {
                        System.out.println("SSE connection completed");
                        latch.countDown();
                    }
            );

            System.out.println("Connecting to SSE server...");
            eventSource.open();

            // Wait for completion or timeout
            boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!completed) {
                System.out.println("Timeout reached, closing connection");
            }
        } finally {
            client.close();
        }
    }
}
Summary
Key Benefits of Server-Sent Events:
- Simple Protocol: Works over standard HTTP, no special protocol needed
- Automatic Reconnection: The browser's EventSource reconnects on its own and reports the last received event ID in the Last-Event-ID request header
- Efficient: Less overhead than WebSockets for server-to-client push
- Browser Support: Native support in modern browsers
- Firewall Friendly: Uses standard HTTP ports
Common Use Cases:
- Real-time notifications (chat, alerts, updates)
- Live sports scores and game updates
- Stock price tickers and financial data
- Live dashboards and monitoring
- News feeds and social media updates
- Progress updates for long-running operations
Best Practices:
- Always set timeouts and handle disconnections gracefully
- Use event IDs for reliable reconnection
- Implement proper error handling for both server and client
- Use appropriate reconnection strategies (exponential backoff)
- Consider authentication for sensitive data streams
- Monitor connection health and clean up resources
Limitations:
- Unidirectional only (server to client)
- Over HTTP/1.1, browsers limit a page to about 6 concurrent connections per domain (shared across tabs); HTTP/2 raises this limit considerably
- No binary data support (text only); binary payloads must be encoded as text, e.g. Base64
- Less suitable for high-frequency bidirectional communication
Server-Sent Events provide an excellent solution for real-time server-to-client communication when you don't need the full duplex capability of WebSockets and want to leverage standard HTTP infrastructure.