WebSocket Testing with Java: Comprehensive Guide and Implementation

WebSocket testing requires specialized approaches to handle bidirectional, real-time communication. This guide covers comprehensive testing strategies for WebSocket clients and servers in Java.

Project Setup and Dependencies

1. Maven Dependencies
<dependencies>
    <!-- Spring Boot WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
        <version>3.1.0</version>
    </dependency>
    <!-- Spring Security: required by WebSocketSecurityConfig, which extends
         AbstractSecurityWebSocketMessageBrokerConfigurer -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.security</groupId>
        <artifactId>spring-security-messaging</artifactId>
        <version>6.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>3.1.0</version>
        <scope>test</scope>
    </dependency>
    <!-- WebSocket Client -->
    <dependency>
        <groupId>org.java-websocket</groupId>
        <artifactId>Java-WebSocket</artifactId>
        <version>1.5.3</version>
    </dependency>
    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    <!-- Testing Utilities -->
    <dependency>
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
        <version>4.2.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>1.18.3</version>
        <scope>test</scope>
    </dependency>
    <!-- Mock WebSocket Server -->
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>mockwebserver</artifactId>
        <version>4.11.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

WebSocket Server Implementation

1. WebSocket Configuration
// WebSocketConfig.java
package com.company.websocket.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * Message-broker configuration for the STOMP endpoints used by the chat application.
 *
 * <p>Registers the broker destination prefixes and exposes the {@code /ws} handshake
 * endpoint twice: once with the SockJS fallback and once as a plain WebSocket endpoint.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Handshake path shared by the SockJS and the plain WebSocket registration. */
    private static final String ENDPOINT = "/ws";

    /** Origin pattern accepted by both registrations (any origin). */
    private static final String ANY_ORIGIN = "*";

    /**
     * Sets up the simple broker for {@code /topic} and {@code /queue}, routes
     * {@code /app} destinations to application handlers and uses {@code /user}
     * as the user-destination prefix.
     *
     * @param config registry supplied by the framework
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic", "/queue");
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
    }

    /**
     * Registers the STOMP handshake endpoints.
     *
     * @param registry registry supplied by the framework
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // First registration: same path, with the SockJS fallback enabled.
        registry.addEndpoint(ENDPOINT)
                .setAllowedOriginPatterns(ANY_ORIGIN)
                .withSockJS();

        // Second registration: plain WebSocket handshake on the same path.
        registry.addEndpoint(ENDPOINT)
                .setAllowedOriginPatterns(ANY_ORIGIN);
    }
}
// WebSocketSecurityConfig.java
package com.company.websocket.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.messaging.MessageSecurityMetadataSourceRegistry;
import org.springframework.security.config.annotation.web.socket.AbstractSecurityWebSocketMessageBrokerConfigurer;
/**
 * Authorization rules for inbound STOMP messages.
 *
 * <p>Rules are evaluated in the order they are declared below, ending with a
 * catch-all that denies every message not matched earlier.
 */
@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

    /**
     * Declares who may send to or subscribe on which destinations.
     *
     * @param messages rule registry supplied by the framework
     */
    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
                // Messages that carry no destination require an authenticated user.
                .nullDestMatcher().authenticated()
                // The per-user error queue may be subscribed to by anyone.
                .simpSubscribeDestMatchers("/user/queue/errors").permitAll()
                // Application handlers and topic/user subscriptions need authentication.
                .simpDestMatchers("/app/**").authenticated()
                .simpSubscribeDestMatchers("/topic/**", "/user/**").authenticated()
                // Everything else is rejected.
                .anyMessage().denyAll();
    }

    /**
     * Reports that the same-origin check is disabled.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean sameOriginDisabled() {
        return true; // For testing purposes
    }
}
2. WebSocket Controllers
// ChatController.java
package com.company.websocket.controller;
import com.company.websocket.model.*;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Controller;
import java.security.Principal;
import java.time.LocalDateTime;
import java.util.concurrent.ConcurrentHashMap;
/**
 * STOMP message handlers for the chat features: public messages, joining,
 * private messages, typing notifications and a ping/pong round trip.
 *
 * <p>Handlers that take a {@link Principal} call {@code principal.getName()}
 * without a null check, so they require a session that carries a user.
 */
@Controller
public class ChatController {

    private final SimpMessagingTemplate messagingTemplate;

    /** Users that have joined, keyed by the sender name taken from the join message. */
    private final ConcurrentHashMap<String, UserSession> activeSessions = new ConcurrentHashMap<>();

    /**
     * @param messagingTemplate template used for sends that are not expressed as return values
     */
    public ChatController(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    /**
     * Stamps an incoming chat message with the server time and the principal's
     * name, then broadcasts it on {@code /topic/public}.
     *
     * @param chatMessage message sent by the client; mutated and returned
     * @param principal   user of the sending session
     * @return the same message instance, to be broadcast
     */
    @MessageMapping("/chat.send")
    @SendTo("/topic/public")
    public ChatMessage sendMessage(@Payload ChatMessage chatMessage,
                                   Principal principal) {
        chatMessage.setTimestamp(LocalDateTime.now());
        // The sender is always taken from the session, never from the client payload.
        chatMessage.setSender(principal.getName());
        // Log message for testing
        System.out.println("Message received: " + chatMessage.getContent() + " from " + chatMessage.getSender());
        return chatMessage;
    }

    /**
     * Registers the sender of a join message as an active user, publishes the
     * updated user list on {@code /topic/users} and broadcasts a JOIN message on
     * {@code /topic/public}.
     *
     * @param chatMessage    join request; its sender is used as the username
     * @param headerAccessor accessor for the WebSocket session attributes
     * @return the join message, rewritten with content, timestamp and type
     */
    @MessageMapping("/chat.addUser")
    @SendTo("/topic/public")
    public ChatMessage addUser(@Payload ChatMessage chatMessage,
                               SimpMessageHeaderAccessor headerAccessor) {
        String username = chatMessage.getSender();

        // Add username in web socket session. The attribute map can be absent,
        // in which case there is nothing to store it in.
        java.util.Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
        if (sessionAttributes != null) {
            sessionAttributes.put("username", username);
        }

        // Track active session
        activeSessions.put(username, new UserSession(username, LocalDateTime.now()));

        chatMessage.setContent(username + " joined the chat!");
        chatMessage.setTimestamp(LocalDateTime.now());
        chatMessage.setType(ChatMessage.MessageType.JOIN);

        // Notify about active users. Fully qualified because this file's import
        // list does not include java.util.ArrayList.
        messagingTemplate.convertAndSend("/topic/users",
                new UserListMessage(new java.util.ArrayList<>(activeSessions.keySet())));
        return chatMessage;
    }

    /**
     * Delivers a private message to the user named in {@code toUser}.
     *
     * <p>NOTE(review): the message is sent explicitly to the recipient and is
     * also returned from a method annotated with {@code @SendToUser}; confirm
     * that a copy for the sending user is intended.
     *
     * @param privateMessage message sent by the client; mutated and returned
     * @param principal      user of the sending session
     * @return the same message instance
     */
    @MessageMapping("/chat.private")
    @SendToUser("/queue/private")
    public PrivateMessage sendPrivateMessage(@Payload PrivateMessage privateMessage,
                                             Principal principal) {
        privateMessage.setTimestamp(LocalDateTime.now());
        privateMessage.setFromUser(principal.getName());
        // Send to specific user
        messagingTemplate.convertAndSendToUser(
                privateMessage.getToUser(),
                "/queue/private",
                privateMessage
        );
        return privateMessage;
    }

    /**
     * Broadcasts a typing notification on {@code /topic/typing}, with the
     * username replaced by the principal's name.
     *
     * @param typingNotification notification sent by the client; mutated
     * @param principal          user of the sending session
     */
    @MessageMapping("/chat.typing")
    public void typing(@Payload TypingNotification typingNotification,
                       Principal principal) {
        typingNotification.setUsername(principal.getName());
        messagingTemplate.convertAndSend("/topic/typing", typingNotification);
    }

    /**
     * Answers a ping with a pong that echoes the ping id.
     *
     * @param pingMessage ping sent by the client
     * @param principal   user of the sending session (not read by this handler)
     * @return pong carrying the ping id, the text {@code "pong"} and the server time
     */
    @MessageMapping("/system.ping")
    @SendToUser("/queue/pong")
    public PongMessage handlePing(@Payload PingMessage pingMessage,
                                  Principal principal) {
        return new PongMessage(pingMessage.getId(), "pong", LocalDateTime.now());
    }
}
// NotificationController.java
package com.company.websocket.controller;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
@Controller
public class NotificationController {
private final SimpMessagingTemplate messagingTemplate;
private final AtomicLong notificationId = new AtomicLong(0);
public NotificationController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
@MessageMapping("/notifications.subscribe")
public void subscribeToNotifications(Principal principal) {
// User subscribed to notifications
System.out.println("User " + principal.getName() + " subscribed to notifications");
}
@Scheduled(fixedRate = 30000) // Every 30 seconds
public void sendSystemNotification() {
SystemNotification notification = new SystemNotification(
notificationId.incrementAndGet(),
"System Update",
"Server time: " + LocalDateTime.now(),
LocalDateTime.now()
);
messagingTemplate.convertAndSend("/topic/notifications", notification);
}
}
3. WebSocket Models
// ChatMessage.java
package com.company.websocket.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.time.LocalDateTime;
/**
 * Payload exchanged on the public chat topic.
 *
 * <p>Properties that are {@code null} are omitted from the JSON form.
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ChatMessage {

    /** Kind of event a message represents. */
    public enum MessageType {
        CHAT,
        JOIN,
        LEAVE,
        ERROR
    }

    private MessageType type;
    private String content;
    private String sender;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    private String sessionId;

    /** Creates an empty message; every property starts as {@code null}. */
    public ChatMessage() {
    }

    /**
     * Creates a message stamped with the current time.
     *
     * @param type    kind of message
     * @param content text of the message
     * @param sender  name of the sending user
     */
    public ChatMessage(MessageType type, String content, String sender) {
        this.timestamp = LocalDateTime.now();
        this.sender = sender;
        this.content = content;
        this.type = type;
    }

    /** @return kind of message */
    public MessageType getType() {
        return type;
    }

    /** @param type kind of message */
    public void setType(MessageType type) {
        this.type = type;
    }

    /** @return text of the message */
    public String getContent() {
        return content;
    }

    /** @param content text of the message */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return name of the sending user */
    public String getSender() {
        return sender;
    }

    /** @param sender name of the sending user */
    public void setSender(String sender) {
        this.sender = sender;
    }

    /** @return time attached to the message */
    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    /** @param timestamp time attached to the message */
    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }

    /** @return session identifier, if one was set */
    public String getSessionId() {
        return sessionId;
    }

    /** @param sessionId session identifier */
    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }
}
// PrivateMessage.java
package com.company.websocket.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.LocalDateTime;
/** Payload of a message addressed from one user to another. */
public class PrivateMessage {

    private String fromUser;
    private String toUser;
    private String content;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    /** Creates an empty message; every property starts as {@code null}. */
    public PrivateMessage() {
    }

    /**
     * Creates a message stamped with the current time.
     *
     * @param fromUser name of the sending user
     * @param toUser   name of the receiving user
     * @param content  text of the message
     */
    public PrivateMessage(String fromUser, String toUser, String content) {
        this.timestamp = LocalDateTime.now();
        this.content = content;
        this.toUser = toUser;
        this.fromUser = fromUser;
    }

    /** @return name of the sending user */
    public String getFromUser() {
        return fromUser;
    }

    /** @param fromUser name of the sending user */
    public void setFromUser(String fromUser) {
        this.fromUser = fromUser;
    }

    /** @return name of the receiving user */
    public String getToUser() {
        return toUser;
    }

    /** @param toUser name of the receiving user */
    public void setToUser(String toUser) {
        this.toUser = toUser;
    }

    /** @return text of the message */
    public String getContent() {
        return content;
    }

    /** @param content text of the message */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return time attached to the message */
    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    /** @param timestamp time attached to the message */
    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }
}
// SystemNotification.java
package com.company.websocket.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.LocalDateTime;
/** Payload of a server-originated notification. */
public class SystemNotification {

    private Long id;
    private String title;
    private String message;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    /** Creates an empty notification; every property starts as {@code null}. */
    public SystemNotification() {
    }

    /**
     * Creates a fully populated notification.
     *
     * @param id        identifier of the notification
     * @param title     short title
     * @param message   body text
     * @param timestamp time attached to the notification
     */
    public SystemNotification(Long id, String title, String message, LocalDateTime timestamp) {
        this.timestamp = timestamp;
        this.message = message;
        this.title = title;
        this.id = id;
    }

    /** @return identifier of the notification */
    public Long getId() {
        return id;
    }

    /** @param id identifier of the notification */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return short title */
    public String getTitle() {
        return title;
    }

    /** @param title short title */
    public void setTitle(String title) {
        this.title = title;
    }

    /** @return body text */
    public String getMessage() {
        return message;
    }

    /** @param message body text */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return time attached to the notification */
    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    /** @param timestamp time attached to the notification */
    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }
}
// UserSession.java
package com.company.websocket.model;
import java.time.LocalDateTime;
/** Bookkeeping record for a user that has joined the chat. */
public class UserSession {

    private String username;
    private LocalDateTime connectedAt;
    private LocalDateTime lastActivity;

    /**
     * Creates a session whose last activity equals its connection time.
     *
     * @param username    name of the user
     * @param connectedAt time the user connected
     */
    public UserSession(String username, LocalDateTime connectedAt) {
        this.username = username;
        this.connectedAt = connectedAt;
        // A new session has seen no activity other than connecting.
        this.lastActivity = this.connectedAt;
    }

    /** @return name of the user */
    public String getUsername() {
        return username;
    }

    /** @param username name of the user */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return time the user connected */
    public LocalDateTime getConnectedAt() {
        return connectedAt;
    }

    /** @param connectedAt time the user connected */
    public void setConnectedAt(LocalDateTime connectedAt) {
        this.connectedAt = connectedAt;
    }

    /** @return time of the most recent recorded activity */
    public LocalDateTime getLastActivity() {
        return lastActivity;
    }

    /** @param lastActivity time of the most recent recorded activity */
    public void setLastActivity(LocalDateTime lastActivity) {
        this.lastActivity = lastActivity;
    }
}
// TypingNotification.java
package com.company.websocket.model;
/** Payload announcing whether a user is currently typing. */
public class TypingNotification {

    private String username;
    private boolean typing;

    /** Creates a notification with no username and {@code typing == false}. */
    public TypingNotification() {
    }

    /**
     * @param username name of the user
     * @param typing   whether the user is typing
     */
    public TypingNotification(String username, boolean typing) {
        this.typing = typing;
        this.username = username;
    }

    /** @return name of the user */
    public String getUsername() {
        return username;
    }

    /** @param username name of the user */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return whether the user is typing */
    public boolean isTyping() {
        return typing;
    }

    /** @param typing whether the user is typing */
    public void setTyping(boolean typing) {
        this.typing = typing;
    }
}
// PingMessage.java
package com.company.websocket.model;
/** Payload of a client ping. */
public class PingMessage {

    private String id;
    private Long timestamp;

    /** Creates an empty ping; both properties start as {@code null}. */
    public PingMessage() {
    }

    /**
     * @param id        identifier chosen by the sender
     * @param timestamp numeric time value chosen by the sender
     */
    public PingMessage(String id, Long timestamp) {
        this.timestamp = timestamp;
        this.id = id;
    }

    /** @return identifier chosen by the sender */
    public String getId() {
        return id;
    }

    /** @param id identifier chosen by the sender */
    public void setId(String id) {
        this.id = id;
    }

    /** @return numeric time value chosen by the sender */
    public Long getTimestamp() {
        return timestamp;
    }

    /** @param timestamp numeric time value chosen by the sender */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
}
// PongMessage.java
package com.company.websocket.model;
import java.time.LocalDateTime;
/** Payload of the server's answer to a ping. */
public class PongMessage {

    private String id;
    private String response;
    private LocalDateTime timestamp;

    /** Creates an empty pong; every property starts as {@code null}. */
    public PongMessage() {
    }

    /**
     * @param id        identifier of the ping being answered
     * @param response  response text
     * @param timestamp time attached to the pong
     */
    public PongMessage(String id, String response, LocalDateTime timestamp) {
        this.timestamp = timestamp;
        this.response = response;
        this.id = id;
    }

    /** @return identifier of the ping being answered */
    public String getId() {
        return id;
    }

    /** @param id identifier of the ping being answered */
    public void setId(String id) {
        this.id = id;
    }

    /** @return response text */
    public String getResponse() {
        return response;
    }

    /** @param response response text */
    public void setResponse(String response) {
        this.response = response;
    }

    /** @return time attached to the pong */
    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    /** @param timestamp time attached to the pong */
    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }
}
// UserListMessage.java
package com.company.websocket.model;
import java.util.List;
/**
 * Payload listing the users that are currently active, together with their count.
 *
 * <p>The count is derived from the list only at construction time; the setters
 * change each property independently.
 */
public class UserListMessage {

    private List<String> users;
    private int activeUsers;

    /**
     * Creates an empty list message. A no-argument constructor is needed so
     * the payload can be instantiated by a JSON deserializer on the receiving side.
     */
    public UserListMessage() {
        this(null);
    }

    /**
     * Creates a message for the given users.
     *
     * @param users names of the active users; {@code null} is treated as an empty list
     */
    public UserListMessage(List<String> users) {
        this.users = (users == null) ? List.of() : users;
        this.activeUsers = this.users.size();
    }

    /** @return names of the active users */
    public List<String> getUsers() {
        return users;
    }

    /** @param users names of the active users; does not update the stored count */
    public void setUsers(List<String> users) {
        this.users = users;
    }

    /** @return stored number of active users */
    public int getActiveUsers() {
        return activeUsers;
    }

    /** @param activeUsers number of active users */
    public void setActiveUsers(int activeUsers) {
        this.activeUsers = activeUsers;
    }
}

WebSocket Testing Framework

1. Base WebSocket Test Configuration
// BaseWebSocketTest.java
package com.company.websocket.test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Base class for WebSocket integration tests. Starts the application on a
 * random port and offers helpers to open STOMP sessions against {@code /ws}.
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public abstract class BaseWebSocketTest {

    @LocalServerPort
    private int port;

    /** @return URL of the handshake endpoint on the randomly assigned port */
    private String getWebSocketUrl() {
        return "ws://localhost:" + port + "/ws";
    }

    /**
     * Builds a STOMP client that connects through a SockJS client with a single
     * WebSocket transport and converts payloads with Jackson.
     *
     * @return a new, unconnected client
     */
    protected WebSocketStompClient createWebSocketClient() {
        WebSocketStompClient stompClient = new WebSocketStompClient(
                new SockJsClient(Collections.singletonList(new WebSocketTransport(new StandardWebSocketClient())))
        );

        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        // The model classes carry LocalDateTime fields. Register whatever Jackson
        // modules are on the classpath (notably java.time support) so the client
        // can deserialize them; a bare mapper rejects java.time types.
        converter.getObjectMapper().findAndRegisterModules();
        stompClient.setMessageConverter(converter);
        return stompClient;
    }

    /**
     * Starts connecting a fresh client to the test server.
     *
     * @param sessionHandler handler that receives the session callbacks
     * @return future that completes with the session
     */
    protected CompletableFuture<StompSession> connectAsync(StompSessionHandler sessionHandler) {
        WebSocketStompClient stompClient = createWebSocketClient();
        return stompClient.connectAsync(getWebSocketUrl(), sessionHandler);
    }

    /**
     * Connects and blocks until the session is available.
     *
     * @param sessionHandler handler that receives the session callbacks
     * @param timeoutSeconds maximum time to wait for the connection
     * @return the connected session
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException   if the connection attempt fails
     * @throws TimeoutException     if no session is available within the timeout
     */
    protected StompSession connect(StompSessionHandler sessionHandler, long timeoutSeconds)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<StompSession> future = connectAsync(sessionHandler);
        return future.get(timeoutSeconds, TimeUnit.SECONDS);
    }
}
2. Test STOMP Session Handlers
// TestStompSessionHandler.java
package com.company.websocket.test;
import com.company.websocket.model.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * STOMP session handler for tests. Every frame received on any subscription
 * made through this handler is appended to one queue, from which tests pull
 * messages by expected type.
 *
 * <p>The session reference is written by the connection callback and read by
 * the test thread, so it is {@code volatile} and guarded by a latch.
 */
public class TestStompSessionHandler implements StompSessionHandler {

    /** Longest time {@link #subscribe} and {@link #send} wait for the connection callback. */
    private static final long CONNECT_WAIT_SECONDS = 5;

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final BlockingQueue<Object> receivedMessages = new LinkedBlockingQueue<>();
    private final List<StompSession.Subscription> subscriptions = new ArrayList<>();

    // Fully qualified: CountDownLatch is not in this file's import list.
    private final java.util.concurrent.CountDownLatch connected =
            new java.util.concurrent.CountDownLatch(1);

    private volatile StompSession session;

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        this.session = session;
        connected.countDown();
        System.out.println("STOMP session connected: " + session.getSessionId());
    }

    @Override
    public void handleException(StompSession session, StompCommand command,
                                StompHeaders headers, byte[] payload, Throwable exception) {
        System.err.println("STOMP exception: " + exception.getMessage());
        exception.printStackTrace();
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        System.err.println("STOMP transport error: " + exception.getMessage());
    }

    /**
     * Chooses the payload class from the frame's destination.
     *
     * @param headers headers of the incoming frame
     * @return the model class for a known destination, otherwise {@code Object.class}
     */
    @Override
    public Type getPayloadType(StompHeaders headers) {
        // Determine payload type based on destination
        String destination = headers.getDestination();
        if (destination == null) {
            return Object.class;
        }
        if (destination.contains("/topic/public")) {
            return ChatMessage.class;
        } else if (destination.contains("/topic/users")) {
            return UserListMessage.class;
        } else if (destination.contains("/topic/typing")) {
            return TypingNotification.class;
        } else if (destination.contains("/queue/private")) {
            return PrivateMessage.class;
        } else if (destination.contains("/topic/notifications")) {
            return SystemNotification.class;
        } else if (destination.contains("/queue/pong")) {
            return PongMessage.class;
        }
        return Object.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        System.out.println("Received message: " + payload);
        // The queue rejects null elements; a frame without a payload is skipped.
        if (payload != null) {
            receivedMessages.offer(payload);
        }
    }

    /**
     * Subscribes this handler to a destination and remembers the subscription
     * for {@link #disconnect()}.
     *
     * @param destination destination to subscribe to
     * @return the new subscription
     * @throws IllegalStateException if no session becomes available in time
     */
    public StompSession.Subscription subscribe(String destination) {
        StompSession.Subscription subscription = requireSession().subscribe(destination, this);
        subscriptions.add(subscription);
        return subscription;
    }

    /**
     * Sends a payload to a destination.
     *
     * @param destination destination to send to
     * @param payload     object to send
     * @throws IllegalStateException if no session becomes available in time
     */
    public void send(String destination, Object payload) {
        requireSession().send(destination, payload);
    }

    /**
     * Waits for the first message of the expected type. Messages of other types
     * that arrive earlier are removed from the queue and discarded.
     *
     * @param expectedType class the message must be an instance of
     * @param timeout      maximum total time to wait
     * @param unit         unit of {@code timeout}
     * @return the matching message, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> T receiveMessage(Class<T> expectedType, long timeout, TimeUnit unit)
            throws InterruptedException {
        long budgetNanos = unit.toNanos(timeout);
        long startNanos = System.nanoTime();
        long remainingNanos = budgetNanos;
        do {
            Object message = receivedMessages.poll(Math.max(remainingNanos, 0L), TimeUnit.NANOSECONDS);
            if (message == null) {
                return null; // timed out with nothing queued
            }
            if (expectedType.isInstance(message)) {
                return expectedType.cast(message);
            }
            // Wrong type: drop it and keep waiting for the rest of the budget.
            remainingNanos = budgetNanos - (System.nanoTime() - startNanos);
        } while (remainingNanos > 0);
        return null;
    }

    /**
     * Collects every message of the expected type that arrives during the whole
     * timeout window. Messages of other types are discarded.
     *
     * @param expectedType class the messages must be instances of
     * @param timeout      length of the collection window
     * @param unit         unit of {@code timeout}
     * @return matching messages in arrival order; empty if none arrived
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<T> receiveAllMessages(Class<T> expectedType, long timeout, TimeUnit unit)
            throws InterruptedException {
        List<T> messages = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        long timeoutMs = unit.toMillis(timeout);
        while (System.currentTimeMillis() - startTime < timeoutMs) {
            Object message = receivedMessages.poll(100, TimeUnit.MILLISECONDS);
            if (message != null && expectedType.isInstance(message)) {
                messages.add(expectedType.cast(message));
            }
        }
        return messages;
    }

    /**
     * Unsubscribes from everything and closes the session. Does nothing when
     * no session was ever established; skips the frames when the session is
     * already closed, since they could no longer be sent.
     */
    public void disconnect() {
        StompSession current = session;
        if (current == null) {
            return;
        }
        if (current.isConnected()) {
            // Unsubscribe from all topics
            for (StompSession.Subscription subscription : subscriptions) {
                subscription.unsubscribe();
            }
            current.disconnect();
        }
        subscriptions.clear();
    }

    /** Discards every message currently queued. */
    public void clearMessages() {
        receivedMessages.clear();
    }

    /** @return number of messages currently queued */
    public int getMessageCount() {
        return receivedMessages.size();
    }

    /** @return whether a session exists and reports itself as connected */
    public boolean isConnected() {
        StompSession current = session;
        return current != null && current.isConnected();
    }

    /**
     * Returns the session, waiting briefly for {@link #afterConnected} because
     * the caller may obtain the session from the connect future before this
     * handler's callback has run.
     */
    private StompSession requireSession() {
        try {
            if (!connected.await(CONNECT_WAIT_SECONDS, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Session not connected");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Session not connected", e);
        }
        return session;
    }
}
// AsyncTestStompSessionHandler.java
package com.company.websocket.test;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.CompletableFuture;
public class AsyncTestStompSessionHandler extends TestStompSessionHandler {
private final CompletableFuture<StompSession> connectionFuture = new CompletableFuture<>();
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
super.afterConnected(session, connectedHeaders);
connectionFuture.complete(session);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
super.handleTransportError(session, exception);
connectionFuture.completeExceptionally(exception);
}
public CompletableFuture<StompSession> getConnectionFuture() {
return connectionFuture;
}
public static CompletableFuture<StompSession> wrapListenableFuture(
ListenableFuture<StompSession> listenableFuture) {
CompletableFuture<StompSession> completableFuture = new CompletableFuture<>();
listenableFuture.addCallback(new ListenableFutureCallback<StompSession>() {
@Override
public void onSuccess(StompSession result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable ex) {
completableFuture.completeExceptionally(ex);
}
});
return completableFuture;
}
}
3. Comprehensive WebSocket Tests
// ChatWebSocketTest.java
package com.company.websocket.test;
import com.company.websocket.model.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.simp.stomp.StompSession;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;
class ChatWebSocketTest extends BaseWebSocketTest {
private TestStompSessionHandler sessionHandler1;
private TestStompSessionHandler sessionHandler2;
private StompSession session1;
private StompSession session2;
@BeforeEach
void setUp() throws InterruptedException, ExecutionException, TimeoutException {
sessionHandler1 = new TestStompSessionHandler();
sessionHandler2 = new TestStompSessionHandler();
session1 = connect(sessionHandler1, 5);
session2 = connect(sessionHandler2, 5);
// Subscribe to public topics
sessionHandler1.subscribe("/topic/public");
sessionHandler1.subscribe("/topic/users");
sessionHandler2.subscribe("/topic/public");
sessionHandler2.subscribe("/topic/users");
}
@AfterEach
void tearDown() {
if (sessionHandler1 != null) {
sessionHandler1.disconnect();
}
if (sessionHandler2 != null) {
sessionHandler2.disconnect();
}
}
@Test
void testUserJoinAndBroadcast() throws InterruptedException {
// User 1 joins
ChatMessage joinMessage1 = new ChatMessage();
joinMessage1.setType(ChatMessage.MessageType.JOIN);
joinMessage1.setSender("user1");
sessionHandler1.send("/app/chat.addUser", joinMessage1);
// Both sessions should receive the join notification
ChatMessage receivedMessage1 = sessionHandler1.receiveMessage(
ChatMessage.class, 2, TimeUnit.SECONDS);
ChatMessage receivedMessage2 = sessionHandler2.receiveMessage(
ChatMessage.class, 2, TimeUnit.SECONDS);
assertNotNull(receivedMessage1);
assertNotNull(receivedMessage2);
assertEquals(ChatMessage.MessageType.JOIN, receivedMessage1.getType());
assertEquals("user1 joined the chat!", receivedMessage1.getContent());
assertEquals("user1", receivedMessage1.getSender());
}
@Test
void testChatMessageBroadcast() throws InterruptedException {
// First, users join
ChatMessage joinMessage1 = new ChatMessage();
joinMessage1.setType(ChatMessage.MessageType.JOIN);
joinMessage1.setSender("user1");
sessionHandler1.send("/app/chat.addUser", joinMessage1);
ChatMessage joinMessage2 = new ChatMessage();
joinMessage2.setType(ChatMessage.MessageType.JOIN);
joinMessage2.setSender("user2");
sessionHandler2.send("/app/chat.addUser", joinMessage2);
// Clear join messages
sessionHandler1.clearMessages();
sessionHandler2.clearMessages();
// User1 sends a message
ChatMessage chatMessage = new ChatMessage();
chatMessage.setType(ChatMessage.MessageType.CHAT);
chatMessage.setSender("user1");
chatMessage.setContent("Hello, World!");
sessionHandler1.send("/app/chat.send", chatMessage);
// Both sessions should receive the message
ChatMessage received1 = sessionHandler1.receiveMessage(
ChatMessage.class, 2, TimeUnit.SECONDS);
ChatMessage received2 = sessionHandler2.receiveMessage(
ChatMessage.class, 2, TimeUnit.SECONDS);
assertNotNull(received1);
assertNotNull(received2);
assertEquals("Hello, World!", received1.getContent());
assertEquals("user1", received1.getSender());
assertEquals(ChatMessage.MessageType.CHAT, received1.getType());
}
@Test
void testMultipleMessagesOrder() throws InterruptedException {
// User joins
ChatMessage joinMessage = new ChatMessage();
joinMessage.setType(ChatMessage.MessageType.JOIN);
joinMessage.setSender("testUser");
sessionHandler1.send("/app/chat.addUser", joinMessage);
sessionHandler1.clearMessages();
// Send multiple messages
for (int i = 1; i <= 5; i++) {
ChatMessage message = new ChatMessage();
message.setType(ChatMessage.MessageType.CHAT);
message.setSender("testUser");
message.setContent("Message " + i);
sessionHandler1.send("/app/chat.send", message);
}
// Receive all messages
List<ChatMessage> messages = sessionHandler1.receiveAllMessages(
ChatMessage.class, 3, TimeUnit.SECONDS);
assertEquals(5, messages.size());
for (int i = 0; i < messages.size(); i++) {
assertEquals("Message " + (i + 1), messages.get(i).getContent());
}
}
@Test
void testUserListUpdates() throws InterruptedException {
// User1 joins
ChatMessage joinMessage1 = new ChatMessage();
joinMessage1.setType(ChatMessage.MessageType.JOIN);
joinMessage1.setSender("user1");
sessionHandler1.send("/app/chat.addUser", joinMessage1);
// Should receive user list update
UserListMessage userList1 = sessionHandler1.receiveMessage(
UserListMessage.class, 2, TimeUnit.SECONDS);
assertNotNull(userList1);
assertTrue(userList1.getUsers().contains("user1"));
assertEquals(1, userList1.getActiveUsers());
// User2 joins
ChatMessage joinMessage2 = new ChatMessage();
joinMessage2.setType(ChatMessage.MessageType.JOIN);
joinMessage2.setSender("user2");
sessionHandler2.send("/app/chat.addUser", joinMessage2);
// Both should receive updated user list
UserListMessage userList2a = sessionHandler1.receiveMessage(
UserListMessage.class, 2, TimeUnit.SECONDS);
UserListMessage userList2b = sessionHandler2.receiveMessage(
UserListMessage.class, 2, TimeUnit.SECONDS);
assertNotNull(userList2a);
assertNotNull(userList2b);
assertTrue(userList2a.getUsers().contains("user1"));
assertTrue(userList2a.getUsers().contains("user2"));
assertEquals(2, userList2a.getActiveUsers());
}
}
// PrivateMessageWebSocketTest.java
package com.company.websocket.test;
import com.company.websocket.model.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
class PrivateMessageWebSocketTest extends BaseWebSocketTest {
private TestStompSessionHandler user1Handler;
private TestStompSessionHandler user2Handler;
private StompSession user1Session;
private StompSession user2Session;
@BeforeEach
void setUp() throws Exception {
user1Handler = new TestStompSessionHandler();
user2Handler = new TestStompSessionHandler();
user1Session = connect(user1Handler, 5);
user2Session = connect(user2Handler, 5);
// Subscribe to private queues
user1Handler.subscribe("/user/queue/private");
user2Handler.subscribe("/user/queue/private");
}
@AfterEach
void tearDown() {
if (user1Handler != null) user1Handler.disconnect();
if (user2Handler != null) user2Handler.disconnect();
}
@Test
void testPrivateMessageDelivery() throws InterruptedException {
// User1 sends private message to User2
PrivateMessage privateMessage = new PrivateMessage();
privateMessage.setFromUser("user1");
privateMessage.setToUser("user2");
privateMessage.setContent("Secret message");
user1Handler.send("/app/chat.private", privateMessage);
// Only User2 should receive the message
PrivateMessage received = user2Handler.receiveMessage(
PrivateMessage.class, 2, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals("user1", received.getFromUser());
assertEquals("user2", received.getToUser());
assertEquals("Secret message", received.getContent());
// User1 should not receive the message (no echo)
PrivateMessage notReceived = user1Handler.receiveMessage(
PrivateMessage.class, 1, TimeUnit.SECONDS);
assertNull(notReceived);
}
@Test
void testBidirectionalPrivateMessaging() throws InterruptedException {
// User1 sends to User2
PrivateMessage message1 = new PrivateMessage();
message1.setFromUser("user1");
message1.setToUser("user2");
message1.setContent("Hello User2");
user1Handler.send("/app/chat.private", message1);
PrivateMessage received1 = user2Handler.receiveMessage(
PrivateMessage.class, 2, TimeUnit.SECONDS);
assertNotNull(received1);
assertEquals("Hello User2", received1.getContent());
// User2 replies to User1
PrivateMessage message2 = new PrivateMessage();
message2.setFromUser("user2");
message2.setToUser("user1");
message2.setContent("Hello User1");
user2Handler.send("/app/chat.private", message2);
PrivateMessage received2 = user1Handler.receiveMessage(
PrivateMessage.class, 2, TimeUnit.SECONDS);
assertNotNull(received2);
assertEquals("Hello User1", received2.getContent());
}
}
// SystemWebSocketTest.java
package com.company.websocket.test;
import com.company.websocket.model.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.test.context.TestPropertySource;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;
/**
 * STOMP tests for the system endpoints: the notification topic and the ping/pong exchange.
 *
 * <p>Every test runs on a fresh session whose handler is subscribed to
 * {@code /topic/notifications} and {@code /queue/pong} before the test body starts.
 */
@TestPropertySource(properties = {
    "spring.task.scheduling.pool.size=1",
    "spring.task.scheduling.enabled=true"
})
class SystemWebSocketTest extends BaseWebSocketTest {

    private TestStompSessionHandler sessionHandler;
    private StompSession session;

    /**
     * Connects a new handler and subscribes it to both system destinations.
     */
    @BeforeEach
    void setUp() throws Exception {
        sessionHandler = new TestStompSessionHandler();
        // NOTE(review): the second argument is presumably a timeout in seconds;
        // confirm against BaseWebSocketTest.connect.
        session = connect(sessionHandler, 5);

        sessionHandler.subscribe("/topic/notifications");
        sessionHandler.subscribe("/queue/pong");
    }

    /**
     * Disconnects the handler, if setUp got far enough to create one.
     */
    @AfterEach
    void tearDown() {
        if (sessionHandler == null) {
            return;
        }
        sessionHandler.disconnect();
    }

    /**
     * Expects one notification to arrive without any client action and checks
     * that its fields are populated.
     */
    @Test
    void testSystemNotifications() throws InterruptedException {
        // Nothing is sent here: the notification must be pushed by the server
        // within the 35 second window.
        SystemNotification pushed =
            sessionHandler.receiveMessage(SystemNotification.class, 35, TimeUnit.SECONDS);

        assertNotNull(pushed);
        assertNotNull(pushed.getTitle());
        assertNotNull(pushed.getMessage());
        assertNotNull(pushed.getTimestamp());
        assertTrue(pushed.getId() > 0);
    }

    /**
     * Sends a ping and checks that the pong echoes the ping id.
     */
    @Test
    void testPingPong() throws InterruptedException {
        String pingId = "test-123";
        sessionHandler.send("/app/system.ping",
            new PingMessage(pingId, System.currentTimeMillis()));

        PongMessage reply =
            sessionHandler.receiveMessage(PongMessage.class, 2, TimeUnit.SECONDS);

        assertNotNull(reply);
        assertEquals(pingId, reply.getId());
        assertEquals("pong", reply.getResponse());
        assertNotNull(reply.getTimestamp());
    }

    /**
     * Waits until at least two messages have been counted, then checks that
     * the collected notifications have strictly increasing ids.
     */
    @Test
    void testMultipleNotifications() throws InterruptedException {
        await()
            .atMost(65, TimeUnit.SECONDS)
            .until(() -> sessionHandler.getMessageCount() >= 2);

        List<SystemNotification> collected =
            sessionHandler.receiveAllMessages(SystemNotification.class, 1, TimeUnit.SECONDS);
        int total = collected.size();
        assertTrue(total >= 2);

        // Compare each notification with its predecessor.
        for (int next = 1; next < total; next++) {
            SystemNotification later = collected.get(next);
            SystemNotification earlier = collected.get(next - 1);
            assertTrue(later.getId() > earlier.getId());
        }
    }
}
4. Advanced WebSocket Testing
// WebSocketLoadTest.java
package com.company.websocket.test;
import com.company.websocket.model.ChatMessage;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.simp.stomp.StompSession;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Load-oriented tests: many concurrent connections, and a burst of messages
 * on a single connection.
 *
 * <p>Both tests release what they open (thread pool, STOMP sessions) in a
 * {@code finally} block, so a failed assertion or a timeout does not leak
 * resources into later tests.
 */
class WebSocketLoadTest extends BaseWebSocketTest {

    /**
     * Opens ten sessions concurrently; each subscribes to {@code /topic/public}
     * and sends a JOIN message. Passes when every connection task completes
     * within 30 seconds.
     */
    @Test
    void testMultipleConnections() throws Exception {
        int connectionCount = 10;
        ExecutorService executor = Executors.newFixedThreadPool(connectionCount);
        List<CompletableFuture<StompSession>> futures = new ArrayList<>();
        AtomicInteger successfulConnections = new AtomicInteger(0);

        try {
            // Create multiple connections concurrently
            for (int i = 0; i < connectionCount; i++) {
                final int userId = i;
                CompletableFuture<StompSession> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        TestStompSessionHandler handler = new TestStompSessionHandler();
                        StompSession session = connect(handler, 10);

                        // Subscribe and send join message
                        handler.subscribe("/topic/public");
                        ChatMessage joinMessage = new ChatMessage();
                        joinMessage.setType(ChatMessage.MessageType.JOIN);
                        joinMessage.setSender("user" + userId);
                        handler.send("/app/chat.addUser", joinMessage);

                        successfulConnections.incrementAndGet();
                        return session;
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to connect user " + userId, e);
                    }
                }, executor);
                futures.add(future);
            }

            // Wait for all connections to complete
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]));
            allFutures.get(30, TimeUnit.SECONDS);

            assertEquals(connectionCount, successfulConnections.get());
        } finally {
            // Close every session that was actually established, even when the
            // wait or the assertion above failed.
            for (CompletableFuture<StompSession> future : futures) {
                if (future.isDone() && !future.isCompletedExceptionally()) {
                    StompSession opened = future.getNow(null);
                    if (opened != null && opened.isConnected()) {
                        opened.disconnect();
                    }
                }
            }
            // shutdownNow also interrupts tasks still blocked in connect after a timeout.
            executor.shutdownNow();
        }
    }

    /**
     * Sends 100 chat messages back to back on one session and checks that all
     * of them come back on {@code /topic/public} in the order they were sent.
     */
    @Test
    void testHighFrequencyMessages() throws Exception {
        TestStompSessionHandler handler = new TestStompSessionHandler();
        connect(handler, 5);

        try {
            handler.subscribe("/topic/public");

            int messageCount = 100;
            long startTime = System.currentTimeMillis();

            // Send messages rapidly
            for (int i = 0; i < messageCount; i++) {
                ChatMessage message = new ChatMessage();
                message.setType(ChatMessage.MessageType.CHAT);
                message.setSender("loadTestUser");
                message.setContent("Message " + i);
                handler.send("/app/chat.send", message);
            }

            long sendTime = System.currentTimeMillis() - startTime;
            System.out.println("Sent " + messageCount + " messages in " + sendTime + "ms");

            // Wait for all messages to be received
            await().atMost(10, TimeUnit.SECONDS).until(() ->
                handler.getMessageCount() >= messageCount);

            List<ChatMessage> receivedMessages = handler.receiveAllMessages(
                ChatMessage.class, 2, TimeUnit.SECONDS);
            assertEquals(messageCount, receivedMessages.size());

            // Verify no messages were lost and order is maintained
            for (int i = 0; i < receivedMessages.size(); i++) {
                assertEquals("Message " + i, receivedMessages.get(i).getContent());
            }
        } finally {
            // Disconnect even when a wait or an assertion fails.
            handler.disconnect();
        }
    }
}
// WebSocketErrorHandlingTest.java
package com.company.websocket.test;
import com.company.websocket.model.ChatMessage;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.simp.stomp.StompSession;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Tests for non-happy-path behaviour: malformed payloads, reconnecting after a
 * disconnect, and very large messages.
 *
 * <p>Each test disconnects its handler in a {@code finally} block so that a
 * failed assertion does not leave a session open.
 */
class WebSocketErrorHandlingTest extends BaseWebSocketTest {

    /**
     * Sends a plain string where the chat endpoint expects a chat message and
     * checks that the session is still connected one second later.
     */
    @Test
    void testInvalidMessageFormat() throws Exception {
        TestStompSessionHandler handler = new TestStompSessionHandler();
        StompSession session = connect(handler, 5);

        try {
            // Send invalid JSON
            handler.send("/app/chat.send", "invalid json string");

            // Give the server time to process the bad payload first; asserting
            // immediately after send would pass before any reaction could happen.
            Thread.sleep(1000);

            // Session should remain connected
            assertTrue(session.isConnected());
        } finally {
            handler.disconnect();
        }
    }

    /**
     * Connects, disconnects, and then checks that a brand-new connection can
     * be established afterwards.
     */
    @Test
    void testConnectionRecovery() throws Exception {
        AsyncTestStompSessionHandler handler = new AsyncTestStompSessionHandler();
        CompletableFuture<StompSession> future = connectAsync(handler);
        StompSession session = future.get(5, TimeUnit.SECONDS);
        assertTrue(session.isConnected());

        // Disconnect and try to reconnect
        session.disconnect();

        // Wait for disconnect
        Thread.sleep(1000);
        assertFalse(session.isConnected());

        // Try to reconnect
        AsyncTestStompSessionHandler newHandler = new AsyncTestStompSessionHandler();
        CompletableFuture<StompSession> newFuture = connectAsync(newHandler);
        StompSession newSession = newFuture.get(5, TimeUnit.SECONDS);

        try {
            assertTrue(newSession.isConnected());
        } finally {
            newHandler.disconnect();
        }
    }

    /**
     * Sends a chat message whose content is one sentence repeated 10,000 times
     * and checks that the same content comes back on {@code /topic/public}.
     */
    @Test
    void testLargeMessageHandling() throws Exception {
        TestStompSessionHandler handler = new TestStompSessionHandler();
        connect(handler, 5);

        try {
            handler.subscribe("/topic/public");

            // Create a large message (25 characters x 10,000 repetitions)
            String largeContent = "This is a large message. ".repeat(10000);

            ChatMessage largeMessage = new ChatMessage();
            largeMessage.setType(ChatMessage.MessageType.CHAT);
            largeMessage.setSender("testUser");
            largeMessage.setContent(largeContent);
            handler.send("/app/chat.send", largeMessage);

            // Should receive the message without issues
            ChatMessage received = handler.receiveMessage(
                ChatMessage.class, 5, TimeUnit.SECONDS);
            assertNotNull(received);
            assertEquals(largeContent, received.getContent());
        } finally {
            handler.disconnect();
        }
    }
}
5. WebSocket Test Utilities
// WebSocketTestUtils.java
package com.company.websocket.test;
import com.company.websocket.model.ChatMessage;
import org.springframework.messaging.simp.stomp.StompSession;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Static helpers shared by the WebSocket tests: a send-and-wait helper and
 * factories for common {@link ChatMessage} shapes.
 */
public class WebSocketTestUtils {

    /**
     * Subscribes to {@code subscribeDestination}, sends {@code sendPayload} to
     * {@code sendDestination}, and blocks until a frame whose payload is an
     * instance of {@code expectedResponseType} arrives or the timeout elapses.
     *
     * <p>The temporary subscription is always removed before this method
     * returns, including when the waiting thread is interrupted.
     *
     * @param session              connected session used to subscribe and send
     * @param sendDestination      destination the payload is sent to
     * @param sendPayload          payload to send
     * @param subscribeDestination destination on which the response is expected
     * @param expectedResponseType only payloads that are instances of this type count as the response
     * @param timeout              maximum time to wait for the response
     * @param unit                 unit of {@code timeout}
     * @return the latch used for waiting; its count is zero whenever this method returns normally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException     if no matching response arrives within the timeout
     */
    public static CountDownLatch sendMessageAndWaitForResponse(
            StompSession session, String sendDestination, Object sendPayload,
            String subscribeDestination, Class<?> expectedResponseType,
            long timeout, TimeUnit unit) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        StompSession.Subscription subscription = session.subscribe(
            subscribeDestination, new TestStompSessionHandler() {
                @Override
                public void handleFrame(org.springframework.messaging.simp.stomp.StompHeaders headers, Object payload) {
                    // Frames of any other payload type are ignored.
                    if (expectedResponseType.isInstance(payload)) {
                        latch.countDown();
                    }
                }
            });

        boolean received;
        try {
            session.send(sendDestination, sendPayload);
            received = latch.await(timeout, unit);
        } finally {
            // Runs on success, timeout, send failure and interruption alike,
            // so the subscription never outlives this call.
            subscription.unsubscribe();
        }

        if (!received) {
            throw new RuntimeException("Timeout waiting for response");
        }
        return latch;
    }

    /**
     * Placeholder: currently sleeps for the full timeout and does not inspect
     * any connection count. {@code expectedCount} is unused.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is
     * restored and the method returns early.
     *
     * @param expectedCount connection count to wait for (not used yet)
     * @param timeout       how long to sleep
     * @param unit          unit of {@code timeout}
     */
    public static void waitForConnectionCount(int expectedCount, long timeout, TimeUnit unit) {
        // Implementation would depend on how you track connections
        // This is a placeholder for actual implementation
        try {
            Thread.sleep(unit.toMillis(timeout));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds a CHAT message.
     *
     * @param sender  value for the sender field
     * @param content value for the content field
     * @return a new message of type {@code CHAT}
     */
    public static ChatMessage createChatMessage(String sender, String content) {
        ChatMessage message = new ChatMessage();
        message.setType(ChatMessage.MessageType.CHAT);
        message.setSender(sender);
        message.setContent(content);
        return message;
    }

    /**
     * Builds a JOIN message; the content field is left unset.
     *
     * @param sender value for the sender field
     * @return a new message of type {@code JOIN}
     */
    public static ChatMessage createJoinMessage(String sender) {
        ChatMessage message = new ChatMessage();
        message.setType(ChatMessage.MessageType.JOIN);
        message.setSender(sender);
        return message;
    }
}
// WebSocketHealthCheck.java
package com.company.websocket.test;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Health indicator backed by two counters that other components update:
 * the number of active connections and the total number of messages.
 *
 * <p>Reports UP while at least one connection is counted and DOWN otherwise.
 * NOTE(review): this means an idle server with no clients reports DOWN;
 * confirm that this is intended before wiring it into liveness checks.
 */
@Component
public class WebSocketHealthCheck implements HealthIndicator {

    private final AtomicInteger connectionCount = new AtomicInteger(0);
    private final AtomicInteger messageCount = new AtomicInteger(0);

    /** Records one more active connection. */
    public void incrementConnection() {
        connectionCount.incrementAndGet();
    }

    /**
     * Records one fewer active connection. The count never drops below zero,
     * so an unmatched or duplicate disconnect event cannot make
     * {@code activeConnections} negative.
     */
    public void decrementConnection() {
        connectionCount.updateAndGet(current -> current > 0 ? current - 1 : 0);
    }

    /** Records one more handled message. */
    public void incrementMessageCount() {
        messageCount.incrementAndGet();
    }

    /**
     * Builds the health status from the current counter values.
     *
     * @return UP when the connection count is positive, DOWN otherwise, with
     *         {@code activeConnections} and {@code totalMessages} as details
     */
    @Override
    public Health health() {
        int connections = connectionCount.get();
        int messages = messageCount.get();

        Health.Builder status = connections > 0 ? Health.up() : Health.down();
        return status
            .withDetail("activeConnections", connections)
            .withDetail("totalMessages", messages)
            .build();
    }
}

Application Configuration

1. Application Properties
# application.yml
# Nesting is significant in YAML: each key below must stay indented under its parent.
spring:
  application:
    name: websocket-testing-app
server:
  port: 8080
logging:
  level:
    com.company.websocket: DEBUG
    org.springframework.web.socket: INFO
    org.springframework.messaging: INFO
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info
  endpoint:
    health:
      show-details: always
# Test Configuration
test:
  websocket:
    timeout: 5
    max-connections: 100

This comprehensive WebSocket testing implementation provides:

  • Complete WebSocket server with STOMP protocol support
  • Multiple message types (chat, private, system notifications, ping/pong)
  • Comprehensive test framework with base test classes
  • Advanced testing scenarios including load testing and error handling
  • Real-time message validation with proper timeout handling
  • Concurrent connection testing for scalability
  • Health monitoring and metrics collection
  • Production-ready configuration with proper error handling

The testing framework helps verify that WebSocket communication stays reliable across real-world scenarios, including high load, connection failures, and different message types.

Pyroscope Profiling in Java
Explains how to use Pyroscope for continuous profiling in Java applications, helping developers analyze CPU and memory usage patterns to improve performance and identify bottlenecks.
https://macronepal.com/blog/pyroscope-profiling-in-java/

OpenTelemetry Metrics in Java: Comprehensive Guide
Provides a complete guide to collecting and exporting metrics in Java using OpenTelemetry, including counters, histograms, gauges, and integration with monitoring tools. (MACRO NEPAL)
https://macronepal.com/blog/opentelemetry-metrics-in-java-comprehensive-guide/

OTLP Exporter in Java: Complete Guide for OpenTelemetry
Explains how to configure OTLP exporters in Java to send telemetry data such as traces, metrics, and logs to monitoring systems using HTTP or gRPC protocols. (MACRO NEPAL)
https://macronepal.com/blog/otlp-exporter-in-java-complete-guide-for-opentelemetry/

Thanos Integration in Java: Global View of Metrics
Explains how to integrate Thanos with Java monitoring systems to create a scalable global metrics view across multiple Prometheus instances.

https://macronepal.com/blog/thanos-integration-in-java-global-view-of-metrics

Time Series with InfluxDB in Java: Complete Guide (Version 2)
Explains how to manage time-series data using InfluxDB in Java applications, including storing, querying, and analyzing metrics data.

https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide-2

Time Series with InfluxDB in Java: Complete Guide
Provides an overview of integrating InfluxDB with Java for time-series data handling, including monitoring applications and managing performance metrics.

https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide

Implementing Prometheus Remote Write in Java (Version 2)
Explains how to configure Java applications to send metrics data to Prometheus-compatible systems using the remote write feature for scalable monitoring.

https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide-2

Implementing Prometheus Remote Write in Java: Complete Guide
Provides instructions for sending metrics from Java services to Prometheus servers, enabling centralized monitoring and real-time analytics.

https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide

Building a TileServer GL in Java: Vector and Raster Tile Server
Explains how to build a TileServer GL in Java for serving vector and raster map tiles, useful for geographic visualization and mapping applications.

https://macronepal.com/blog/building-a-tileserver-gl-in-java-vector-and-raster-tile-server

Indoor Mapping in Java
Explains how to create indoor mapping systems in Java, including navigation inside buildings, spatial data handling, and visualization techniques.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper