Correlation in Async Systems in Java: Complete Guide

Correlation ID (also called Trace ID) is a crucial pattern in distributed systems for tracking requests across service boundaries. This guide demonstrates comprehensive correlation ID implementation for asynchronous Java systems.

Why Correlation IDs in Async Systems?

  • Request Tracing: Track requests across service boundaries and async boundaries
  • Debugging: Correlate logs across multiple services and threads
  • Performance Analysis: Measure latency across distributed components
  • Error Tracking: Trace errors back to original requests
  • Audit Trails: Maintain request context for compliance

Prerequisites

  • Java 11+ with CompletableFuture, reactive streams
  • Maven/Gradle for dependency management
  • Logging Framework (SLF4J + Logback/MDC)
  • Async Libraries (Spring WebFlux, Vert.x, etc.)

Step 1: Project Dependencies

Maven (pom.xml):

<dependencies>
<!-- Spring Boot (Optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Reactive Streams -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
</dependency>
<!-- Micrometer Tracing -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
<version>1.0.5</version>
</dependency>
<!-- Resilience4j for circuit breaker -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.7.1</version>
</dependency>
<!-- Messaging -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Cache -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
<version>2.7.0</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.0</version>
<scope>test</scope>
</dependency>
</dependencies>

Step 2: Core Correlation Context Models

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CorrelationContext {
private String correlationId;
private String spanId;
private String parentSpanId;
private String traceId;
private String userId;
private String sessionId;
private String clientId;
private String serviceName;
private Map<String, String> customAttributes;
private Instant startTime;
private String sourceService;
private String destinationService;
public CorrelationContext(String correlationId) {
this.correlationId = correlationId;
this.startTime = Instant.now();
this.customAttributes = new ConcurrentHashMap<>();
}
public void addAttribute(String key, String value) {
if (customAttributes == null) {
customAttributes = new ConcurrentHashMap<>();
}
customAttributes.put(key, value);
}
public String getAttribute(String key) {
return customAttributes != null ? customAttributes.get(key) : null;
}
public CorrelationContext createChildContext() {
return CorrelationContext.builder()
.correlationId(this.correlationId)
.traceId(this.traceId != null ? this.traceId : this.correlationId)
.spanId(generateId())
.parentSpanId(this.spanId)
.userId(this.userId)
.sessionId(this.sessionId)
.clientId(this.clientId)
.serviceName(this.serviceName)
.customAttributes(new ConcurrentHashMap<>(this.customAttributes))
.startTime(Instant.now())
.sourceService(this.serviceName)
.build();
}
private String generateId() {
return UUID.randomUUID().toString().replace("-", "").substring(0, 16);
}
}
public enum CorrelationHeader {
CORRELATION_ID("X-Correlation-ID"),
TRACE_ID("X-Trace-ID"),
SPAN_ID("X-Span-ID"),
PARENT_SPAN_ID("X-Parent-Span-ID"),
USER_ID("X-User-ID"),
SESSION_ID("X-Session-ID"),
CLIENT_ID("X-Client-ID"),
SOURCE_SERVICE("X-Source-Service");
private final String headerName;
CorrelationHeader(String headerName) {
this.headerName = headerName;
}
public String getHeaderName() {
return headerName;
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AsyncCorrelationScope implements AutoCloseable {
private CorrelationContext previousContext;
private CorrelationContext currentContext;
private Map<String, String> previousMdc;
@Override
public void close() {
CorrelationManager.restoreContext(previousContext);
if (previousMdc != null) {
restoreMdc(previousMdc);
}
}
private void restoreMdc(Map<String, String> mdc) {
MDC.clear();
if (mdc != null) {
mdc.forEach(MDC::put);
}
}
}

Step 3: Correlation Manager

@Component
@Slf4j
public class CorrelationManager {
private static final ThreadLocal<CorrelationContext> CURRENT_CONTEXT = new ThreadLocal<>();
private static final String CORRELATION_ID_KEY = "correlationId";
private static final String TRACE_ID_KEY = "traceId";
private static final String SPAN_ID_KEY = "spanId";
private final CorrelationIdGenerator idGenerator;
public CorrelationManager(CorrelationIdGenerator idGenerator) {
this.idGenerator = idGenerator;
}
public static CorrelationContext getCurrentContext() {
return CURRENT_CONTEXT.get();
}
public static void setCurrentContext(CorrelationContext context) {
if (context == null) {
clearContext();
} else {
CURRENT_CONTEXT.set(context);
updateMdc(context);
}
}
public static void clearContext() {
CURRENT_CONTEXT.remove();
MDC.clear();
}
public static void restoreContext(CorrelationContext context) {
setCurrentContext(context);
}
public CorrelationContext createContext() {
return createContext(idGenerator.generateCorrelationId());
}
public CorrelationContext createContext(String correlationId) {
CorrelationContext context = new CorrelationContext(correlationId);
context.setTraceId(correlationId);
context.setSpanId(idGenerator.generateSpanId());
context.setStartTime(Instant.now());
return context;
}
public CorrelationContext createContextFromHeaders(HttpHeaders headers) {
String correlationId = extractHeader(headers, CorrelationHeader.CORRELATION_ID);
if (correlationId == null) {
correlationId = idGenerator.generateCorrelationId();
}
return CorrelationContext.builder()
.correlationId(correlationId)
.traceId(extractHeader(headers, CorrelationHeader.TRACE_ID, correlationId))
.spanId(extractHeader(headers, CorrelationHeader.SPAN_ID, idGenerator.generateSpanId()))
.parentSpanId(extractHeader(headers, CorrelationHeader.PARENT_SPAN_ID))
.userId(extractHeader(headers, CorrelationHeader.USER_ID))
.sessionId(extractHeader(headers, CorrelationHeader.SESSION_ID))
.clientId(extractHeader(headers, CorrelationHeader.CLIENT_ID))
.sourceService(extractHeader(headers, CorrelationHeader.SOURCE_SERVICE))
.startTime(Instant.now())
.customAttributes(new ConcurrentHashMap<>())
.build();
}
public AsyncCorrelationScope enterAsyncScope(CorrelationContext context) {
CorrelationContext previousContext = getCurrentContext();
Map<String, String> previousMdc = captureMdc();
setCurrentContext(context);
return AsyncCorrelationScope.builder()
.previousContext(previousContext)
.currentContext(context)
.previousMdc(previousMdc)
.build();
}
public AsyncCorrelationScope enterAsyncScope() {
CorrelationContext context = getCurrentContext();
if (context == null) {
context = createContext();
}
return enterAsyncScope(context);
}
public <T> CompletableFuture<T> withContext(CompletableFuture<T> future, CorrelationContext context) {
return future.whenComplete((result, throwable) -> {
try (AsyncCorrelationScope scope = enterAsyncScope(context)) {
if (throwable != null) {
log.error("Async operation failed with correlation: {}", context.getCorrelationId(), throwable);
} else {
log.debug("Async operation completed with correlation: {}", context.getCorrelationId());
}
}
});
}
public <T> CompletableFuture<T> withCurrentContext(CompletableFuture<T> future) {
CorrelationContext context = getCurrentContext();
if (context == null) {
return future;
}
return withContext(future, context);
}
public <T> Mono<T> withContext(Mono<T> mono, CorrelationContext context) {
return Mono.deferContextual(ctx -> mono)
.contextWrite(ctx -> populateReactorContext(ctx, context))
.doOnEach(signal -> {
try (AsyncCorrelationScope scope = enterAsyncScope(context)) {
if (signal.isOnError()) {
log.error("Reactive operation failed with correlation: {}", 
context.getCorrelationId(), signal.getThrowable());
} else if (signal.isOnComplete()) {
log.debug("Reactive operation completed with correlation: {}", 
context.getCorrelationId());
}
}
});
}
public <T> Mono<T> withCurrentContext(Mono<T> mono) {
CorrelationContext context = getCurrentContext();
if (context == null) {
return mono;
}
return withContext(mono, context);
}
public <T> Flux<T> withContext(Flux<T> flux, CorrelationContext context) {
return Flux.deferContextual(ctx -> flux)
.contextWrite(ctx -> populateReactorContext(ctx, context))
.doOnEach(signal -> {
try (AsyncCorrelationScope scope = enterAsyncScope(context)) {
if (signal.isOnError()) {
log.error("Reactive stream failed with correlation: {}", 
context.getCorrelationId(), signal.getThrowable());
} else if (signal.isOnComplete()) {
log.debug("Reactive stream completed with correlation: {}", 
context.getCorrelationId());
}
}
});
}
private Context populateReactorContext(Context reactorContext, CorrelationContext correlationContext) {
return reactorContext.put(CorrelationContext.class, correlationContext)
.put(CORRELATION_ID_KEY, correlationContext.getCorrelationId())
.put(TRACE_ID_KEY, correlationContext.getTraceId())
.put(SPAN_ID_KEY, correlationContext.getSpanId());
}
private static void updateMdc(CorrelationContext context) {
MDC.clear();
if (context != null) {
MDC.put(CORRELATION_ID_KEY, context.getCorrelationId());
MDC.put(TRACE_ID_KEY, context.getTraceId());
MDC.put(SPAN_ID_KEY, context.getSpanId());
if (context.getUserId() != null) {
MDC.put("userId", context.getUserId());
}
if (context.getServiceName() != null) {
MDC.put("serviceName", context.getServiceName());
}
if (context.getCustomAttributes() != null) {
context.getCustomAttributes().forEach(MDC::put);
}
}
}
private Map<String, String> captureMdc() {
return MDC.getCopyOfContextMap();
}
private String extractHeader(HttpHeaders headers, CorrelationHeader header) {
return headers.getFirst(header.getHeaderName());
}
private String extractHeader(HttpHeaders headers, CorrelationHeader header, String defaultValue) {
String value = extractHeader(headers, header);
return value != null ? value : defaultValue;
}
public Map<String, String> createHeaders(CorrelationContext context) {
Map<String, String> headers = new HashMap<>();
if (context != null) {
headers.put(CorrelationHeader.CORRELATION_ID.getHeaderName(), context.getCorrelationId());
headers.put(CorrelationHeader.TRACE_ID.getHeaderName(), context.getTraceId());
headers.put(CorrelationHeader.SPAN_ID.getHeaderName(), context.getSpanId());
if (context.getParentSpanId() != null) {
headers.put(CorrelationHeader.PARENT_SPAN_ID.getHeaderName(), context.getParentSpanId());
}
if (context.getUserId() != null) {
headers.put(CorrelationHeader.USER_ID.getHeaderName(), context.getUserId());
}
if (context.getSourceService() != null) {
headers.put(CorrelationHeader.SOURCE_SERVICE.getHeaderName(), context.getSourceService());
}
}
return headers;
}
}
@Component
@Slf4j
public class CorrelationIdGenerator {
public String generateCorrelationId() {
return UUID.randomUUID().toString().replace("-", "").substring(0, 16);
}
public String generateSpanId() {
return Long.toHexString(System.currentTimeMillis()) + 
Integer.toHexString(ThreadLocalRandom.current().nextInt());
}
public String generateTraceId() {
return generateCorrelationId();
}
}

Step 4: Web Filter for HTTP Correlation

@Component
@Slf4j
public class CorrelationFilter implements Filter {
private final CorrelationManager correlationManager;
public CorrelationFilter(CorrelationManager correlationManager) {
this.correlationManager = correlationManager;
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;
// Extract or create correlation context
CorrelationContext context = correlationManager.createContextFromHeaders(
new ServletServerHttpRequest(httpRequest).getHeaders());
context.setServiceName("web-service");
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(context)) {
// Add correlation ID to response headers
correlationManager.createHeaders(context).forEach((key, value) -> 
httpResponse.setHeader(key, value));
log.info("Incoming request: {} {} with correlation: {}", 
httpRequest.getMethod(), httpRequest.getRequestURI(), context.getCorrelationId());
chain.doFilter(request, response);
log.info("Request completed: {} {} with correlation: {} - Status: {}", 
httpRequest.getMethod(), httpRequest.getRequestURI(), 
context.getCorrelationId(), httpResponse.getStatus());
} finally {
CorrelationManager.clearContext();
}
}
}
// Spring WebFlux equivalent
@Component
@Slf4j
public class CorrelationWebFilter implements WebFilter {
private final CorrelationManager correlationManager;
public CorrelationWebFilter(CorrelationManager correlationManager) {
this.correlationManager = correlationManager;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// Create correlation context from headers
CorrelationContext context = correlationManager.createContextFromHeaders(request.getHeaders());
context.setServiceName("webflux-service");
// Store in Reactor context
return chain.filter(exchange)
.contextWrite(ctx -> correlationManager.populateReactorContext(ctx, context))
.doOnSubscribe(subscription -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(context)) {
log.info("Incoming reactive request: {} {} with correlation: {}", 
request.getMethod(), request.getPath(), context.getCorrelationId());
}
})
.doOnTerminate(() -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(context)) {
log.info("Reactive request completed: {} {} with correlation: {}", 
request.getMethod(), request.getPath(), context.getCorrelationId());
}
});
}
}

Step 5: Async Task Execution with Correlation

@Component
@Slf4j
public class CorrelationAwareExecutor {
private final ThreadPoolTaskExecutor executor;
private final CorrelationManager correlationManager;
public CorrelationAwareExecutor(CorrelationManager correlationManager, 
@Value("${async.executor.pool-size:10}") int poolSize) {
this.correlationManager = correlationManager;
this.executor = createExecutor(poolSize);
}
public CompletableFuture<Void> executeAsync(Runnable task) {
CorrelationContext currentContext = CorrelationManager.getCurrentContext();
return CompletableFuture.runAsync(() -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(currentContext)) {
log.debug("Executing async task with correlation: {}", currentContext.getCorrelationId());
task.run();
} catch (Exception e) {
log.error("Async task failed with correlation: {}", currentContext.getCorrelationId(), e);
throw e;
}
}, executor);
}
public <T> CompletableFuture<T> executeAsync(Supplier<T> task) {
CorrelationContext currentContext = CorrelationManager.getCurrentContext();
return CompletableFuture.supplyAsync(() -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(currentContext)) {
log.debug("Executing async supplier with correlation: {}", currentContext.getCorrelationId());
return task.get();
} catch (Exception e) {
log.error("Async supplier failed with correlation: {}", currentContext.getCorrelationId(), e);
throw e;
}
}, executor);
}
public <T> CompletableFuture<List<T>> executeAllAsync(List<Supplier<T>> tasks) {
List<CompletableFuture<T>> futures = tasks.stream()
.map(this::executeAsync)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
public ScheduledFuture<?> scheduleWithCorrelation(Runnable task, long delay, TimeUnit unit) {
CorrelationContext currentContext = CorrelationManager.getCurrentContext();
return executor.getScheduledExecutor().schedule(() -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(currentContext)) {
log.debug("Executing scheduled task with correlation: {}", currentContext.getCorrelationId());
task.run();
} catch (Exception e) {
log.error("Scheduled task failed with correlation: {}", currentContext.getCorrelationId(), e);
}
}, delay, unit);
}
private ThreadPoolTaskExecutor createExecutor(int poolSize) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(poolSize);
executor.setMaxPoolSize(poolSize * 2);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("correlation-aware-");
executor.setTaskDecorator(new CorrelationAwareTaskDecorator(correlationManager));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
@PreDestroy
public void shutdown() {
executor.shutdown();
}
}
@Slf4j
public class CorrelationAwareTaskDecorator implements TaskDecorator {
private final CorrelationManager correlationManager;
public CorrelationAwareTaskDecorator(CorrelationManager correlationManager) {
this.correlationManager = correlationManager;
}
@Override
public Runnable decorate(Runnable runnable) {
CorrelationContext parentContext = CorrelationManager.getCurrentContext();
return () -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(parentContext)) {
log.debug("Executing decorated task with correlation: {}", 
parentContext != null ? parentContext.getCorrelationId() : "none");
runnable.run();
} catch (Exception e) {
log.error("Decorated task failed", e);
throw e;
}
};
}
}

Step 6: Reactive Streams Correlation Support

@Component
@Slf4j
public class ReactiveCorrelationSupport {
private final CorrelationManager correlationManager;
public ReactiveCorrelationSupport(CorrelationManager correlationManager) {
this.correlationManager = correlationManager;
}
public <T> Function<Mono<T>, Mono<T>> correlateMono() {
return mono -> {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context == null) {
return mono;
}
return correlationManager.withContext(mono, context);
};
}
public <T> Function<Flux<T>, Flux<T>> correlateFlux() {
return flux -> {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context == null) {
return flux;
}
return correlationManager.withContext(flux, context);
};
}
public <T> Mono<T> withNewSpan(Supplier<Mono<T>> operation, String operationName) {
CorrelationContext parentContext = CorrelationManager.getCurrentContext();
CorrelationContext childContext = parentContext != null ? 
parentContext.createChildContext() : 
correlationManager.createContext();
childContext.setServiceName(operationName);
return Mono.defer(() -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(childContext)) {
log.info("Starting reactive operation: {} with correlation: {}", 
operationName, childContext.getCorrelationId());
return operation.get();
}
}).doOnSuccess(result -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(childContext)) {
log.info("Reactive operation completed: {} with correlation: {}", 
operationName, childContext.getCorrelationId());
}
}).doOnError(error -> {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(childContext)) {
log.error("Reactive operation failed: {} with correlation: {}", 
operationName, childContext.getCorrelationId(), error);
}
});
}
public <T> Flux<T> parallelWithCorrelation(Flux<T> flux, int parallelism, Function<T, Mono<?>> processor) {
return flux.flatMap(item -> {
CorrelationContext context = CorrelationManager.getCurrentContext();
return Mono.deferContextual(reactorContext -> {
CorrelationContext itemContext = reactorContext.getOrDefault(
CorrelationContext.class, context);
if (itemContext != null) {
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(itemContext)) {
log.debug("Processing parallel item with correlation: {}", 
itemContext.getCorrelationId());
return processor.apply(item);
}
} else {
return processor.apply(item);
}
});
}, parallelism);
}
}
// Reactor Context Helper
@Component
@Slf4j
public class ReactorContextHelper {
public static final String CORRELATION_CONTEXT_KEY = "correlationContext";
public static Mono<CorrelationContext> getCorrelationContext() {
return Mono.deferContextual(ctx -> 
Mono.justOrEmpty(ctx.getOrEmpty(CORRELATION_CONTEXT_KEY))
.cast(CorrelationContext.class)
);
}
public static <T> Function<Mono<T>, Mono<T>> withCorrelationContext(CorrelationContext context) {
return mono -> mono.contextWrite(ctx -> 
ctx.put(CORRELATION_CONTEXT_KEY, context)
);
}
public static <T> Mono<T> runWithContext(Supplier<Mono<T>> operation, CorrelationContext context) {
return Mono.deferContextual(reactorContext -> {
try (AsyncCorrelationScope scope = createScopeFromReactor(reactorContext, context)) {
return operation.get();
}
}).contextWrite(ctx -> ctx.put(CORRELATION_CONTEXT_KEY, context));
}
private static AsyncCorrelationScope createScopeFromReactor(
reactor.util.context.Context reactorContext, 
CorrelationContext fallbackContext) {
CorrelationContext context = reactorContext.getOrDefault(
CORRELATION_CONTEXT_KEY, fallbackContext);
CorrelationManager correlationManager = getCorrelationManager();
return correlationManager.enterAsyncScope(context);
}
private static CorrelationManager getCorrelationManager() {
// This would typically be injected, simplified for example
return ApplicationContextHolder.getBean(CorrelationManager.class);
}
}

Step 7: Messaging Correlation Support

@Component
@Slf4j
public class MessagingCorrelationSupport {
private final CorrelationManager correlationManager;
private final ObjectMapper objectMapper;
public MessagingCorrelationSupport(CorrelationManager correlationManager, ObjectMapper objectMapper) {
this.correlationManager = correlationManager;
this.objectMapper = objectMapper;
}
// Kafka Support
public <T> ProducerRecord<String, T> correlateKafkaMessage(String topic, T payload) {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context == null) {
context = correlationManager.createContext();
}
ProducerRecord<String, T> record = new ProducerRecord<>(topic, payload);
correlationManager.createHeaders(context).forEach((key, value) -> 
record.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
log.debug("Created Kafka message for topic: {} with correlation: {}", 
topic, context.getCorrelationId());
return record;
}
public <T> ConsumerRecord<String, T> processKafkaMessage(ConsumerRecord<String, T> record) {
Map<String, String> headers = extractHeaders(record.headers());
CorrelationContext context = createContextFromHeaders(headers);
context.setServiceName("kafka-consumer");
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(context)) {
log.info("Processing Kafka message from topic: {} with correlation: {}", 
record.topic(), context.getCorrelationId());
return record;
}
}
// RabbitMQ Support
public Message correlateRabbitMessage(Object payload, MessageProperties properties) {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context == null) {
context = correlationManager.createContext();
}
Map<String, String> headers = correlationManager.createHeaders(context);
headers.forEach(properties::setHeader);
log.debug("Created RabbitMQ message with correlation: {}", context.getCorrelationId());
try {
byte[] body = objectMapper.writeValueAsBytes(payload);
return new Message(body, properties);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize message payload", e);
}
}
public <T> T processRabbitMessage(Message message, Class<T> payloadType) {
Map<String, String> headers = extractHeaders(message.getMessageProperties().getHeaders());
CorrelationContext context = createContextFromHeaders(headers);
context.setServiceName("rabbit-consumer");
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(context)) {
log.info("Processing RabbitMQ message with correlation: {}", context.getCorrelationId());
try {
return objectMapper.readValue(message.getBody(), payloadType);
} catch (IOException e) {
log.error("Failed to deserialize RabbitMQ message with correlation: {}", 
context.getCorrelationId(), e);
throw new RuntimeException("Message deserialization failed", e);
}
}
}
// Spring Cloud Stream Support
public <T> Message<T> correlateStreamMessage(T payload) {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context == null) {
context = correlationManager.createContext();
}
Map<String, String> headers = correlationManager.createHeaders(context);
log.debug("Created stream message with correlation: {}", context.getCorrelationId());
return MessageBuilder.withPayload(payload)
.copyHeaders(headers)
.build();
}
public <T> T processStreamMessage(Message<T> message) {
Map<String, String> headers = extractHeaders(message.getHeaders());
CorrelationContext context = createContextFromHeaders(headers);
try (AsyncCorrelationScope scope = correlationManager.enterAsyncScope(context)) {
log.info("Processing stream message with correlation: {}", context.getCorrelationId());
return message.getPayload();
}
}
private Map<String, String> extractHeaders(org.apache.kafka.common.header.Headers kafkaHeaders) {
Map<String, String> headers = new HashMap<>();
kafkaHeaders.forEach(header -> 
headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)));
return headers;
}
private Map<String, String> extractHeaders(Map<String, Object> springHeaders) {
Map<String, String> headers = new HashMap<>();
springHeaders.forEach((key, value) -> {
if (value instanceof String) {
headers.put(key, (String) value);
} else if (value instanceof byte[]) {
headers.put(key, new String((byte[]) value, StandardCharsets.UTF_8));
}
});
return headers;
}
private CorrelationContext createContextFromHeaders(Map<String, String> headers) {
String correlationId = headers.get(CorrelationHeader.CORRELATION_ID.getHeaderName());
if (correlationId == null) {
correlationId = new CorrelationIdGenerator().generateCorrelationId();
}
return CorrelationContext.builder()
.correlationId(correlationId)
.traceId(headers.get(CorrelationHeader.TRACE_ID.getHeaderName()))
.spanId(headers.get(CorrelationHeader.SPAN_ID.getHeaderName()))
.parentSpanId(headers.get(CorrelationHeader.PARENT_SPAN_ID.getHeaderName()))
.userId(headers.get(CorrelationHeader.USER_ID.getHeaderName()))
.sourceService(headers.get(CorrelationHeader.SOURCE_SERVICE.getHeaderName()))
.startTime(Instant.now())
.customAttributes(new ConcurrentHashMap<>())
.build();
}
}

Step 8: REST Client Correlation Support

@Component
@Slf4j
public class CorrelationAwareRestClient {
private final RestTemplate restTemplate;
private final WebClient webClient;
private final CorrelationManager correlationManager;
public CorrelationAwareRestClient(CorrelationManager correlationManager, 
RestTemplate restTemplate,
WebClient.Builder webClientBuilder) {
this.correlationManager = correlationManager;
this.restTemplate = configureRestTemplate(restTemplate);
this.webClient = configureWebClient(webClientBuilder);
}
// Synchronous REST calls
public <T> ResponseEntity<T> exchangeWithCorrelation(String url, HttpMethod method, 
HttpEntity<?> requestEntity, 
Class<T> responseType, 
Object... uriVariables) {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context != null) {
// Add correlation headers to request
HttpHeaders headers = new HttpHeaders();
headers.putAll(requestEntity.getHeaders());
correlationManager.createHeaders(context).forEach(headers::set);
requestEntity = new HttpEntity<>(requestEntity.getBody(), headers);
log.debug("Making REST call to: {} with correlation: {}", url, context.getCorrelationId());
}
return restTemplate.exchange(url, method, requestEntity, responseType, uriVariables);
}
public <T> T getForObjectWithCorrelation(String url, Class<T> responseType, Object... uriVariables) {
HttpEntity<Void> requestEntity = createRequestEntityWithCorrelation();
ResponseEntity<T> response = exchangeWithCorrelation(url, HttpMethod.GET, 
requestEntity, responseType, uriVariables);
return response.getBody();
}
// Reactive REST calls
public <T> Mono<ResponseEntity<T>> exchangeWithCorrelation(String url, HttpMethod method, 
WebClient.RequestBodySpec requestSpec, 
Class<T> responseType) {
return ReactorContextHelper.getCorrelationContext()
.flatMap(context -> {
WebClient.RequestBodySpec correlatedSpec = requestSpec;
// Add correlation headers
Map<String, String> headers = correlationManager.createHeaders(context);
headers.forEach(correlatedSpec::header);
log.debug("Making reactive REST call to: {} with correlation: {}", 
url, context.getCorrelationId());
return correlatedSpec.retrieve()
.toEntity(responseType)
.doOnSuccess(response -> 
log.debug("Reactive REST call completed: {} with correlation: {}", 
url, context.getCorrelationId()))
.doOnError(error -> 
log.error("Reactive REST call failed: {} with correlation: {}", 
url, context.getCorrelationId(), error));
})
.switchIfEmpty(Mono.defer(() -> {
log.warn("No correlation context for REST call to: {}", url);
return requestSpec.retrieve().toEntity(responseType);
}));
}
public <T> Mono<T> getWithCorrelation(String url, Class<T> responseType) {
return exchangeWithCorrelation(url, HttpMethod.GET, 
webClient.get().uri(url), responseType)
.map(ResponseEntity::getBody);
}
private HttpEntity<Void> createRequestEntityWithCorrelation() {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context == null) {
return HttpEntity.EMPTY;
}
HttpHeaders headers = new HttpHeaders();
correlationManager.createHeaders(context).forEach(headers::set);
return new HttpEntity<>(headers);
}
private RestTemplate configureRestTemplate(RestTemplate restTemplate) {
// Add correlation interceptor
restTemplate.getInterceptors().add(new CorrelationClientHttpRequestInterceptor(correlationManager));
return restTemplate;
}
private WebClient configureWebClient(WebClient.Builder webClientBuilder) {
return webClientBuilder
.filter(new CorrelationExchangeFilterFunction(correlationManager))
.build();
}
}
@Slf4j
public class CorrelationClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
private final CorrelationManager correlationManager;
public CorrelationClientHttpRequestInterceptor(CorrelationManager correlationManager) {
this.correlationManager = correlationManager;
}
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
throws IOException {
CorrelationContext context = CorrelationManager.getCurrentContext();
if (context != null) {
// Add correlation headers
correlationManager.createHeaders(context).forEach(request.getHeaders()::set);
log.debug("Outgoing REST call: {} {} with correlation: {}", 
request.getMethod(), request.getURI(), context.getCorrelationId());
}
ClientHttpResponse response = execution.execute(request, body);
if (context != null) {
log.debug("REST call completed: {} {} with correlation: {} - Status: {}", 
request.getMethod(), request.getURI(), 
context.getCorrelationId(), response.getStatusCode());
}
return response;
}
}
@Slf4j
public class CorrelationExchangeFilterFunction implements ExchangeFilterFunction {
private final CorrelationManager correlationManager;
public CorrelationExchangeFilterFunction(CorrelationManager correlationManager) {
this.correlationManager = correlationManager;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
return ReactorContextHelper.getCorrelationContext()
.map(context -> {
// Add correlation headers
ClientRequest correlatedRequest = ClientRequest.from(request)
.headers(headers -> correlationManager.createHeaders(context).forEach(headers::set))
.build();
log.debug("Outgoing reactive call: {} {} with correlation: {}", 
request.method(), request.url(), context.getCorrelationId());
return next.exchange(correlatedRequest)
.doOnSuccess(response -> 
log.debug("Reactive call completed: {} {} with correlation: {} - Status: {}", 
request.method(), request.url(), 
context.getCorrelationId(), response.statusCode()))
.doOnError(error -> 
log.error("Reactive call failed: {} {} with correlation: {}", 
request.method(), request.url(), 
context.getCorrelationId(), error));
})
.switchIfEmpty(Mono.defer(() -> {
log.warn("No correlation context for reactive call: {} {}", 
request.method(), request.url());
return next.exchange(request);
}))
.flatMap(Mono::just);
}
}

Step 9: Usage Examples and Testing

@Service
@Slf4j
public class OrderProcessingService {
private final CorrelationAwareExecutor asyncExecutor;
private final ReactiveCorrelationSupport reactiveSupport;
private final CorrelationAwareRestClient restClient;
private final MessagingCorrelationSupport messagingSupport;
public OrderProcessingService(CorrelationAwareExecutor asyncExecutor,
ReactiveCorrelationSupport reactiveSupport,
CorrelationAwareRestClient restClient,
MessagingCorrelationSupport messagingSupport) {
this.asyncExecutor = asyncExecutor;
this.reactiveSupport = reactiveSupport;
this.restClient = restClient;
this.messagingSupport = messagingSupport;
}
@Async
public CompletableFuture<OrderResult> processOrderAsync(Order order) {
log.info("Processing order asynchronously: {}", order.getId());
// Chain async operations with correlation
return asyncExecutor.executeAsync(() -> validateOrder(order))
.thenComposeAsync(validatedOrder -> 
asyncExecutor.executeAsync(() -> processPayment(validatedOrder)))
.thenComposeAsync(paymentResult -> 
asyncExecutor.executeAsync(() -> updateInventory(paymentResult)))
.thenApplyAsync(this::createOrderResult);
}
public Mono<OrderResult> processOrderReactive(Order order) {
return reactiveSupport.withNewSpan(() -> 
validateOrderReactive(order)
.flatMap(this::processPaymentReactive)
.flatMap(this::updateInventoryReactive)
.map(this::createOrderResult),
"processOrder"
);
}
public void processOrderWithMessaging(Order order) {
// Send correlated message
ProducerRecord<String, Order> message = 
messagingSupport.correlateKafkaMessage("orders", order);
// In real implementation, this would be sent via KafkaTemplate
log.info("Sent order to Kafka: {} with correlation: {}", 
order.getId(), CorrelationManager.getCurrentContext().getCorrelationId());
}
private Order validateOrder(Order order) {
log.debug("Validating order: {}", order.getId());
// Validation logic
return order;
}
private PaymentResult processPayment(Order order) {
log.debug("Processing payment for order: {}", order.getId());
// Make external call with correlation
PaymentRequest paymentRequest = new PaymentRequest(order);
PaymentResponse response = restClient.postForObjectWithCorrelation(
"https://payment-service/api/payments", 
paymentRequest, PaymentResponse.class);
return new PaymentResult(order, response);
}
private InventoryUpdate updateInventory(PaymentResult paymentResult) {
log.debug("Updating inventory for order: {}", paymentResult.getOrder().getId());
// Inventory update logic
return new InventoryUpdate(paymentResult.getOrder());
}
private OrderResult createOrderResult(InventoryUpdate inventoryUpdate) {
return OrderResult.success(inventoryUpdate.getOrder());
}
private Mono<Order> validateOrderReactive(Order order) {
return Mono.fromCallable(() -> {
log.debug("Reactive order validation: {}", order.getId());
return order;
}).subscribeOn(Schedulers.boundedElastic());
}
private Mono<PaymentResult> processPaymentReactive(Order order) {
return restClient.getWithCorrelation(
"https://payment-service/api/payments/validate/" + order.getId(), 
PaymentResponse.class)
.map(response -> new PaymentResult(order, response));
}
private Mono<InventoryUpdate> updateInventoryReactive(PaymentResult paymentResult) {
return Mono.fromCallable(() -> {
log.debug("Reactive inventory update: {}", paymentResult.getOrder().getId());
return new InventoryUpdate(paymentResult.getOrder());
}).subscribeOn(Schedulers.boundedElastic());
}
}
// Test Class
@SpringBootTest
@Slf4j
class CorrelationTest {
@Autowired
private CorrelationManager correlationManager;
@Autowired
private CorrelationAwareExecutor asyncExecutor;
@Test
void testAsyncCorrelation() throws Exception {
// Setup correlation context
CorrelationContext context = correlationManager.createContext();
correlationManager.setCurrentContext(context);
// Execute async task
CompletableFuture<String> future = asyncExecutor.executeAsync(() -> {
log.info("Executing in async context");
return "result";
});
String result = future.get(5, TimeUnit.SECONDS);
assertEquals("result", result);
}
@Test
void testReactiveCorrelation() {
CorrelationContext context = correlationManager.createContext();
Mono<String> result = ReactorContextHelper.runWithContext(() -> 
Mono.fromCallable(() -> {
log.info("Executing in reactive context");
return "reactive-result";
}), context);
StepVerifier.create(result)
.expectNext("reactive-result")
.verifyComplete();
}
}

Step 10: Configuration

application.yml:

correlation:
enabled: true
header-names:
correlation-id: "X-Correlation-ID"
trace-id: "X-Trace-ID"
span-id: "X-Span-ID"
mdc:
enabled: true
fields:
- "correlationId"
- "traceId"
- "spanId"
- "userId"
- "serviceName"
async:
executor:
pool-size: 10
max-pool-size: 20
queue-capacity: 100
logging:
pattern:
level: "%5p [%X{correlationId:-none}] [%X{serviceName:-unknown}] %m%n"

Key Features Implemented

  1. Thread-local Context Management: Proper context propagation across threads
  2. Async Task Support: CompletableFuture and executor service integration
  3. Reactive Streams Support: Mono/Flux context propagation
  4. HTTP Correlation: Request/response header handling
  5. Messaging Support: Kafka, RabbitMQ, and Spring Cloud Stream integration
  6. REST Client Support: Synchronous and reactive HTTP clients
  7. MDC Integration: Logging context propagation
  8. Error Handling: Proper context restoration in error scenarios

Best Practices

  1. Always Clear Context: Ensure context is cleared after request processing
  2. Context Propagation: Propagate context across all async boundaries
  3. Error Handling: Maintain context in error scenarios for debugging
  4. Performance: Use efficient data structures for context storage
  5. Testing: Comprehensive testing of context propagation scenarios
  6. Monitoring: Log correlation IDs for all cross-service calls
  7. Security: Be cautious about propagating sensitive information

This comprehensive correlation implementation provides a robust foundation for tracking requests across asynchronous boundaries in Java applications, enabling effective debugging, monitoring, and analysis of distributed systems.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper