Shadow Traffic Testing Implementation in Java

Introduction

Shadow traffic testing, also known as dark traffic testing or traffic mirroring, is a powerful technique for testing new systems by replicating production traffic to a shadow environment without affecting users. This approach allows comprehensive testing with real-world data while maintaining system safety and reliability.

Core Architecture

1. Traffic Mirroring Configuration

/**
 * Settings for shadow traffic mirroring, bound from properties under the
 * {@code shadow.traffic} prefix. Accessors are generated by Lombok's {@code @Data}.
 */
@Configuration
@ConfigurationProperties(prefix = "shadow.traffic")
@Data
public class ShadowTrafficConfig {
// Master switch; ShadowTrafficManager.shouldShadow returns false while this is false.
private boolean enabled = false;
// Fraction of eligible requests to mirror; the sampler treats >= 1.0 as "all" and <= 0.0 as "none".
private double samplingRate = 0.01; // 1% of traffic
// Ant-style path patterns; a request must match at least one to be eligible.
private List<String> includedEndpoints = List.of("/api/v1/**");
// Ant-style path patterns; a request matching any of these is never mirrored.
private List<String> excludedEndpoints = List.of("/api/v1/health");
// Base URL that ShadowClient prefixes to the request path when calling the shadow system.
private String shadowBaseUrl = "http://shadow-environment:8080";
// Applied as both connect and read timeout of the shadow RestTemplate, in milliseconds.
private int shadowTimeoutMs = 5000;
// NOTE(review): no code in this document reads this flag — confirm where it is honored.
private boolean asyncMode = true;
// Extra headers added to every outgoing shadow request.
private Map<String, String> shadowHeaders = new HashMap<>();
}

2. Shadow Traffic Manager

/**
 * Decides whether an incoming request should be mirrored to the shadow environment.
 *
 * <p>A request is mirrored only when shadowing is enabled, its path matches the
 * include patterns without matching any exclude pattern, and the sampler selects it.
 */
@Service
@Slf4j
public class ShadowTrafficManager {
    // One shared matcher instead of a new instance per pattern per request.
    private static final AntPathMatcher PATH_MATCHER = new AntPathMatcher();

    private final ShadowTrafficConfig config;
    private final TrafficSampler trafficSampler;
    private final ShadowClient shadowClient;
    private final TrafficRecorder trafficRecorder;
    private final ThreadPoolTaskExecutor shadowExecutor;

    /** Logs the effective configuration once the bean has been constructed. */
    @PostConstruct
    public void init() {
        log.info("Shadow traffic manager initialized. Enabled: {}, Sampling rate: {}%", 
                config.isEnabled(), config.getSamplingRate() * 100);
    }

    /**
     * Returns whether the given request should be mirrored.
     *
     * @param endpoint  request path, matched against the configured Ant patterns
     * @param requestId identifier the sampler uses for its sampling decision
     * @return {@code true} if the request should be sent to the shadow environment
     */
    public boolean shouldShadow(String endpoint, String requestId) {
        if (!config.isEnabled()) {
            return false;
        }
        if (!isEndpointIncluded(endpoint)) {
            return false;
        }
        return trafficSampler.shouldSample(requestId, config.getSamplingRate());
    }

    /** True when the path matches an include pattern and no exclude pattern. */
    private boolean isEndpointIncluded(String endpoint) {
        boolean included = config.getIncludedEndpoints().stream()
                .anyMatch(pattern -> PATH_MATCHER.match(pattern, endpoint));
        if (!included) {
            return false;
        }
        return config.getExcludedEndpoints().stream()
                .noneMatch(pattern -> PATH_MATCHER.match(pattern, endpoint));
    }
}

Implementation Components

1. Traffic Sampling

@Component
public class TrafficSampler {
private final Random random = new Random();
public boolean shouldSample(String requestId, double samplingRate) {
if (samplingRate >= 1.0) return true;
if (samplingRate <= 0.0) return false;
// Use consistent hashing for deterministic sampling per request
int hash = Math.abs(requestId.hashCode());
double sampleValue = (hash % 10000) / 10000.0;
return sampleValue < samplingRate;
}
public boolean shouldSample(String requestId, double samplingRate, 
Map<String, String> attributes) {
// Weighted sampling based on attributes
double adjustedRate = adjustSamplingRate(samplingRate, attributes);
return shouldSample(requestId, adjustedRate);
}
private double adjustSamplingRate(double baseRate, Map<String, String> attributes) {
double multiplier = 1.0;
// Increase sampling for specific user segments
if ("premium".equals(attributes.get("userTier"))) {
multiplier = 2.0;
}
// Adjust based on endpoint criticality
if ("/api/v1/payments".equals(attributes.get("endpoint"))) {
multiplier = 1.5;
}
return Math.min(baseRate * multiplier, 1.0);
}
}

2. Shadow Client

/**
 * Sends mirrored requests to the shadow environment and records timing and
 * success/error metrics. Failures are reported in the returned
 * {@link ShadowResponse}; they are never thrown to the caller.
 */
@Component
@Slf4j
public class ShadowClient {
    private static final String DURATION_METRIC = "shadow.request.duration";

    private final RestTemplate shadowRestTemplate;
    private final ShadowTrafficConfig config;
    private final MeterRegistry meterRegistry;

    /**
     * Replays the given request against the shadow base URL on the
     * {@code shadowExecutor} pool.
     *
     * @param shadowRequest captured request to replay
     * @return a completed future holding either the shadow response or an error result
     */
    @Async("shadowExecutor")
    public CompletableFuture<ShadowResponse> sendShadowRequest(
            ShadowRequest shadowRequest) {
        Timer.Sample timer = Timer.start(meterRegistry);
        String endpoint = shadowRequest.getEndpoint();
        try {
            HttpHeaders headers = createShadowHeaders(shadowRequest);
            HttpEntity<Object> entity = new HttpEntity<>(
                    shadowRequest.getBody(), headers);
            ResponseEntity<String> response = shadowRestTemplate.exchange(
                    config.getShadowBaseUrl() + endpoint,
                    shadowRequest.getHttpMethod(),
                    entity,
                    String.class
            );
            ShadowResponse shadowResponse = ShadowResponse.builder()
                    .originalRequestId(shadowRequest.getRequestId())
                    // getStatusCode().value() replaces the deprecated getStatusCodeValue().
                    .shadowStatusCode(response.getStatusCode().value())
                    .shadowResponseBody(response.getBody())
                    // Measured from request capture, so it includes executor queueing time.
                    .duration(System.currentTimeMillis() - shadowRequest.getTimestamp())
                    .success(true)
                    .build();
            stopTimer(timer, endpoint, "success");
            meterRegistry.counter("shadow.request.success", 
                    "endpoint", endpoint).increment();
            return CompletableFuture.completedFuture(shadowResponse);
        } catch (Exception e) {
            // Boundary catch: a shadow failure must never surface to the primary flow.
            log.warn("Shadow request failed for endpoint: {}", endpoint, e);
            stopTimer(timer, endpoint, "error");
            meterRegistry.counter("shadow.request.error", 
                    "endpoint", endpoint, "error", e.getClass().getSimpleName()).increment();
            return CompletableFuture.completedFuture(
                    ShadowResponse.error(shadowRequest.getRequestId(), e.getMessage()));
        }
    }

    /** Stops the sample against the duration timer tagged with endpoint and outcome. */
    private void stopTimer(Timer.Sample timer, String endpoint, String status) {
        timer.stop(Timer.builder(DURATION_METRIC)
                .tag("endpoint", endpoint)
                .tag("status", status)
                .register(meterRegistry));
    }

    /**
     * Builds the outgoing headers: non-sensitive original headers, the shadow
     * marker headers, then any configured extra headers.
     */
    private HttpHeaders createShadowHeaders(ShadowRequest request) {
        HttpHeaders headers = new HttpHeaders();
        // Copy original headers (excluding sensitive ones)
        Map<String, String> originalHeaders = request.getHeaders();
        if (originalHeaders != null) {
            originalHeaders.forEach((key, value) -> {
                if (!isSensitiveHeader(key)) {
                    headers.add(key, value);
                }
            });
        }
        // Add shadow-specific headers
        headers.add("X-Shadow-Traffic", "true");
        headers.add("X-Original-Request-Id", request.getRequestId());
        headers.add("X-Shadow-Timestamp", String.valueOf(request.getTimestamp()));
        // Add configured shadow headers
        config.getShadowHeaders().forEach(headers::add);
        return headers;
    }

    /**
     * Returns whether a header may carry credentials and must not be forwarded.
     * Cookie headers are included because session cookies are credentials too.
     */
    private boolean isSensitiveHeader(String headerName) {
        // Locale.ROOT keeps the match stable under locales with special casing rules.
        String lowerHeader = headerName.toLowerCase(java.util.Locale.ROOT);
        return lowerHeader.contains("auth") || 
                lowerHeader.contains("cookie") || 
                lowerHeader.contains("password") || 
                lowerHeader.contains("token") || 
                lowerHeader.contains("secret");
    }
}

3. Request/Response Models

/**
 * Snapshot of a primary request that is replayed against the shadow environment.
 * Accessors, builder and constructors are generated by Lombok.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ShadowRequest {
// Identifier of the original request; used for sampling and correlation.
private String requestId;
// Request path; appended to the shadow base URL when the request is replayed.
private String endpoint;
// HTTP method of the original request.
private HttpMethod httpMethod;
// Original request headers (one value per name); sensitive ones are filtered before sending.
private Map<String, String> headers;
// Request payload; the servlet filter sets this to the parsed JSON body, or null when empty.
private Object body;
// Capture time in epoch milliseconds.
private long timestamp;
// Extra request metadata; the servlet filter stores userAgent, clientIp and method here.
private Map<String, String> attributes;
}
/**
 * Outcome of one shadow request: either the shadow system's status and body,
 * or an error message when the call failed.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ShadowResponse {
    private String originalRequestId;
    private Integer shadowStatusCode;
    private String shadowResponseBody;
    private String errorMessage;
    private long duration;
    private boolean success;

    /**
     * Creates a failed response carrying only the request id and error message.
     *
     * @param requestId    id of the original request
     * @param errorMessage description of the failure
     * @return a response with {@code success == false} and all other fields at their defaults
     */
    public static ShadowResponse error(String requestId, String errorMessage) {
        ShadowResponse failure = new ShadowResponse();
        failure.setOriginalRequestId(requestId);
        failure.setErrorMessage(errorMessage);
        failure.setSuccess(false);
        return failure;
    }
}

Spring Boot Integration

1. Web Filter for Traffic Interception

/**
 * Servlet filter that lets the primary request complete and then, for sampled
 * requests, hands a copy to {@link ShadowClient} for replay.
 */
@Component
@Order(1)
@Slf4j
public class ShadowTrafficFilter implements Filter {
    private final ShadowTrafficManager shadowManager;
    private final ShadowClient shadowClient;
    private final ObjectMapper objectMapper;

    /**
     * Runs the filter chain with caching wrappers, writes the primary response
     * back to the client, and only then considers mirroring the request.
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
            FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;
        // Create wrapper to capture request body
        ContentCachingRequestWrapper wrappedRequest = 
                new ContentCachingRequestWrapper(httpRequest);
        ContentCachingResponseWrapper wrappedResponse = 
                new ContentCachingResponseWrapper(httpResponse);
        String requestId = getOrCreateRequestId(wrappedRequest);
        String endpoint = wrappedRequest.getRequestURI();
        try {
            chain.doFilter(wrappedRequest, wrappedResponse);
        } finally {
            // Deliver the primary response first so shadow work never delays the client.
            wrappedResponse.copyBodyToResponse();
        }
        // Reached only when the primary request completed without throwing.
        if (shadowManager.shouldShadow(endpoint, requestId)) {
            shadowTraffic(wrappedRequest, wrappedResponse, requestId);
        }
    }

    /** Builds and submits the shadow request; any failure is logged, never propagated. */
    private void shadowTraffic(ContentCachingRequestWrapper request, 
            ContentCachingResponseWrapper response,
            String requestId) {
        try {
            ShadowRequest shadowRequest = createShadowRequest(request, requestId);
            shadowClient.sendShadowRequest(shadowRequest)
                    .whenComplete((shadowResponse, throwable) -> {
                        if (throwable != null) {
                            log.debug("Shadow request completed with error for request: {}", 
                                    requestId, throwable);
                        } else {
                            log.debug("Shadow request completed successfully for request: {}", 
                                    requestId);
                        }
                    });
        } catch (Exception e) {
            log.warn("Failed to create shadow request for: {}", requestId, e);
        }
    }

    /**
     * Captures headers, the cached body and metadata of the primary request.
     *
     * @throws IOException if the cached body cannot be parsed as JSON
     */
    private ShadowRequest createShadowRequest(HttpServletRequest request, 
            String requestId) throws IOException {
        Map<String, String> headers = Collections.list(request.getHeaderNames())
                .stream()
                .collect(Collectors.toMap(
                        name -> name,
                        name -> request.getHeader(name),
                        // A repeated header name must not abort the capture; keep the first.
                        (first, duplicate) -> first
                ));
        // The wrapper only holds bytes the downstream chain actually read.
        byte[] requestBody = ((ContentCachingRequestWrapper) request).getContentAsByteArray();
        Object body = requestBody.length > 0 ? 
                objectMapper.readValue(requestBody, Object.class) : null;
        return ShadowRequest.builder()
                .requestId(requestId)
                .endpoint(request.getRequestURI())
                .httpMethod(HttpMethod.valueOf(request.getMethod()))
                .headers(headers)
                .body(body)
                .timestamp(System.currentTimeMillis())
                .attributes(extractAttributes(request))
                .build();
    }

    /** Uses the {@code X-Request-ID} header when present, otherwise a random UUID. */
    private String getOrCreateRequestId(HttpServletRequest request) {
        String requestId = request.getHeader("X-Request-ID");
        return requestId != null ? requestId : UUID.randomUUID().toString();
    }

    /** Collects user agent, client IP and HTTP method; values may be null. */
    private Map<String, String> extractAttributes(HttpServletRequest request) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("userAgent", request.getHeader("User-Agent"));
        attributes.put("clientIp", getClientIp(request));
        attributes.put("method", request.getMethod());
        return attributes;
    }

    /** Returns the first {@code X-Forwarded-For} hop if present, else the remote address. */
    private String getClientIp(HttpServletRequest request) {
        String xForwardedFor = request.getHeader("X-Forwarded-For");
        if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
            // Entries are comma-separated and usually padded with spaces.
            return xForwardedFor.split(",")[0].trim();
        }
        return request.getRemoteAddr();
    }
}

2. Async Configuration

/**
 * Infrastructure beans for shadow traffic: a dedicated bounded thread pool and
 * a RestTemplate with shadow-specific timeouts.
 */
@Configuration
@EnableAsync
public class AsyncConfig {
    /**
     * Bounded pool for shadow requests. When pool and queue are both full, new
     * shadow tasks are dropped.
     */
    @Bean("shadowExecutor")
    public ThreadPoolTaskExecutor shadowTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("shadow-traffic-");
        // Shed load instead of CallerRunsPolicy: running a rejected shadow call on the
        // submitting request thread would block production traffic on the shadow system.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }

    /**
     * RestTemplate used only for shadow calls; connect and read timeouts both
     * come from {@code shadowTimeoutMs}.
     */
    @Bean
    public RestTemplate shadowRestTemplate(ShadowTrafficConfig config) {
        // Configure the factory before handing it over; no cast back from the template needed.
        HttpComponentsClientHttpRequestFactory requestFactory = 
                new HttpComponentsClientHttpRequestFactory();
        requestFactory.setConnectTimeout(config.getShadowTimeoutMs());
        requestFactory.setReadTimeout(config.getShadowTimeoutMs());
        return new RestTemplate(requestFactory);
    }
}

Comparison and Validation

1. Response Comparison Service

/**
 * Compares a primary response with its shadow counterpart and records
 * comparison metrics.
 */
@Service
@Slf4j
public class ResponseComparisonService {
    private final ObjectMapper objectMapper;
    private final MeterRegistry meterRegistry;

    /**
     * Serializes both responses to JSON and compares them.
     *
     * @param primaryResponse response produced by the primary system
     * @param shadowResponse  response produced by the shadow system
     * @param endpoint        endpoint the responses belong to; used as a metric tag
     * @return the comparison outcome, or an error result if serialization fails
     */
    public ComparisonResult compareResponses(Object primaryResponse, 
            Object shadowResponse,
            String endpoint) {
        Timer.Sample timer = Timer.start(meterRegistry);
        try {
            String primaryJson = objectMapper.writeValueAsString(primaryResponse);
            String shadowJson = objectMapper.writeValueAsString(shadowResponse);
            // Exact match is a textual comparison of the serialized JSON.
            boolean exactMatch = primaryJson.equals(shadowJson);
            double similarityScore = calculateSimilarity(primaryJson, shadowJson);
            ComparisonResult result = ComparisonResult.builder()
                    .exactMatch(exactMatch)
                    .similarityScore(similarityScore)
                    .primaryResponse(primaryResponse)
                    .shadowResponse(shadowResponse)
                    .endpoint(endpoint)
                    .timestamp(Instant.now())
                    .build();
            // Record metrics
            meterRegistry.counter("shadow.comparison.total", 
                    "endpoint", endpoint).increment();
            if (!exactMatch) {
                meterRegistry.counter("shadow.comparison.mismatch", 
                        "endpoint", endpoint).increment();
                // A gauge over a boxed Double is held weakly and is registered only once per
                // name/tags, so later scores were never reported. A summary records each score.
                meterRegistry.summary("shadow.comparison.similarity", 
                        "endpoint", endpoint).record(similarityScore);
            }
            return result;
        } catch (Exception e) {
            log.error("Failed to compare responses for endpoint: {}", endpoint, e);
            meterRegistry.counter("shadow.comparison.error", 
                    "endpoint", endpoint).increment();
            return ComparisonResult.error(endpoint, e.getMessage());
        } finally {
            timer.stop(Timer.builder("shadow.comparison.duration")
                    .tag("endpoint", endpoint)
                    .register(meterRegistry));
        }
    }

    /** Parses both documents and scores them; returns 0.0 if either cannot be parsed. */
    private double calculateSimilarity(String json1, String json2) {
        try {
            JsonNode node1 = objectMapper.readTree(json1);
            JsonNode node2 = objectMapper.readTree(json2);
            return calculateNodeSimilarity(node1, node2);
        } catch (Exception e) {
            return 0.0;
        }
    }

    /**
     * Scores two JSON trees: 1.0 when equal, 0.0 when the node types differ,
     * otherwise a fixed placeholder of 0.5.
     */
    private double calculateNodeSimilarity(JsonNode node1, JsonNode node2) {
        if (node1.equals(node2)) {
            return 1.0;
        }
        if (!node1.getNodeType().equals(node2.getNodeType())) {
            return 0.0;
        }
        // Implement more sophisticated similarity calculation
        // based on your specific requirements
        return 0.5; // Placeholder
    }
}

2. Data Consistency Validator

/**
 * Cross-checks the requests recorded for the primary and shadow systems and
 * logs any differences. Findings are only logged; nothing is thrown.
 */
@Service
@Slf4j // The class logs through 'log', which previously had no declaration.
public class DataConsistencyValidator {
    private final ShadowTrafficRecorder trafficRecorder;

    /**
     * Compares the recorded request sequences of both systems for one request id.
     *
     * @param requestId     id of the original request
     * @param primarySystem name of the primary system
     * @param shadowSystem  name of the shadow system
     */
    public void validateDataConsistency(String requestId, 
            String primarySystem, 
            String shadowSystem) {
        // NOTE(review): getRequestsBySystem must be provided by ShadowTrafficRecorder — confirm it exists.
        List<ShadowRequest> primaryRequests = 
                trafficRecorder.getRequestsBySystem(primarySystem, requestId);
        List<ShadowRequest> shadowRequests = 
                trafficRecorder.getRequestsBySystem(shadowSystem, requestId);
        if (primaryRequests == null || shadowRequests == null) {
            log.warn("No recorded requests to compare for request: {}", requestId);
            return;
        }
        // Compare request sequences and data
        validateRequestSequence(primaryRequests, shadowRequests);
        validateDataEquivalence(primaryRequests, shadowRequests);
    }

    /** Warns when the two systems recorded a different number of requests. */
    private void validateRequestSequence(List<ShadowRequest> primary, 
            List<ShadowRequest> shadow) {
        if (primary.size() != shadow.size()) {
            log.warn("Request sequence length mismatch: primary={}, shadow={}", 
                    primary.size(), shadow.size());
        }
    }

    /** Compares bodies position by position over the common prefix of both lists. */
    private void validateDataEquivalence(List<ShadowRequest> primary, 
            List<ShadowRequest> shadow) {
        int comparable = Math.min(primary.size(), shadow.size());
        for (int i = 0; i < comparable; i++) {
            ShadowRequest primaryReq = primary.get(i);
            ShadowRequest shadowReq = shadow.get(i);
            if (!Objects.equals(primaryReq.getBody(), shadowReq.getBody())) {
                log.warn("Data equivalence mismatch at position {}: primary={}, shadow={}", 
                        i, primaryReq.getBody(), shadowReq.getBody());
            }
        }
    }
}

Monitoring and Analytics

1. Shadow Traffic Metrics

/**
 * Publishes shadow traffic counters and timers from application events and
 * logs a periodic summary of configured versus observed sampling.
 */
@Component
@Slf4j // The summary job logs through 'log', which previously had no declaration.
public class ShadowTrafficMetrics {
    private final MeterRegistry meterRegistry;
    private final ShadowTrafficConfig config;

    /** Counts every initiated shadow request, tagged by endpoint and configured rate. */
    @EventListener
    public void handleShadowRequest(ShadowRequestEvent event) {
        // Track shadow request initiation
        meterRegistry.counter("shadow.request.initiated",
                "endpoint", event.getEndpoint(),
                "sampling_rate", String.valueOf(config.getSamplingRate()))
                .increment();
    }

    /** Counts completed shadow requests and records the duration of successful ones. */
    @EventListener
    public void handleShadowResponse(ShadowResponseEvent event) {
        ShadowResponse response = event.getResponse();
        // Track response metrics
        meterRegistry.counter("shadow.request.completed",
                "endpoint", event.getEndpoint(),
                "success", String.valueOf(response.isSuccess()))
                .increment();
        if (response.isSuccess()) {
            meterRegistry.timer("shadow.request.duration",
                    "endpoint", event.getEndpoint())
                    .record(response.getDuration(), TimeUnit.MILLISECONDS);
        }
    }

    /** Logs configured versus observed sampling rate once a minute. */
    @Scheduled(fixedRate = 60000) // Every minute
    public void reportShadowTrafficSummary() {
        double samplingRate = config.getSamplingRate();
        // NOTE(review): "http.requests.total" must be published elsewhere — confirm the meter name.
        long totalTraffic = (long) sumCounters("http.requests.total");
        long shadowTraffic = (long) sumCounters("shadow.request.initiated");
        double actualSamplingRate = totalTraffic > 0 ? 
                (double) shadowTraffic / totalTraffic : 0.0;
        log.info("Shadow traffic summary: configured rate={}%, actual rate={}%, total={}, shadow={}",
                samplingRate * 100, actualSamplingRate * 100, totalTraffic, shadowTraffic);
    }

    /**
     * Sums every counter registered under the name, across all tag combinations.
     * An untagged counter(name) lookup would create a separate, always-zero meter
     * instead of reading the tagged counters incremented above.
     */
    private double sumCounters(String name) {
        return meterRegistry.find(name).counters().stream()
                .mapToDouble(counter -> counter.count())
                .sum();
    }
}

2. Traffic Recorder for Analysis

/**
 * In-memory store of shadow requests and responses, keyed by request id, with
 * hourly eviction of entries older than the retention window.
 */
@Service
public class ShadowTrafficRecorder {
    private final Map<String, ShadowRequest> requests = new ConcurrentHashMap<>();
    private final Map<String, ShadowResponse> responses = new ConcurrentHashMap<>();

    @Value("${shadow.traffic.retention.hours:24}")
    private int retentionHours;

    /** Stores the request under its request id, replacing any previous entry. */
    public void recordRequest(ShadowRequest request) {
        String id = request.getRequestId();
        requests.put(id, request);
    }

    /** Stores the response under the id of the request it belongs to. */
    public void recordResponse(ShadowResponse response) {
        String id = response.getOriginalRequestId();
        responses.put(id, response);
    }

    /** Looks up a recorded request; empty when none is stored for the id. */
    public Optional<ShadowRequest> getRequest(String requestId) {
        ShadowRequest stored = requests.get(requestId);
        return Optional.ofNullable(stored);
    }

    /** Looks up a recorded response; empty when none is stored for the id. */
    public Optional<ShadowResponse> getResponse(String requestId) {
        ShadowResponse stored = responses.get(requestId);
        return Optional.ofNullable(stored);
    }

    /**
     * Returns the requests whose timestamp lies within the range, both ends inclusive.
     *
     * @param start lower bound
     * @param end   upper bound
     */
    public List<ShadowRequest> getRequestsByTimeRange(Instant start, Instant end) {
        return requests.values().stream()
                .filter(candidate -> {
                    long capturedAt = candidate.getTimestamp();
                    return capturedAt >= start.toEpochMilli() && capturedAt <= end.toEpochMilli();
                })
                .collect(Collectors.toList());
    }

    /**
     * Evicts requests older than the retention window, then every response whose
     * request is missing or expired.
     */
    @Scheduled(fixedRate = 3600000) // Cleanup every hour
    public void cleanupOldRecords() {
        long cutoffMillis = Instant.now()
                .minus(retentionHours, ChronoUnit.HOURS)
                .toEpochMilli();
        requests.values().removeIf(stale -> stale.getTimestamp() < cutoffMillis);
        // Requests are pruned first, so orphaned responses are dropped in the same pass.
        responses.keySet().removeIf(id -> {
            ShadowRequest owner = requests.get(id);
            return owner == null || owner.getTimestamp() < cutoffMillis;
        });
    }
}

Testing Strategies

1. Unit Tests

/** Unit tests for the include/exclude and sampling decision in ShadowTrafficManager. */
@ExtendWith(MockitoExtension.class)
class ShadowTrafficManagerTest {
    @Mock
    private TrafficSampler trafficSampler;
    @Mock
    private ShadowTrafficConfig config;
    @InjectMocks
    private ShadowTrafficManager shadowManager;

    /** An enabled manager shadows an included path once the sampler agrees. */
    @Test
    void shouldShadowWhenEnabledAndIncludedEndpoint() {
        when(config.isEnabled()).thenReturn(true);
        when(config.getIncludedEndpoints()).thenReturn(List.of("/api/v1/**"));
        when(config.getExcludedEndpoints()).thenReturn(List.of());
        when(trafficSampler.shouldSample(anyString(), anyDouble())).thenReturn(true);

        assertTrue(shadowManager.shouldShadow("/api/v1/users", "req123"));
    }

    /** An excluded path is rejected even though it matches an include pattern. */
    @Test
    void shouldNotShadowWhenExcludedEndpoint() {
        when(config.isEnabled()).thenReturn(true);
        when(config.getIncludedEndpoints()).thenReturn(List.of("/api/v1/**"));
        when(config.getExcludedEndpoints()).thenReturn(List.of("/api/v1/health"));

        assertFalse(shadowManager.shouldShadow("/api/v1/health", "req123"));
    }
}

2. Integration Tests

@SpringBootTest
@AutoConfigureTestDatabase
@TestPropertySource(properties = {
"shadow.traffic.enabled=true",
"shadow.traffic.sampling-rate=1.0",
"shadow.traffic.shadow-base-url=http://localhost:${wiremock.server.port}",
"shadow.traffic.async-mode=false"
})
class ShadowTrafficIntegrationTest {
@Autowired
private ShadowTrafficManager shadowManager;
@Autowired
private TestRestTemplate restTemplate;
@RegisterExtension
static WireMockExtension wireMock = WireMockExtension.newInstance()
.options(wireMockConfig().dynamicPort())
.build();
@Test
void shouldMirrorTrafficToShadowEnvironment() {
// Setup wiremock to expect shadow request
wireMock.stubFor(post(urlEqualTo("/api/v1/users"))
.willReturn(aResponse().withStatus(200)));
// When
ResponseEntity<String> response = restTemplate.postForEntity(
"/api/v1/users", "test data", String.class);
// Then - verify wiremock received the shadow request
wireMock.verify(1, postRequestedFor(urlEqualTo("/api/v1/users")));
}
}

Best Practices

1. Safety Guards

@Component
public class ShadowTrafficGuard {
private final ShadowTrafficConfig config;
private final HealthEndpoint healthEndpoint;
public boolean isShadowEnvironmentHealthy() {
try {
Health health = healthEndpoint.health();
return health.getStatus() == Status.UP;
} catch (Exception e) {
log.warn("Failed to check shadow environment health", e);
return false;
}
}
public boolean canSendShadowTraffic() {
return config.isEnabled() && isShadowEnvironmentHealthy();
}
@EventListener
public void onShadowEnvironmentUnhealthy(HealthEndpointFailedEvent event) {
log.warn("Shadow environment unhealthy, disabling shadow traffic");
// Implement circuit breaker pattern
}
}

2. Performance Optimization

/**
 * Periodically derives a per-endpoint sampling rate from the traffic recorded
 * during the last hour.
 */
@Component
public class ShadowTrafficOptimizer {
    private final ShadowTrafficRecorder trafficRecorder;
    private final ShadowTrafficConfig config;
    // Most recently computed rate per endpoint; written by the scheduled job.
    private final Map<String, Double> endpointSamplingRates = new ConcurrentHashMap<>();

    /** Recomputes the sampling rate of every endpoint seen in the last hour. */
    @Scheduled(fixedRate = 300000) // Every 5 minutes
    public void optimizeSamplingRates() {
        // Analyze recent traffic patterns
        Instant oneHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
        List<ShadowRequest> recentRequests = 
                trafficRecorder.getRequestsByTimeRange(oneHourAgo, Instant.now());
        Map<String, Long> endpointCounts = recentRequests.stream()
                // groupingBy rejects null keys, so skip requests without an endpoint.
                .filter(request -> request.getEndpoint() != null)
                .collect(Collectors.groupingBy(ShadowRequest::getEndpoint, 
                        Collectors.counting()));
        // Adjust sampling rates based on traffic volume and importance
        endpointCounts.forEach((endpoint, count) -> {
            double optimalRate = calculateOptimalSamplingRate(endpoint, count);
            // Update configuration dynamically
            updateEndpointSamplingRate(endpoint, optimalRate);
        });
    }

    /**
     * Returns the rate last computed for the endpoint, or the globally configured
     * rate when none has been computed yet.
     *
     * @param endpoint request path
     * @return sampling rate to apply to that endpoint
     */
    public double getEndpointSamplingRate(String endpoint) {
        return endpointSamplingRates.getOrDefault(endpoint, config.getSamplingRate());
    }

    /** Stores the newly computed rate; optimizeSamplingRates called this method but it was missing. */
    private void updateEndpointSamplingRate(String endpoint, double samplingRate) {
        endpointSamplingRates.put(endpoint, samplingRate);
    }

    /** Example policy: roughly 1000 samples per window, capped at 10% of traffic. */
    private double calculateOptimalSamplingRate(String endpoint, long requestCount) {
        // Implement logic to calculate optimal sampling rate
        // Consider factors like:
        // - Traffic volume
        // - Endpoint criticality
        // - System capacity
        // - Testing requirements
        return Math.min(0.1, 1000.0 / requestCount); // Example logic
    }
}

Conclusion

Shadow traffic testing in Java provides a robust mechanism for safely testing new systems with real production traffic. By implementing the patterns and components described above, teams can:

  1. Safely test new systems without impacting users
  2. Validate system behavior with real-world data patterns
  3. Compare performance and correctness between systems
  4. Gradually increase confidence before full rollout

Key success factors include proper sampling strategies, comprehensive monitoring, data consistency validation, and safety mechanisms to protect both primary and shadow systems from adverse effects.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper