Server-Sent Events (SSE) in Java

Introduction to Server-Sent Events

Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via HTTP connection. Unlike WebSockets, SSE is unidirectional (server to client only) and works over standard HTTP.


1. Basic SSE Implementation

JAX-RS SSE Endpoint

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.core.Context;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Path("/events")
public class SseResource {
private Sse sse;
private SseBroadcaster broadcaster;
private final ConcurrentHashMap<String, SseEventSink> clientSinks = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Context
public void setSse(Sse sse) {
this.sse = sse;
this.broadcaster = sse.newBroadcaster();
// Setup broadcaster error handling
this.broadcaster.onClose((sseEventSink) -> {
System.out.println("Client disconnected from broadcaster");
});
this.broadcaster.onError((sseEventSink, throwable) -> {
System.err.println("Broadcaster error: " + throwable.getMessage());
});
}
/**
* Basic SSE endpoint - clients connect here to receive events
*/
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink) {
String clientId = "client-" + System.currentTimeMillis();
clientSinks.put(clientId, eventSink);
// Send welcome message
eventSink.send(sse.newEventBuilder()
.name("welcome")
.data("Connected to SSE server. Client ID: " + clientId)
.build());
// Setup client disconnect handling
eventSink.onClose(() -> {
clientSinks.remove(clientId);
System.out.println("Client disconnected: " + clientId);
});
eventSink.onError((throwable) -> {
clientSinks.remove(clientId);
System.err.println("Client error: " + throwable.getMessage());
});
System.out.println("New client connected: " + clientId);
}
/**
* Send message to all connected clients via broadcaster
*/
@POST
@Path("/broadcast")
@Consumes(MediaType.TEXT_PLAIN)
public void broadcastMessage(String message) {
if (broadcaster != null) {
broadcaster.broadcast(sse.newEvent(message));
System.out.println("Broadcasted message: " + message);
}
}
/**
* Send structured event to all clients
*/
@POST
@Path("/event")
@Consumes(MediaType.APPLICATION_JSON)
public void sendStructuredEvent(EventData eventData) {
if (broadcaster != null) {
broadcaster.broadcast(sse.newEventBuilder()
.name(eventData.getType())
.data(eventData.getClass(), eventData)
.id(String.valueOf(System.currentTimeMillis()))
.build());
System.out.println("Sent structured event: " + eventData);
}
}
/**
* Start periodic updates (e.g., stock prices, metrics)
*/
@POST
@Path("/start-updates")
public String startPeriodicUpdates() {
scheduler.scheduleAtFixedRate(this::sendPeriodicUpdate, 0, 5, TimeUnit.SECONDS);
return "Periodic updates started";
}
private void sendPeriodicUpdate() {
if (broadcaster != null) {
String update = "Server time: " + System.currentTimeMillis();
broadcaster.broadcast(sse.newEventBuilder()
.name("periodic-update")
.data(update)
.id(String.valueOf(System.currentTimeMillis()))
.build());
}
}
@PreDestroy
public void cleanup() {
scheduler.shutdown();
if (broadcaster != null) {
broadcaster.close();
}
}
// Data class for structured events
public static class EventData {
private String type;
private String message;
private long timestamp;
// Constructors, getters, setters
public EventData() {}
public EventData(String type, String message) {
this.type = type;
this.message = message;
this.timestamp = System.currentTimeMillis();
}
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
@Override
public String toString() {
return String.format("EventData{type='%s', message='%s', timestamp=%d}", 
type, message, timestamp);
}
}
}

Spring WebFlux SSE Implementation

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/sse")
public class SpringSseController {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* SSE endpoint using Spring's SseEmitter
*/
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEvents() {
SseEmitter emitter = new SseEmitter(60_000L); // 60 seconds timeout
String clientId = "client-" + System.currentTimeMillis();
emitters.put(clientId, emitter);
// Send initial connection event
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("connected")
.data("Client connected: " + clientId)
.id(clientId);
emitter.send(event);
} catch (IOException e) {
emitter.completeWithError(e);
}
// Handle client disconnect
emitter.onCompletion(() -> {
emitters.remove(clientId);
System.out.println("Client completed: " + clientId);
});
emitter.onTimeout(() -> {
emitters.remove(clientId);
System.out.println("Client timeout: " + clientId);
});
emitter.onError((ex) -> {
emitters.remove(clientId);
System.out.println("Client error: " + clientId + " - " + ex.getMessage());
});
System.out.println("New SSE client connected: " + clientId);
return emitter;
}
/**
* Reactive SSE endpoint using WebFlux
*/
@GetMapping(path = "/reactive-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> reactiveStream() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Event #" + sequence + " - " + System.currentTimeMillis())
.doOnNext(event -> System.out.println("Sending: " + event));
}
/**
* Broadcast message to all connected clients
*/
@PostMapping("/broadcast")
public String broadcastMessage(@RequestBody String message) {
int successCount = 0;
int failureCount = 0;
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("broadcast")
.data(message)
.id(String.valueOf(System.currentTimeMillis()));
entry.getValue().send(event);
successCount++;
} catch (IOException e) {
failureCount++;
System.err.println("Failed to send to client " + entry.getKey() + ": " + e.getMessage());
emitters.remove(entry.getKey());
}
}
return String.format("Broadcast completed. Success: %d, Failed: %d", successCount, failureCount);
}
/**
* Send event to specific client
*/
@PostMapping("/send/{clientId}")
public String sendToClient(@PathVariable String clientId, @RequestBody String message) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("direct-message")
.data(message)
.id(String.valueOf(System.currentTimeMillis()));
emitter.send(event);
return "Message sent to client: " + clientId;
} catch (IOException e) {
emitters.remove(clientId);
return "Failed to send to client (disconnected): " + clientId;
}
}
return "Client not found: " + clientId;
}
/**
* Start periodic server-generated events
*/
@PostMapping("/start-server-events")
public String startServerEvents() {
scheduler.scheduleAtFixedRate(() -> {
String event = "Server event at " + System.currentTimeMillis();
broadcastMessage(event);
}, 0, 3, TimeUnit.SECONDS);
return "Server events started";
}
@PreDestroy
public void cleanup() {
scheduler.shutdown();
// Complete all emitters
for (SseEmitter emitter : emitters.values()) {
emitter.complete();
}
emitters.clear();
}
}

2. Advanced SSE Features

Event Reconnection and IDs

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseEventSource;
import javax.ws.rs.core.Context;
import java.util.concurrent.atomic.AtomicLong;
@Path("/advanced-events")
public class AdvancedSseResource {
private Sse sse;
private final AtomicLong eventId = new AtomicLong(0);
@Context
public void setSse(Sse sse) {
this.sse = sse;
}
/**
* SSE with proper event IDs for client reconnection
*/
@GET
@Path("/reliable-stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getReliableStream(@Context SseEventSink eventSink,
@QueryParam("lastEventId") @DefaultValue("0") long lastEventId) {
System.out.println("Client connected. Last event ID: " + lastEventId);
// Send missed events if client reconnected
if (lastEventId > 0) {
long currentId = eventId.get();
for (long i = lastEventId + 1; i <= currentId; i++) {
eventSink.send(createEvent("catchup", "Missed event: " + i, i));
}
}
// Start sending new events
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
long id = eventId.incrementAndGet();
eventSink.send(createEvent("message", "Event " + id, id));
Thread.sleep(2000);
}
// Send completion event
eventSink.send(sse.newEventBuilder()
.name("complete")
.data("Stream completed")
.id(String.valueOf(eventId.incrementAndGet()))
.build());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
System.err.println("Error sending events: " + e.getMessage());
}
}).start();
eventSink.onClose(() -> {
System.out.println("Reliable stream client disconnected");
});
}
/**
* SSE with retry configuration
*/
@GET
@Path("/configurable-stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getConfigurableStream(@Context SseEventSink eventSink,
@QueryParam("retry") @DefaultValue("5000") long retryMs) {
// Send retry configuration to client
eventSink.send(sse.newEventBuilder()
.comment("retry: " + retryMs)
.data("Stream configured with retry: " + retryMs + "ms")
.build());
// Send events with potential errors to test reconnection
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
eventSink.send(sse.newEventBuilder()
.name("data")
.data("Message " + i)
.id(String.valueOf(i))
.build());
// Simulate server error on 3rd message to test reconnection
if (i == 3) {
Thread.sleep(1000);
throw new RuntimeException("Simulated server error");
}
Thread.sleep(2000);
}
} catch (Exception e) {
System.err.println("Stream interrupted: " + e.getMessage());
}
}).start();
}
private javax.ws.rs.sse.OutboundSseEvent createEvent(String name, String data, long id) {
return sse.newEventBuilder()
.name(name)
.data(data)
.id(String.valueOf(id))
.build();
}
/**
* Server-side event source (consuming external SSE)
*/
@GET
@Path("/proxy")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void proxyExternalEvents(@Context SseEventSink eventSink) {
// This would typically connect to an external SSE source
javax.ws.rs.client.Client client = javax.ws.rs.client.ClientBuilder.newClient();
SseEventSource eventSource = SseEventSource.target(
client.target("http://external-service.com/events")
).build();
eventSource.register(
event -> {
// Forward external events to our client
eventSink.send(sse.newEventBuilder()
.name("proxy")
.data(event.readData())
.id(event.getId())
.build());
},
throwable -> {
System.err.println("Error in proxy: " + throwable.getMessage());
eventSink.send(sse.newEvent("error", "Proxy error: " + throwable.getMessage()));
},
() -> {
System.out.println("Proxy connection closed");
eventSink.send(sse.newEvent("complete", "Proxy stream ended"));
}
);
eventSource.open();
eventSink.onClose(() -> {
eventSource.close();
client.close();
});
}
}

SSE with Authentication and Filtering

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Context;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Path("/secure-events")
public class SecureSseResource {
private Sse sse;
private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
private final Map<String, SseBroadcaster> roomBroadcasters = new ConcurrentHashMap<>();
@Context
public void setSse(Sse sse) {
this.sse = sse;
}
/**
* SSE endpoint with authentication
*/
@GET
@Path("/user-stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getUserStream(@Context SseEventSink eventSink,
@HeaderParam("Authorization") String authHeader) {
// Simple authentication (in real app, use proper auth)
String userId = authenticateUser(authHeader);
if (userId == null) {
eventSink.send(sse.newEvent("error", "Authentication failed"));
eventSink.close();
return;
}
UserSession session = new UserSession(userId, eventSink);
userSessions.put(userId, session);
// Send personalized welcome
eventSink.send(sse.newEventBuilder()
.name("welcome")
.data("Welcome, user " + userId)
.build());
eventSink.onClose(() -> {
userSessions.remove(userId);
System.out.println("User disconnected: " + userId);
});
eventSink.onError((throwable) -> {
userSessions.remove(userId);
System.err.println("User error: " + userId + " - " + throwable.getMessage());
});
System.out.println("User connected: " + userId);
}
/**
* Join a room/channel for group messaging
*/
@POST
@Path("/join-room/{roomId}")
public String joinRoom(@HeaderParam("Authorization") String authHeader,
@PathParam("roomId") String roomId) {
String userId = authenticateUser(authHeader);
if (userId == null) {
return "Authentication failed";
}
UserSession session = userSessions.get(userId);
if (session == null) {
return "User not connected via SSE";
}
SseBroadcaster roomBroadcaster = roomBroadcasters.computeIfAbsent(roomId, 
id -> sse.newBroadcaster());
roomBroadcaster.register(session.getEventSink());
session.joinRoom(roomId);
// Notify room
roomBroadcaster.broadcast(sse.newEventBuilder()
.name("user-joined")
.data("User " + userId + " joined room " + roomId)
.build());
return "Joined room: " + roomId;
}
/**
* Send message to a specific room
*/
@POST
@Path("/room/{roomId}/message")
@Consumes(MediaType.TEXT_PLAIN)
public String sendRoomMessage(@HeaderParam("Authorization") String authHeader,
@PathParam("roomId") String roomId,
String message) {
String userId = authenticateUser(authHeader);
if (userId == null) {
return "Authentication failed";
}
SseBroadcaster roomBroadcaster = roomBroadcasters.get(roomId);
if (roomBroadcaster != null) {
roomBroadcaster.broadcast(sse.newEventBuilder()
.name("room-message")
.data(userId + ": " + message)
.build());
return "Message sent to room: " + roomId;
}
return "Room not found: " + roomId;
}
/**
* Send private message to specific user
*/
@POST
@Path("/private-message/{targetUserId}")
@Consumes(MediaType.TEXT_PLAIN)
public String sendPrivateMessage(@HeaderParam("Authorization") String authHeader,
@PathParam("targetUserId") String targetUserId,
String message) {
String senderId = authenticateUser(authHeader);
if (senderId == null) {
return "Authentication failed";
}
UserSession targetSession = userSessions.get(targetUserId);
if (targetSession != null) {
try {
targetSession.getEventSink().send(sse.newEventBuilder()
.name("private-message")
.data("From " + senderId + ": " + message)
.build());
return "Private message sent to: " + targetUserId;
} catch (Exception e) {
userSessions.remove(targetUserId);
return "User disconnected: " + targetUserId;
}
}
return "User not connected: " + targetUserId;
}
private String authenticateUser(String authHeader) {
// Simplified authentication - in real app, use JWT or other auth mechanism
if (authHeader != null && authHeader.startsWith("Bearer ")) {
return authHeader.substring(7); // Return token as user ID for demo
}
return null;
}
/**
* User session tracking
*/
private static class UserSession {
private final String userId;
private final SseEventSink eventSink;
private final Set<String> joinedRooms = ConcurrentHashMap.newKeySet();
public UserSession(String userId, SseEventSink eventSink) {
this.userId = userId;
this.eventSink = eventSink;
}
public String getUserId() { return userId; }
public SseEventSink getEventSink() { return eventSink; }
public void joinRoom(String roomId) {
joinedRooms.add(roomId);
}
public void leaveRoom(String roomId) {
joinedRooms.remove(roomId);
}
public Set<String> getJoinedRooms() {
return Collections.unmodifiableSet(joinedRooms);
}
}
}

3. Real-World Use Cases

Stock Price Updates

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.core.Context;
import java.util.*;
import java.util.concurrent.*;
@Path("/stocks")
public class StockPriceSseResource {
private Sse sse;
private SseBroadcaster broadcaster;
private final ScheduledExecutorService priceUpdater = Executors.newScheduledThreadPool(1);
private final Random random = new Random();
private final Map<String, Stock> stocks = new ConcurrentHashMap<>();
@Context
public void setSse(Sse sse) {
this.sse = sse;
this.broadcaster = sse.newBroadcaster();
initializeStocks();
startPriceUpdates();
}
/**
* SSE endpoint for real-time stock prices
*/
@GET
@Path("/prices")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getStockPrices(@Context SseEventSink eventSink,
@QueryParam("symbols") String symbolFilter) {
Set<String> symbols = parseSymbols(symbolFilter);
// Send current prices immediately
if (symbols.isEmpty()) {
// Send all stocks
stocks.values().forEach(stock -> 
sendStockEvent(eventSink, stock));
} else {
// Send filtered stocks
symbols.forEach(symbol -> {
Stock stock = stocks.get(symbol);
if (stock != null) {
sendStockEvent(eventSink, stock);
}
});
}
// Register for updates
broadcaster.register(eventSink);
eventSink.onClose(() -> {
System.out.println("Stock client disconnected");
});
}
/**
* Get historical data for a stock (non-SSE)
*/
@GET
@Path("/history/{symbol}")
@Produces(MediaType.APPLICATION_JSON)
public List<Stock> getStockHistory(@PathParam("symbol") String symbol,
@QueryParam("limit") @DefaultValue("50") int limit) {
Stock stock = stocks.get(symbol);
if (stock != null) {
return stock.getHistory(limit);
}
return Collections.emptyList();
}
private void initializeStocks() {
stocks.put("AAPL", new Stock("AAPL", "Apple Inc.", 150.0));
stocks.put("GOOGL", new Stock("GOOGL", "Alphabet Inc.", 2800.0));
stocks.put("MSFT", new Stock("MSFT", "Microsoft Corp.", 300.0));
stocks.put("AMZN", new Stock("AMZN", "Amazon.com Inc.", 3400.0));
stocks.put("TSLA", new Stock("TSLA", "Tesla Inc.", 1000.0));
}
private void startPriceUpdates() {
priceUpdater.scheduleAtFixedRate(() -> {
updateStockPrices();
broadcastPriceUpdates();
}, 1, 2, TimeUnit.SECONDS); // Update every 2 seconds
}
private void updateStockPrices() {
stocks.values().forEach(stock -> {
double changePercent = (random.nextDouble() - 0.5) * 2.0; // -1% to +1%
double newPrice = stock.getCurrentPrice() * (1 + changePercent / 100.0);
stock.updatePrice(newPrice);
});
}
private void broadcastPriceUpdates() {
stocks.values().forEach(stock -> {
broadcaster.broadcast(sse.newEventBuilder()
.name("price-update")
.data(stock, Stock.class)
.id(stock.getSymbol() + "-" + System.currentTimeMillis())
.build());
});
}
private void sendStockEvent(SseEventSink eventSink, Stock stock) {
eventSink.send(sse.newEventBuilder()
.name("stock-data")
.data(stock, Stock.class)
.id(stock.getSymbol())
.build());
}
private Set<String> parseSymbols(String symbolFilter) {
if (symbolFilter == null || symbolFilter.trim().isEmpty()) {
return Collections.emptySet();
}
return Set.of(symbolFilter.split(","));
}
@PreDestroy
public void cleanup() {
priceUpdater.shutdown();
if (broadcaster != null) {
broadcaster.close();
}
}
/**
* Stock data model
*/
public static class Stock {
private final String symbol;
private final String name;
private double currentPrice;
private final List<PriceHistory> history = new CopyOnWriteArrayList<>();
public Stock(String symbol, String name, double initialPrice) {
this.symbol = symbol;
this.name = name;
this.currentPrice = initialPrice;
this.history.add(new PriceHistory(initialPrice, System.currentTimeMillis()));
}
public synchronized void updatePrice(double newPrice) {
this.currentPrice = newPrice;
this.history.add(new PriceHistory(newPrice, System.currentTimeMillis()));
// Keep only last 1000 price points
if (history.size() > 1000) {
history.remove(0);
}
}
public List<Stock> getHistory(int limit) {
int start = Math.max(0, history.size() - limit);
return history.subList(start, history.size()).stream()
.map(ph -> new Stock(symbol, name, ph.price))
.toList();
}
// Getters
public String getSymbol() { return symbol; }
public String getName() { return name; }
public double getCurrentPrice() { return currentPrice; }
public List<PriceHistory> getFullHistory() { return new ArrayList<>(history); }
@Override
public String toString() {
return String.format("Stock{symbol='%s', name='%s', price=%.2f}", 
symbol, name, currentPrice);
}
private static class PriceHistory {
final double price;
final long timestamp;
PriceHistory(double price, long timestamp) {
this.price = price;
this.timestamp = timestamp;
}
}
}
}

Live Sports Scores

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.core.Context;
import java.util.*;
import java.util.concurrent.*;
@Path("/sports")
public class LiveSportsSseResource {
private Sse sse;
private final Map<String, SseBroadcaster> gameBroadcasters = new ConcurrentHashMap<>();
private final Map<String, Game> liveGames = new ConcurrentHashMap<>();
private final ScheduledExecutorService gameSimulator = Executors.newScheduledThreadPool(1);
private final Random random = new Random();
@Context
public void setSse(Sse sse) {
this.sse = sse;
initializeSampleGames();
startGameSimulation();
}
/**
* Subscribe to live updates for a specific game
*/
@GET
@Path("/game/{gameId}/live")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getGameUpdates(@Context SseEventSink eventSink,
@PathParam("gameId") String gameId) {
Game game = liveGames.get(gameId);
if (game == null) {
eventSink.send(sse.newEvent("error", "Game not found: " + gameId));
eventSink.close();
return;
}
SseBroadcaster broadcaster = gameBroadcasters.computeIfAbsent(gameId, 
id -> sse.newBroadcaster());
// Send current game state
eventSink.send(sse.newEventBuilder()
.name("game-state")
.data(game)
.id(gameId + "-state")
.build());
// Send recent events
game.getRecentEvents().forEach(event ->
eventSink.send(sse.newEventBuilder()
.name("game-event")
.data(event)
.id(gameId + "-event-" + event.getTimestamp())
.build())
);
// Register for future updates
broadcaster.register(eventSink);
eventSink.onClose(() -> {
System.out.println("Client disconnected from game: " + gameId);
});
System.out.println("Client subscribed to game: " + gameId);
}
/**
* Get list of live games
*/
@GET
@Path("/games")
@Produces(MediaType.APPLICATION_JSON)
public Collection<Game> getLiveGames() {
return liveGames.values();
}
private void initializeSampleGames() {
liveGames.put("NBA-001", new Game("NBA-001", "Lakers vs Warriors", "Basketball", "Q2 08:30"));
liveGames.put("NBA-002", new Game("NBA-002", "Celtics vs Nets", "Basketball", "Q1 10:15"));
liveGames.put("NFL-001", new Game("NFL-001", "Patriots vs Chiefs", "Football", "Q3 05:20"));
liveGames.put("MLB-001", new Game("MLB-001", "Yankees vs Red Sox", "Baseball", "7th Inning"));
}
private void startGameSimulation() {
gameSimulator.scheduleAtFixedRate(() -> {
// Randomly update games to simulate live action
liveGames.values().forEach(this::simulateGameAction);
}, 2, 5, TimeUnit.SECONDS);
}
private void simulateGameAction(Game game) {
String[] actions = {"score", "foul", "timeout", "quarter_end", "injury"};
String action = actions[random.nextInt(actions.length)];
GameEvent event = game.addEvent(action, generateEventDescription(game, action));
// Broadcast to all subscribers
SseBroadcaster broadcaster = gameBroadcasters.get(game.getId());
if (broadcaster != null) {
broadcaster.broadcast(sse.newEventBuilder()
.name("game-update")
.data(event)
.id(game.getId() + "-" + System.currentTimeMillis())
.build());
}
System.out.println("Game " + game.getId() + " - " + event.getDescription());
}
private String generateEventDescription(Game game, String action) {
switch (action) {
case "score":
return random.nextBoolean() ? "3-point shot made!" : "Layup scored!";
case "foul":
return "Personal foul called";
case "timeout":
return "Team timeout called";
case "quarter_end":
return "End of quarter";
case "injury":
return "Player injury timeout";
default:
return "Game action occurred";
}
}
@PreDestroy
public void cleanup() {
gameSimulator.shutdown();
}
/**
* Game data model
*/
public static class Game {
private final String id;
private final String name;
private final String sport;
private String currentPeriod;
private final List<GameEvent> events = new CopyOnWriteArrayList<>();
public Game(String id, String name, String sport, String currentPeriod) {
this.id = id;
this.name = name;
this.sport = sport;
this.currentPeriod = currentPeriod;
}
public GameEvent addEvent(String type, String description) {
GameEvent event = new GameEvent(type, description, System.currentTimeMillis());
events.add(event);
// Keep only recent events
if (events.size() > 50) {
events.remove(0);
}
return event;
}
public List<GameEvent> getRecentEvents() {
int fromIndex = Math.max(0, events.size() - 10);
return events.subList(fromIndex, events.size());
}
// Getters
public String getId() { return id; }
public String getName() { return name; }
public String getSport() { return sport; }
public String getCurrentPeriod() { return currentPeriod; }
public void setCurrentPeriod(String period) { this.currentPeriod = period; }
public List<GameEvent> getEvents() { return new ArrayList<>(events); }
}
public static class GameEvent {
private final String type;
private final String description;
private final long timestamp;
public GameEvent(String type, String description, long timestamp) {
this.type = type;
this.description = description;
this.timestamp = timestamp;
}
// Getters
public String getType() { return type; }
public String getDescription() { return description; }
public long getTimestamp() { return timestamp; }
}
}

4. Client-Side Implementation

JavaScript SSE Client

<!DOCTYPE html>
<html>
<head>
<title>SSE Client Demo</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.container { max-width: 800px; margin: 0 auto; }
.event-log { border: 1px solid #ccc; padding: 10px; height: 400px; overflow-y: auto; margin: 10px 0; }
.event { margin: 5px 0; padding: 5px; border-left: 3px solid #007bff; }
.event.error { border-left-color: #dc3545; background: #f8d7da; }
.event.success { border-left-color: #28a745; background: #d4edda; }
.controls { margin: 10px 0; }
button { padding: 8px 16px; margin: 5px; cursor: pointer; }
</style>
</head>
<body>
<div class="container">
<h1>Server-Sent Events Client</h1>
<div class="controls">
<button onclick="connect()">Connect</button>
<button onclick="disconnect()">Disconnect</button>
<button onclick="clearLog()">Clear Log</button>
<button onclick="sendMessage()">Send Test Message</button>
</div>
<div>
<label>SSE Endpoint:</label>
<input type="text" id="endpoint" value="http://localhost:8080/api/events/stream" style="width: 400px;">
</div>
<div class="event-log" id="eventLog"></div>
<div>
<h3>Send Custom Message:</h3>
<input type="text" id="messageInput" placeholder="Enter message" style="width: 300px;">
<button onclick="sendCustomMessage()">Send</button>
</div>
</div>
<script>
let eventSource = null;
const eventLog = document.getElementById('eventLog');
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;
let reconnectTimeout = null;
function connect() {
const endpoint = document.getElementById('endpoint').value;
if (eventSource) {
logEvent('Already connected to SSE', 'warning');
return;
}
try {
eventSource = new EventSource(endpoint);
eventSource.onopen = function(event) {
logEvent('Connected to SSE server', 'success');
reconnectAttempts = 0;
};
eventSource.onmessage = function(event) {
logEvent(`Message: ${event.data}`, 'info');
};
eventSource.addEventListener('welcome', function(event) {
logEvent(`Welcome: ${event.data}`, 'success');
});
eventSource.addEventListener('broadcast', function(event) {
logEvent(`Broadcast: ${event.data}`, 'info');
});
eventSource.addEventListener('error', function(event) {
if (eventSource.readyState === EventSource.CLOSED) {
logEvent('Connection closed by server', 'error');
attemptReconnect();
} else if (eventSource.readyState === EventSource.CONNECTING) {
logEvent('Reconnecting...', 'warning');
} else {
logEvent('SSE Error occurred', 'error');
}
});
eventSource.addEventListener('periodic-update', function(event) {
logEvent(`Periodic: ${event.data}`, 'info');
});
logEvent('Attempting to connect...', 'info');
} catch (error) {
logEvent(`Connection error: ${error.message}`, 'error');
}
}
function disconnect() {
if (eventSource) {
eventSource.close();
eventSource = null;
logEvent('Disconnected from SSE server', 'warning');
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
reconnectTimeout = null;
}
} else {
logEvent('Not connected to any SSE server', 'warning');
}
}
function attemptReconnect() {
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000); // Exponential backoff
logEvent(`Attempting reconnect in ${delay/1000} seconds (attempt ${reconnectAttempts}/${maxReconnectAttempts})`, 'warning');
reconnectTimeout = setTimeout(() => {
disconnect();
connect();
}, delay);
} else {
logEvent('Max reconnection attempts reached', 'error');
}
}
function sendMessage() {
// Send a message to the server (this would typically be a separate REST call)
fetch('http://localhost:8080/api/events/broadcast', {
method: 'POST',
headers: {
'Content-Type': 'text/plain',
},
body: 'Test message from client at ' + new Date().toLocaleTimeString()
})
.then(response => response.text())
.then(result => {
logEvent(`Server response: ${result}`, 'success');
})
.catch(error => {
logEvent(`Failed to send message: ${error.message}`, 'error');
});
}
function sendCustomMessage() {
const messageInput = document.getElementById('messageInput');
const message = messageInput.value.trim();
if (!message) {
alert('Please enter a message');
return;
}
fetch('http://localhost:8080/api/events/broadcast', {
method: 'POST',
headers: {
'Content-Type': 'text/plain',
},
body: message
})
.then(response => response.text())
.then(result => {
logEvent(`Message sent: ${message}`, 'success');
messageInput.value = '';
})
.catch(error => {
logEvent(`Failed to send message: ${error.message}`, 'error');
});
}
function clearLog() {
eventLog.innerHTML = '';
}
function logEvent(message, type = 'info') {
const eventElement = document.createElement('div');
eventElement.className = `event ${type}`;
const timestamp = new Date().toLocaleTimeString();
eventElement.innerHTML = `<strong>[${timestamp}]</strong> ${message}`;
eventLog.appendChild(eventElement);
eventLog.scrollTop = eventLog.scrollHeight;
}
// Auto-connect when page loads
window.addEventListener('load', function() {
// connect(); // Uncomment to auto-connect
});
// Cleanup on page unload
window.addEventListener('beforeunload', function() {
if (eventSource) {
eventSource.close();
}
});
</script>
</body>
</html>

Java SSE Client

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class JavaSseClient {
public static void main(String[] args) throws InterruptedException {
Client client = ClientBuilder.newClient();
WebTarget target = client.target("http://localhost:8080/api/events/stream");
CountDownLatch latch = new CountDownLatch(1);
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(
// Event consumer
event -> {
String data = event.readData();
String name = event.getName();
String id = event.getId();
System.out.printf("Event [%s] - %s: %s%n", 
name != null ? name : "message", 
id != null ? "ID=" + id : "", 
data);
// Close after receiving 10 events for demo
if ("complete".equals(name)) {
latch.countDown();
}
},
// Error consumer
throwable -> {
System.err.println("Error: " + throwable.getMessage());
latch.countDown();
},
// Completion consumer
() -> {
System.out.println("SSE connection completed");
latch.countDown();
}
);
System.out.println("Connecting to SSE server...");
eventSource.open();
// Wait for completion or timeout
boolean completed = latch.await(30, TimeUnit.SECONDS);
if (!completed) {
System.out.println("Timeout reached, closing connection");
}
} finally {
client.close();
}
}
}

Summary

Key Benefits of Server-Sent Events:

  1. Simple Protocol: Works over standard HTTP, no special protocol needed
  2. Automatic Reconnection: Built-in reconnection mechanism with event IDs
  3. Efficient: Less overhead than WebSockets for server-to-client push
  4. Browser Support: Native support in modern browsers
  5. Firewall Friendly: Uses standard HTTP ports

Common Use Cases:

  • Real-time notifications (chat, alerts, updates)
  • Live sports scores and game updates
  • Stock price tickers and financial data
  • Live dashboards and monitoring
  • News feeds and social media updates
  • Progress updates for long-running operations

Best Practices:

  1. Always set timeouts and handle disconnections gracefully
  2. Use event IDs for reliable reconnection
  3. Implement proper error handling for both server and client
  4. Use appropriate reconnection strategies (exponential backoff)
  5. Consider authentication for sensitive data streams
  6. Monitor connection health and clean up resources

Limitations:

  • Unidirectional only (server to client)
  • Limited to 6 concurrent connections per browser
  • No binary data support (text only)
  • Less suitable for high-frequency bidirectional communication

Server-Sent Events provide an excellent solution for real-time server-to-client communication when you don't need the full duplex capability of WebSockets and want to leverage standard HTTP infrastructure.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper