Shadow Traffic Testing in Java: A Comprehensive Guide

Introduction

Shadow traffic testing, also known as dark traffic testing or traffic mirroring, validates a new version of a system by replicating production traffic to a shadow environment without affecting real users. Teams can test changes against real-world load patterns while users continue to be served by the stable version.

How Shadow Traffic Testing Works

In shadow testing:

  • Live production traffic is duplicated and sent to the new version
  • Real users continue using the stable version unaffected
  • The shadow system processes the duplicated requests, but its responses are never returned to users
  • Comparison tools analyze differences between the live and shadow responses
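The four properties above can be illustrated in-process. The sketch below assumes the live and shadow versions can be called as plain functions; `TrafficMirror` and its parameters are hypothetical names for illustration, not part of any library. The user only ever receives the live response, the shadow call runs on a separate executor, and shadow failures are counted instead of propagated.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical in-process mirror: serves from the live version, copies to the shadow. */
final class TrafficMirror<Q, R> {
    private final Function<Q, R> live;
    private final Function<Q, R> shadow;
    private final BiConsumer<R, R> comparator; // receives (liveResponse, shadowResponse)
    private final Executor shadowExecutor;
    private final AtomicLong shadowFailures = new AtomicLong();

    TrafficMirror(Function<Q, R> live, Function<Q, R> shadow,
                  BiConsumer<R, R> comparator, Executor shadowExecutor) {
        this.live = live;
        this.shadow = shadow;
        this.comparator = comparator;
        this.shadowExecutor = shadowExecutor;
    }

    R handle(Q request) {
        R liveResponse = live.apply(request); // only this response reaches the user
        try {
            shadowExecutor.execute(() -> {
                try {
                    R shadowResponse = shadow.apply(request);        // duplicate of the same request
                    comparator.accept(liveResponse, shadowResponse); // compared, never returned
                } catch (RuntimeException e) {
                    shadowFailures.incrementAndGet();                // isolated from the live path
                }
            });
        } catch (RejectedExecutionException e) {
            shadowFailures.incrementAndGet(); // shadow capacity exhausted: skip this copy
        }
        return liveResponse;
    }

    long shadowFailures() {
        return shadowFailures.get();
    }
}
```

In production the executor would be a bounded thread pool; passing `Runnable::run` makes the shadow call synchronous, which is convenient for tests.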

Core Implementation Components

1. Traffic Duplication Framework

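```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public interface TrafficShadowingService {
    void startShadowing();
    void stopShadowing();
    ShadowMetrics getMetrics();
    boolean isShadowingActive();
}

public class KafkaTrafficShadowingService implements TrafficShadowingService {
    private final KafkaConsumer<String, String> liveConsumer;
    private final KafkaProducer<String, String> shadowProducer;
    private final ShadowConfig config;
    private final ShadowMetrics metrics;
    private final AtomicBoolean active = new AtomicBoolean(false);

    // Inject the shared ShadowMetrics bean so exporters read the same counters
    public KafkaTrafficShadowingService(ShadowConfig config, ShadowMetrics metrics) {
        this.config = config;
        this.metrics = metrics;
        // createConsumer()/createProducer() are omitted. The consumer needs its own
        // group.id: joining the live service's group would take partitions away from it.
        this.liveConsumer = createConsumer();
        this.shadowProducer = createProducer();
    }

    @Override
    public void startShadowing() {
        if (active.compareAndSet(false, true)) { // ignore repeated start calls
            new Thread(this::shadowLoop, "kafka-shadow-loop").start();
        }
    }

    private void shadowLoop() {
        // KafkaConsumer is not thread-safe, so it is only used from this thread
        liveConsumer.subscribe(List.of(config.getLiveTopic()));
        try {
            while (active.get()) {
                ConsumerRecords<String, String> records = liveConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    if (shouldShadow(record)) {
                        shadowMessage(record);
                    }
                }
            }
        } finally {
            liveConsumer.unsubscribe();
        }
    }

    private boolean shouldShadow(ConsumerRecord<String, String> record) {
        // Percentage-based sampling; header- or key-based rules can replace this
        return ThreadLocalRandom.current().nextDouble() < config.getSamplingRate();
    }

    private void shadowMessage(ConsumerRecord<String, String> record) {
        ProducerRecord<String, String> shadowRecord = new ProducerRecord<>(
                config.getShadowTopic(), record.key(), record.value());
        // Tag the copy so shadow consumers and comparison tooling can identify it
        shadowRecord.headers()
                .add("X-Shadow-Timestamp",
                        String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8))
                .add("X-Shadow-Id", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
                .add("X-Shadow-Source", "live-traffic".getBytes(StandardCharsets.UTF_8));
        shadowProducer.send(shadowRecord, (metadata, exception) -> {
            if (exception != null) {
                metrics.recordFailure();
            } else {
                metrics.recordSuccess();
            }
        });
    }

    @Override
    public void stopShadowing() {
        active.set(false); // the loop exits within one poll interval
    }

    @Override
    public ShadowMetrics getMetrics() {
        return metrics.copy();
    }

    @Override
    public boolean isShadowingActive() {
        return active.get();
    }
}
```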
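The sampling in `shouldShadow` is random per message. A minimal sketch of key-based sampling follows, assuming record keys identify an entity (for example an order ID): hashing the key makes the decision deterministic, so every message for a sampled entity reaches the shadow system. `KeyHashSampler` and its bucket count are hypothetical; the bit-mixing step is the MurmurHash3 32-bit finalizer applied to `String.hashCode()`.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical key-based sampler: the same record key always gets the same decision. */
final class KeyHashSampler {
    private static final int BUCKETS = 10_000;

    private KeyHashSampler() {}

    static boolean shouldShadow(String key, double samplingRate) {
        if (key == null) {
            // Null Kafka keys have no stable identity; fall back to random sampling
            return ThreadLocalRandom.current().nextDouble() < samplingRate;
        }
        return bucket(key) < samplingRate * BUCKETS;
    }

    /** Maps a key to [0, BUCKETS), mixing the bits of String.hashCode() first. */
    static int bucket(String key) {
        int h = key.hashCode();
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return Math.floorMod(h, BUCKETS);
    }
}
```

Because the decision depends only on the key and a threshold, raising the rate from 10% to 50% keeps every key that was already being shadowed, which keeps per-entity state in the shadow system consistent during a ramp-up.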

2. Shadow Configuration Management

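```java
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;

@Data
@Builder(toBuilder = true) // toBuilder() is used by DynamicShadowConfig below
@NoArgsConstructor         // Spring binds properties via the no-args constructor and setters
@AllArgsConstructor        // @Builder needs it once another constructor is declared
@Configuration
@ConfigurationProperties(prefix = "shadow.traffic")
public class ShadowConfig {
    private static final AntPathMatcher PATH_MATCHER = new AntPathMatcher();

    // Field defaults apply to the no-args constructor; toBuilder() copies existing values
    private boolean enabled = false;
    private double samplingRate = 0.1; // 10% by default
    private String shadowTopic;
    private String liveTopic;
    private Set<String> excludedOperations = new HashSet<>();
    private Map<String, String> shadowHeaders = new HashMap<>();
    private int maxConcurrentRequests = 1000;
    private Duration timeout = Duration.ofSeconds(30);

    public boolean isOperationExcluded(String operation) {
        // Ant-style matching so patterns such as "/admin/**" work, not just exact paths
        return excludedOperations.stream()
                .anyMatch(pattern -> PATH_MATCHER.match(pattern, operation));
    }
}

@Component
public class DynamicShadowConfig {
    private final AtomicReference<ShadowConfig> currentConfig;
    private final ConfigRepository configRepository; // application-specific store (not shown)

    public DynamicShadowConfig(ConfigRepository configRepository) {
        this.configRepository = configRepository;
        this.currentConfig = new AtomicReference<>(loadConfig());
    }

    public void updateConfig(ShadowConfig newConfig) {
        currentConfig.set(newConfig);
        configRepository.save(newConfig);
    }

    public ShadowConfig getCurrentConfig() {
        return currentConfig.get();
    }

    public void setSamplingRate(double rate) {
        double clamped = Math.max(0.0, Math.min(1.0, rate)); // keep the rate in [0, 1]
        // updateAndGet avoids losing a concurrent update between a separate get() and set()
        ShadowConfig updated = currentConfig.updateAndGet(
                config -> config.toBuilder().samplingRate(clamped).build());
        configRepository.save(updated);
    }

    private ShadowConfig loadConfig() {
        return configRepository.findLatest()
                .orElseGet(ShadowConfig::new);
    }
}
```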
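`DynamicShadowConfig` swaps whole configuration objects. The underlying idea, an immutable snapshot behind an `AtomicReference` updated with one atomic read-modify-write, can be shown without Spring or Lombok. `ShadowSettings` and `ShadowSettingsHolder` are hypothetical names; the sketch also rejects `NaN`, which clamping with `Math.min`/`Math.max` would otherwise let through.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable settings snapshot; readers never see a half-applied update. */
record ShadowSettings(boolean enabled, double samplingRate, Set<String> excludedOperations) {
    ShadowSettings {
        if (Double.isNaN(samplingRate)) {
            throw new IllegalArgumentException("samplingRate must be a number");
        }
        samplingRate = Math.max(0.0, Math.min(1.0, samplingRate)); // clamp to [0, 1]
        excludedOperations = Set.copyOf(excludedOperations);       // defensive, immutable copy
    }

    ShadowSettings withSamplingRate(double rate) {
        return new ShadowSettings(enabled, rate, excludedOperations);
    }
}

/** Holds the current snapshot and applies updates atomically. */
final class ShadowSettingsHolder {
    private final AtomicReference<ShadowSettings> current;

    ShadowSettingsHolder(ShadowSettings initial) {
        this.current = new AtomicReference<>(initial);
    }

    ShadowSettings get() {
        return current.get();
    }

    /** The update function is side-effect free, so a retry under contention is harmless. */
    ShadowSettings setSamplingRate(double rate) {
        return current.updateAndGet(s -> s.withSamplingRate(rate));
    }
}
```

Doing the read-modify-write with `updateAndGet` rather than a separate `get()` and `set()` means two concurrent rate changes cannot overwrite each other's other fields.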

3. HTTP Traffic Shadowing with Spring Boot

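```java
// Spring Boot 3 (jakarta.*); use javax.servlet.* on Spring Boot 2
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

@Slf4j
@Component
@RequiredArgsConstructor
public class HttpTrafficShadowingFilter implements Filter {
    private final ShadowHttpClient shadowHttpClient;
    private final ShadowConfig config;
    // Assumed: a dedicated, bounded Executor bean reserved for shadow calls
    private final Executor shadowExecutor;

    @Override
    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain) throws IOException, ServletException {
        // CachedBodyHttpServletRequest is a custom wrapper (not shown) that buffers the
        // body so both the controller and the shadow copy can read it
        CachedBodyHttpServletRequest cachedRequest =
                new CachedBodyHttpServletRequest((HttpServletRequest) request);

        // Serve the live request first; the user never waits on the shadow
        chain.doFilter(cachedRequest, response);

        if (config.isEnabled() && shouldShadowRequest(cachedRequest)) {
            // Snapshot on the request thread: the container may recycle the request
            // object after this method returns, so it must not be read asynchronously
            ShadowHttpRequest shadowRequest = createShadowRequest(cachedRequest);
            shadowRequestAsync(shadowRequest);
        }
    }

    private boolean shouldShadowRequest(HttpServletRequest request) {
        return request.getHeader("X-Shadow-Request") == null // never re-shadow shadow traffic
                && !config.isOperationExcluded(request.getRequestURI())
                && ThreadLocalRandom.current().nextDouble() < config.getSamplingRate();
    }

    private void shadowRequestAsync(ShadowHttpRequest shadowRequest) {
        try {
            shadowExecutor.execute(() -> {
                try {
                    shadowHttpClient.execute(shadowRequest);
                } catch (Exception e) {
                    // Log but never affect the original request
                    log.warn("Shadow request failed: {}", e.getMessage());
                }
            });
        } catch (RejectedExecutionException e) {
            log.debug("Shadow executor saturated; dropping shadow request");
        }
    }

    private ShadowHttpRequest createShadowRequest(HttpServletRequest original) {
        // buildShadowUrl, extractHeaders, extractBody and createShadowMetadata are
        // application-specific helpers (not shown)
        return ShadowHttpRequest.builder()
                .method(original.getMethod())
                .url(buildShadowUrl(original))
                .headers(extractHeaders(original))
                .body(extractBody(original))
                .shadowMetadata(createShadowMetadata(original))
                .build();
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
public class ShadowHttpClient {
    private final RestTemplate restTemplate; // configure short timeouts for shadow calls
    private final ShadowMetrics metrics;

    public void execute(ShadowHttpRequest request) {
        HttpHeaders headers = new HttpHeaders();
        request.getHeaders().forEach(headers::add);
        // Identify shadow traffic so the shadow service (and this filter) can recognize it
        headers.set("X-Shadow-Request", "true");
        headers.set("X-Shadow-Timestamp", String.valueOf(System.currentTimeMillis()));
        HttpEntity<byte[]> entity = new HttpEntity<>(request.getBody(), headers);
        try {
            ResponseEntity<byte[]> response = restTemplate.exchange(
                    request.getUrl(),
                    HttpMethod.valueOf(request.getMethod()),
                    entity,
                    byte[].class);
            // response is where a ResponseComparator would hook in
            metrics.recordSuccess();
        } catch (RestClientException e) {
            // Includes 4xx/5xx, which RestTemplate raises as HttpStatusCodeException
            metrics.recordFailure(e.getClass().getSimpleName());
            log.debug("Shadow request failed: {}", e.getMessage());
        }
    }
}
```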
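Two details matter on the HTTP path: the request must be copied before the filter returns, because servlet containers may recycle request objects afterwards, and the asynchronous hand-off should be bounded so a slow shadow service sheds load instead of piling up work (an unbounded hand-off such as `CompletableFuture.runAsync` on the shared common pool offers neither). A minimal sketch, with hypothetical names `ShadowSnapshot` and `BoundedShadowDispatcher`:

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical immutable copy of what the shadow call needs, detached from the request. */
record ShadowSnapshot(String method, String uri, Map<String, String> headers, byte[] body) {
    ShadowSnapshot {
        headers = Map.copyOf(headers);
        body = body.clone(); // later changes to the caller's buffer cannot leak in
    }
}

/** Hypothetical dispatcher: fixed pool plus bounded queue; drops work instead of blocking. */
final class BoundedShadowDispatcher implements AutoCloseable {
    static final String SHADOW_HEADER = "X-Shadow-Request";
    private final ThreadPoolExecutor executor;
    private final AtomicLong dropped = new AtomicLong();

    BoundedShadowDispatcher(int threads, int queueCapacity) {
        this.executor = new ThreadPoolExecutor(threads, threads, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Never mirror a request that is already shadow traffic (prevents mirror loops). */
    static boolean isShadowable(Map<String, String> headers) {
        // HTTP header names are case-insensitive
        return headers.keySet().stream().noneMatch(SHADOW_HEADER::equalsIgnoreCase);
    }

    /** Returns true if the shadow call was accepted, false if it was shed. */
    boolean dispatch(Runnable shadowCall) {
        try {
            executor.execute(shadowCall);
            return true;
        } catch (RejectedExecutionException e) {
            dropped.incrementAndGet(); // load shedding: the live request is never delayed
            return false;
        }
    }

    long droppedCount() {
        return dropped.get();
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

With one thread and a queue of one, a third concurrent shadow call is dropped immediately rather than waiting, which is usually the right trade-off: losing a shadow sample costs nothing, while blocking would add latency to production.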

4. Response Comparison and Validation

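```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

// Raw body bytes carry no status code or timing, so both are captured alongside them
record CapturedResponse(int status, byte[] body, Duration latency) {}

@Component
@RequiredArgsConstructor
public class ResponseComparator {
    private final ObjectMapper objectMapper;
    // Assumed to expose isStrictByteComparison() and getIgnoreFields() (see the YAML below)
    private final ComparisonConfig comparisonConfig;

    public ComparisonResult compareResponses(CapturedResponse live, CapturedResponse shadow,
                                             String operation) {
        ComparisonResult result = new ComparisonResult(operation);
        try {
            result.setStatusMatch(live.status() == shadow.status());
            result.setBodyMatch(compareBodies(live.body(), shadow.body()));
            // comparePerformance (not shown) applies your latency tolerance
            result.setPerformanceMatch(comparePerformance(live.latency(), shadow.latency()));
            // Structural comparison for JSON, ignoring the configured volatile fields
            if (looksLikeJson(live.body()) && looksLikeJson(shadow.body())) {
                result.setSemanticMatch(compareSemantically(live.body(), shadow.body()));
            }
        } catch (Exception e) {
            result.setError(e.getMessage());
        }
        return result;
    }

    private boolean compareBodies(byte[] live, byte[] shadow) {
        if (live == null && shadow == null) return true;
        if (live == null || shadow == null) return false;
        if (comparisonConfig.isStrictByteComparison()) {
            return Arrays.equals(live, shadow);
        }
        // Normalize and compare (ignore timestamps, IDs, etc.)
        String liveNormalized = normalizeResponse(new String(live, StandardCharsets.UTF_8));
        String shadowNormalized = normalizeResponse(new String(shadow, StandardCharsets.UTF_8));
        return liveNormalized.equals(shadowNormalized);
    }

    private String normalizeResponse(String response) {
        // Remove volatile fields that shouldn't be compared
        return response.replaceAll("\"timestamp\":\\s*\\d+", "\"timestamp\":0")
                .replaceAll("\"id\":\\s*\"[^\"]+\"", "\"id\":\"normalized\"")
                .replaceAll("\"requestId\":\\s*\"[^\"]+\"", "\"requestId\":\"normalized\"");
    }

    private boolean compareSemantically(byte[] live, byte[] shadow) throws IOException {
        JsonNode liveTree = stripIgnored(objectMapper.readTree(live));
        JsonNode shadowTree = stripIgnored(objectMapper.readTree(shadow));
        return liveTree.equals(shadowTree); // deep equality; object field order is irrelevant
    }

    private JsonNode stripIgnored(JsonNode node) {
        if (node.isObject()) {
            ((ObjectNode) node).remove(comparisonConfig.getIgnoreFields());
        }
        node.forEach(this::stripIgnored); // recurse into object values and array elements
        return node;
    }

    private static boolean looksLikeJson(byte[] body) {
        // Cheap heuristic; checking the Content-Type header is more reliable
        if (body == null) return false;
        String text = new String(body, StandardCharsets.UTF_8).strip();
        return text.startsWith("{") || text.startsWith("[");
    }
}
```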
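Regex normalization only catches the field shapes it was written for, and a boolean result does not say where two responses diverge. The sketch below walks two parsed JSON trees and reports the differing paths, skipping ignored keys at any depth. It assumes bodies were already parsed into nested `Map`/`List`/scalar values (for example with Jackson's `objectMapper.readValue(bytes, Map.class)`); `JsonTreeDiff` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical structural diff over parsed JSON (nested Map / List / scalar values). */
final class JsonTreeDiff {
    private JsonTreeDiff() {}

    /** Returns the paths where live and shadow differ, skipping ignored keys at any depth. */
    static List<String> diff(Object live, Object shadow, Set<String> ignoredKeys) {
        List<String> differences = new ArrayList<>();
        walk("", live, shadow, ignoredKeys, differences);
        return differences;
    }

    private static void walk(String path, Object a, Object b, Set<String> ignored, List<String> out) {
        String where = path.isEmpty() ? "/" : path;
        if (a instanceof Map<?, ?> ma && b instanceof Map<?, ?> mb) {
            Set<String> keys = new TreeSet<>(); // sorted for a stable report order
            ma.keySet().forEach(k -> keys.add(String.valueOf(k)));
            mb.keySet().forEach(k -> keys.add(String.valueOf(k)));
            for (String key : keys) {
                if (ignored.contains(key)) continue; // volatile field such as "timestamp"
                if (ma.containsKey(key) != mb.containsKey(key)) {
                    out.add(path + "/" + key);        // present on one side only
                } else {
                    walk(path + "/" + key, ma.get(key), mb.get(key), ignored, out);
                }
            }
        } else if (a instanceof List<?> la && b instanceof List<?> lb) {
            if (la.size() != lb.size()) {
                out.add(where);
                return;
            }
            for (int i = 0; i < la.size(); i++) {
                walk(path + "/" + i, la.get(i), lb.get(i), ignored, out);
            }
        } else if (!Objects.equals(a, b)) {
            out.add(where); // scalar mismatch, or different node types
        }
    }
}
```

For example, with `"id"` ignored, `{"id":"a1","total":10,"items":[{"sku":"X","qty":1}]}` against `{"id":"b2","total":12,"items":[{"sku":"X","qty":2}]}` reports `/items/0/qty` and `/total`. Both sides should be parsed with the same mapper, since numeric types (such as `Integer` vs `Long`) affect equality.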

5. Metrics and Monitoring

```java
import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.springframework.stereotype.Component;

// One shared bean: the shadowing services and the exporter must use the same instance
@Getter
@Component
public class ShadowMetrics {
    private final AtomicLong totalRequests = new AtomicLong();
    private final AtomicLong successfulRequests = new AtomicLong();
    private final AtomicLong failedRequests = new AtomicLong();
    private final AtomicLong comparisons = new AtomicLong();
    private final AtomicLong mismatches = new AtomicLong();
    private final Map<String, AtomicLong> failuresByType = new ConcurrentHashMap<>();

    public void recordSuccess() {
        totalRequests.incrementAndGet();
        successfulRequests.incrementAndGet();
    }

    public void recordFailure() {
        totalRequests.incrementAndGet();
        failedRequests.incrementAndGet();
    }

    public void recordFailure(String type) {
        recordFailure();
        failuresByType.computeIfAbsent(type, k -> new AtomicLong()).incrementAndGet();
    }

    public void recordComparison(boolean matched) {
        comparisons.incrementAndGet();
        if (!matched) {
            mismatches.incrementAndGet();
        }
    }

    public ShadowMetrics copy() {
        ShadowMetrics copy = new ShadowMetrics();
        copy.totalRequests.set(this.totalRequests.get());
        copy.successfulRequests.set(this.successfulRequests.get());
        copy.failedRequests.set(this.failedRequests.get());
        copy.comparisons.set(this.comparisons.get());
        copy.mismatches.set(this.mismatches.get());
        this.failuresByType.forEach((key, value) ->
                copy.failuresByType.put(key, new AtomicLong(value.get())));
        return copy;
    }
}

@Component
public class ShadowMetricsExporter {
    // Register each meter once; Micrometer reads the values on every scrape.
    // Micrometer keeps only weak references to the tracked objects, so ShadowMetrics
    // must stay reachable (as a singleton bean, it does).
    public ShadowMetricsExporter(MeterRegistry meterRegistry, ShadowMetrics metrics) {
        FunctionCounter.builder("shadow.requests.total", metrics.getTotalRequests(), AtomicLong::get)
                .register(meterRegistry);
        FunctionCounter.builder("shadow.requests.success", metrics.getSuccessfulRequests(), AtomicLong::get)
                .register(meterRegistry);
        FunctionCounter.builder("shadow.requests.failed", metrics.getFailedRequests(), AtomicLong::get)
                .register(meterRegistry);
        Gauge.builder("shadow.comparisons.mismatch.ratio", metrics, ShadowMetricsExporter::mismatchRatio)
                .register(meterRegistry);
    }

    private static double mismatchRatio(ShadowMetrics metrics) {
        long comparisons = metrics.getComparisons().get();
        if (comparisons == 0) return 0.0;
        return (double) metrics.getMismatches().get() / comparisons;
    }
}
```
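The mismatch ratio above is cumulative since startup, so after hours of clean traffic a fresh regression barely moves it. For in-process checks, the rate over the last interval can be computed from two snapshots of the cumulative counters, such as two successive `ShadowMetrics.copy()` results. (When the counters are scraped by Prometheus, a ratio of `rate()` expressions gives the same windowed view.) `ComparisonCounters` is a hypothetical name.

```java
/** Hypothetical point-in-time copy of the two cumulative comparison counters. */
record ComparisonCounters(long comparisons, long mismatches) {

    /** Mismatch rate over the interval between an earlier snapshot and this one. */
    double mismatchRateSince(ComparisonCounters earlier) {
        long comparisonDelta = comparisons - earlier.comparisons;
        long mismatchDelta = mismatches - earlier.mismatches;
        if (comparisonDelta <= 0) {
            return 0.0; // no new comparisons (or the counters were reset): nothing to report
        }
        return (double) mismatchDelta / comparisonDelta;
    }
}
```

With an earlier snapshot of 100 comparisons and 2 mismatches and a later one of 200 and 32, the cumulative ratio is 32 / 200 = 0.16, while the interval rate is 30 / 100 = 0.30, which is the number an alert should react to.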

6. Complete Testing Workflow

```java
import java.time.Duration;
import java.time.Instant;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class ShadowTrafficOrchestrator {
    private final TrafficShadowingService shadowingService;
    private final DynamicShadowConfig config;

    public void runShadowTest(ShadowTestPlan testPlan) {
        shadowingService.startShadowing();
        try {
            // Phase 1: Ramp up shadow traffic
            rampUpTraffic(testPlan);
            // Phase 2: Full shadowing with comparison
            runComparisonPhase(testPlan);
            // Phase 3: Analysis and reporting (generateTestReport not shown)
            generateTestReport(testPlan);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            log.warn("Shadow test interrupted");
        } catch (Exception e) {
            log.error("Shadow test failed", e);
        } finally {
            // Always stop shadowing, on success and on failure alike
            shadowingService.stopShadowing();
        }
    }

    private void rampUpTraffic(ShadowTestPlan testPlan) throws InterruptedException {
        log.info("Starting traffic ramp-up");
        double[] rampRates = {0.01, 0.05, 0.10, 0.25, 0.50, 1.0}; // 1% to 100%
        for (double rate : rampRates) {
            // Takes effect only where components read config.getCurrentConfig() per request
            config.setSamplingRate(rate);
            log.info("Ramping up to {}% shadow traffic", rate * 100);
            Thread.sleep(testPlan.getRampUpInterval().toMillis());
            // Check system health before the next step
            if (!isShadowSystemHealthy()) {
                throw new IllegalStateException("Shadow system unhealthy at " + rate + " traffic");
            }
        }
    }

    private void runComparisonPhase(ShadowTestPlan testPlan) throws InterruptedException {
        log.info("Starting comparison phase");
        Instant endTime = Instant.now().plus(testPlan.getComparisonDuration());
        while (Instant.now().isBefore(endTime)) {
            ShadowMetrics currentMetrics = shadowingService.getMetrics();
            logProgress(currentMetrics); // not shown
            double mismatchRate = calculateMismatchRate(currentMetrics);
            if (mismatchRate > testPlan.getMaxAllowedMismatchRate()) {
                log.warn("High mismatch rate detected: {}%", mismatchRate * 100);
            }
            // Check every minute, but never sleep past the end of the phase
            long remainingMillis = Duration.between(Instant.now(), endTime).toMillis();
            Thread.sleep(Math.max(0, Math.min(60_000, remainingMillis)));
        }
    }

    private double calculateMismatchRate(ShadowMetrics metrics) {
        long comparisons = metrics.getComparisons().get();
        return comparisons == 0 ? 0.0 : (double) metrics.getMismatches().get() / comparisons;
    }

    private boolean isShadowSystemHealthy() {
        // Placeholder: check shadow latency, error rates and resource utilization here
        return true;
    }
}
```
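A variant of the ramp-up can roll the rate back to zero when a step fails and report the highest step that stayed healthy, which is useful input for the test report. In the sketch below the rate setter, the wait between steps and the health check are injected as functions so the logic can be tested without sleeping; `HealthGatedRamp` is a hypothetical name.

```java
import java.util.function.DoubleConsumer;
import java.util.function.DoublePredicate;

/** Hypothetical health-gated ramp: returns the highest sampling rate that stayed healthy. */
final class HealthGatedRamp {
    private HealthGatedRamp() {}

    /**
     * @param rates     ascending sampling rates to try, e.g. {0.01, 0.05, 0.10}
     * @param applyRate pushes a rate to the shadowing configuration
     * @param settle    waits for one ramp interval (e.g. a Thread.sleep wrapper)
     * @param healthy   health check evaluated after each step, given the current rate
     * @return the last rate that passed the health check, or 0.0 if the first step failed
     */
    static double ramp(double[] rates, DoubleConsumer applyRate, Runnable settle,
                       DoublePredicate healthy) {
        double lastHealthy = 0.0;
        for (double rate : rates) {
            applyRate.accept(rate);
            settle.run();
            if (!healthy.test(rate)) {
                applyRate.accept(0.0); // roll back: stop shadow traffic before reporting
                return lastHealthy;
            }
            lastHealthy = rate;
        }
        return lastHealthy;
    }
}
```

For example, with the steps 1%, 5%, 10% and 25% and a shadow system that degrades at 10%, the sequence of applied rates is 0.01, 0.05, 0.10 and then 0.0, and the method returns 0.05.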

Configuration Example

```yaml
# application-shadow.yml
shadow:
  traffic:
    enabled: true
    sampling-rate: 0.1
    live-topic: live-requests
    shadow-topic: shadow-requests
    excluded-operations:
      - "/health"
      - "/metrics"
      - "/admin/**"
    shadow-headers:
      X-Shadow-Mode: "true"
      X-Shadow-Test-ID: "canary-v2"
    max-concurrent-requests: 1000
    timeout: 30s
  comparison:
    strict-byte-comparison: false
    ignore-fields:
      - "timestamp"
      - "id"
      - "requestId"
      - "generatedAt"
```

Best Practices

  1. Start Small: Begin with low sampling rates (1-5%) and gradually increase
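  2. Monitor Aggressively: Alert on shadow error rates, latency and mismatch ratios, not only on production health
  3. Exclude Sensitive Operations: Never shadow write operations (payments, emails, inventory updates) unless the shadow stack stubs out their side effects, and mask sensitive data before it leaves production
  4. Resource Isolation: Run the shadow stack on separate infrastructure and bound the mirroring path (thread pools, queues, timeouts) so shadow load cannot slow down production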
  5. Automated Analysis: Build automated comparison and reporting pipelines
  6. Clean Test Data: Implement mechanisms to clean up test data generated by shadow traffic
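Practice 3 can be enforced in code before a request is mirrored. The sketch below, with the hypothetical name `ShadowSafetyGuard`, shadows only safe HTTP methods by default. RFC 9110 defines GET, HEAD, OPTIONS and TRACE as safe; TRACE is left out here because there is little value in mirroring it. Writes are mirrored only for paths explicitly allowlisted as having their side effects stubbed in the shadow environment.

```java
import java.util.Locale;
import java.util.Set;

/** Hypothetical guard: only side-effect-free HTTP methods are shadowed unless allowlisted. */
final class ShadowSafetyGuard {
    private static final Set<String> SAFE_METHODS = Set.of("GET", "HEAD", "OPTIONS");

    // Write endpoints whose side effects the shadow stack is known to stub out
    private final Set<String> allowedWritePaths;

    ShadowSafetyGuard(Set<String> allowedWritePaths) {
        this.allowedWritePaths = Set.copyOf(allowedWritePaths);
    }

    boolean mayShadow(String method, String path) {
        String normalized = method.toUpperCase(Locale.ROOT);
        return SAFE_METHODS.contains(normalized) || allowedWritePaths.contains(path);
    }
}
```

A check like this belongs next to the sampling and exclusion logic, so an unsafe request is rejected before any copy of it is made.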

Conclusion

Shadow traffic testing in Java provides a safe way to validate new system versions against real production traffic patterns. The code shown here outlines the building blocks of a shadow testing framework (traffic duplication, dynamic configuration, response comparison, metrics and orchestration); several helpers are application-specific and left out. Together, these pieces help identify issues before they impact real users, leading to more reliable deployments and better system quality.

By leveraging these patterns and best practices, teams can confidently test changes in realistic conditions while maintaining system stability and user experience.
