Correlation IDs (also called trace IDs or request IDs) are essential for tracking requests across distributed systems, microservices, and asynchronous processing. They help correlate logs, metrics, and traces for a single business transaction.
Table of Contents
- Basic Correlation ID Implementation
- ThreadLocal-Based Context Management
- MDC (Mapped Diagnostic Context) Integration
- Spring Boot Integration
- Web Filter for HTTP Requests
- Async and Reactive Support
- Database and Messaging Integration
- Distributed Tracing
1. Basic Correlation ID Implementation
Example 1: Foundation Correlation ID Manager
import java.util.UUID;
import java.util.concurrent.Callable;
/**
 * Static holder for the correlation ID associated with the calling thread.
 *
 * <p>The value is kept in a {@link ThreadLocal}; each thread sees only the ID it set itself.
 * The {@code with...} helpers install an ID for the duration of a task and then put back
 * whatever was there before, even when the task throws.
 */
public class CorrelationId {

    private static final ThreadLocal<String> currentCorrelationId = new ThreadLocal<>();

    private CorrelationId() {
        // Utility class
    }

    /**
     * Creates a fresh ID of the form {@code corr-} followed by the first eight characters
     * of a random UUID.
     *
     * @return the newly generated ID
     */
    public static String generateCorrelationId() {
        String uuidText = UUID.randomUUID().toString();
        return "corr-" + uuidText.substring(0, 8);
    }

    /**
     * @return the ID bound to the calling thread, or {@code null} when none is bound
     */
    public static String getCurrentCorrelationId() {
        return currentCorrelationId.get();
    }

    /**
     * Binds {@code correlationId} to the calling thread, replacing any existing binding.
     *
     * @param correlationId the ID to bind
     */
    public static void setCorrelationId(String correlationId) {
        currentCorrelationId.set(correlationId);
    }

    /** Removes the calling thread's binding, if any. */
    public static void clear() {
        currentCorrelationId.remove();
    }

    /**
     * Runs {@code task} with {@code correlationId} bound, then restores the prior binding.
     *
     * @param correlationId the ID visible while the task runs
     * @param task the work to execute
     */
    public static void withCorrelationId(String correlationId, Runnable task) {
        final String outerId = getCurrentCorrelationId();
        try {
            setCorrelationId(correlationId);
            task.run();
        } finally {
            restore(outerId);
        }
    }

    /**
     * Calls {@code task} with {@code correlationId} bound, then restores the prior binding.
     *
     * @param correlationId the ID visible while the task runs
     * @param task the work to execute
     * @param <T> the task's result type
     * @return whatever the task returns
     * @throws Exception whatever the task throws
     */
    public static <T> T withCorrelationId(String correlationId, Callable<T> task) throws Exception {
        final String outerId = getCurrentCorrelationId();
        try {
            setCorrelationId(correlationId);
            return task.call();
        } finally {
            restore(outerId);
        }
    }

    /**
     * Runs {@code task} under a newly generated correlation ID.
     *
     * @param task the work to execute
     */
    public static void withNewCorrelationId(Runnable task) {
        String freshId = generateCorrelationId();
        withCorrelationId(freshId, task);
    }

    /**
     * Calls {@code task} under a newly generated correlation ID.
     *
     * @param task the work to execute
     * @param <T> the task's result type
     * @return whatever the task returns
     * @throws Exception whatever the task throws
     */
    public static <T> T withNewCorrelationId(Callable<T> task) throws Exception {
        String freshId = generateCorrelationId();
        return withCorrelationId(freshId, task);
    }

    // Puts back the binding that was active before a scoped call; a null prior value
    // means there was no binding, so the slot is removed rather than set to null.
    private static void restore(String outerId) {
        if (outerId == null) {
            clear();
        } else {
            setCorrelationId(outerId);
        }
    }
}
// Usage examples
/**
 * Console walkthrough of {@code CorrelationId}: a direct binding, a scoped binding and an
 * auto-generated one, each printing the ID that is visible at that point.
 */
public class CorrelationIdDemo {

    public static void main(String[] args) {
        // Basic usage: bind directly on the current thread.
        CorrelationId.setCorrelationId("test-123");
        printWithCurrentId("Current correlation ID: ");

        // Scoped usage: the ID applies only while the task runs.
        Runnable scopedWork = () -> {
            printWithCurrentId("Inside scope: ");
            performBusinessOperation();
        };
        CorrelationId.withCorrelationId("scoped-456", scopedWork);

        // Auto-generated ID for the duration of the task.
        Runnable generatedWork = () -> {
            printWithCurrentId("Auto-generated: ");
        };
        CorrelationId.withNewCorrelationId(generatedWork);

        CorrelationId.clear();
    }

    private static void performBusinessOperation() {
        printWithCurrentId("Operation with correlation: ");
    }

    // Prints the label followed by whatever ID is bound to the calling thread.
    private static void printWithCurrentId(String label) {
        System.out.println(label + CorrelationId.getCurrentCorrelationId());
    }
}
2. ThreadLocal-Based Context Management
Example 2: Comprehensive Request Context
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Per-request context: a correlation ID, optional user and session IDs, a bag of string
 * attributes and the creation timestamp.
 *
 * <p>An instance can be bound to the calling thread through a {@link ThreadLocal}; the
 * {@code withContext} helpers bind one for the duration of a task and restore the previous
 * binding afterwards.
 *
 * <p>Attributes are stored in a {@link ConcurrentHashMap}, which rejects null keys and
 * values. Null attribute <em>values</em> are therefore treated as "no value": the builder
 * skips them and {@link #setAttribute} removes the key.
 */
public class RequestContext {

    private static final ThreadLocal<RequestContext> currentContext = new ThreadLocal<>();

    private final String correlationId;
    private final String userId;
    private final String sessionId;
    private final Map<String, String> customAttributes;
    private final long startTime;

    // The correlation ID is passed separately so build() does not have to write a
    // generated value back into the (possibly reused) builder.
    private RequestContext(Builder builder, String correlationId) {
        this.correlationId = correlationId;
        this.userId = builder.userId;
        this.sessionId = builder.sessionId;
        this.customAttributes = new ConcurrentHashMap<>(builder.customAttributes);
        this.startTime = System.currentTimeMillis();
    }

    /** Fluent builder for {@link RequestContext}. */
    public static class Builder {
        private String correlationId;
        private String userId;
        private String sessionId;
        private final Map<String, String> customAttributes = new HashMap<>();

        public Builder correlationId(String correlationId) {
            this.correlationId = correlationId;
            return this;
        }

        public Builder userId(String userId) {
            this.userId = userId;
            return this;
        }

        public Builder sessionId(String sessionId) {
            this.sessionId = sessionId;
            return this;
        }

        /**
         * Adds a custom attribute.
         *
         * @param key attribute name; must not be null
         * @param value attribute value; a null value is ignored (any earlier value for the
         *     key is dropped) instead of failing later inside {@link #build()}
         * @return this builder
         * @throws NullPointerException if {@code key} is null
         */
        public Builder attribute(String key, String value) {
            Objects.requireNonNull(key, "key");
            if (value == null) {
                this.customAttributes.remove(key);
            } else {
                this.customAttributes.put(key, value);
            }
            return this;
        }

        /**
         * Builds the context. When no correlation ID was supplied a new one is generated
         * for this call only, so building twice from the same builder yields two
         * different generated IDs.
         *
         * @return a new context
         */
        public RequestContext build() {
            String effectiveId = (correlationId != null) ? correlationId : generateCorrelationId();
            return new RequestContext(this, effectiveId);
        }
    }

    // Getters
    public String getCorrelationId() { return correlationId; }

    public String getUserId() { return userId; }

    public String getSessionId() { return sessionId; }

    /** @return a read-only view of the attributes */
    public Map<String, String> getCustomAttributes() {
        return Collections.unmodifiableMap(customAttributes);
    }

    public long getStartTime() { return startTime; }

    /** @return milliseconds elapsed since this context was constructed */
    public long getDuration() { return System.currentTimeMillis() - startTime; }

    /**
     * @param key attribute name; null is tolerated
     * @return the attribute value, or {@code null} when absent or when {@code key} is null
     */
    public String getAttribute(String key) {
        return key == null ? null : customAttributes.get(key);
    }

    /**
     * Sets or removes an attribute.
     *
     * @param key attribute name; must not be null
     * @param value new value; null removes the attribute
     * @throws NullPointerException if {@code key} is null
     */
    public void setAttribute(String key, String value) {
        Objects.requireNonNull(key, "key");
        if (value == null) {
            customAttributes.remove(key);
        } else {
            customAttributes.put(key, value);
        }
    }

    // Static context management

    /** Binds {@code context} to the calling thread. */
    public static void setCurrentContext(RequestContext context) {
        currentContext.set(context);
    }

    /** @return the context bound to the calling thread, or {@code null} */
    public static RequestContext getCurrentContext() {
        return currentContext.get();
    }

    /** @return the bound context's correlation ID, or {@code null} when nothing is bound */
    public static String getCurrentCorrelationId() {
        RequestContext context = getCurrentContext();
        return context != null ? context.getCorrelationId() : null;
    }

    /** Removes the calling thread's binding. */
    public static void clearContext() {
        currentContext.remove();
    }

    /**
     * Calls {@code task} with {@code context} bound, restoring the previous binding after.
     *
     * @return the task's result
     * @throws Exception whatever the task throws
     */
    // Callable is fully qualified because this snippet's imports (java.util.* and
    // ConcurrentHashMap) do not bring java.util.concurrent.Callable into scope.
    public static <T> T withContext(RequestContext context, java.util.concurrent.Callable<T> task)
            throws Exception {
        RequestContext previous = getCurrentContext();
        try {
            setCurrentContext(context);
            return task.call();
        } finally {
            restore(previous);
        }
    }

    /** Runs {@code task} with {@code context} bound, restoring the previous binding after. */
    public static void withContext(RequestContext context, Runnable task) {
        RequestContext previous = getCurrentContext();
        try {
            setCurrentContext(context);
            task.run();
        } finally {
            restore(previous);
        }
    }

    // Re-binds the earlier context, or removes the binding when there was none.
    private static void restore(RequestContext previous) {
        if (previous != null) {
            setCurrentContext(previous);
        } else {
            clearContext();
        }
    }

    private static String generateCorrelationId() {
        return "req-" + UUID.randomUUID().toString().substring(0, 12);
    }

    @Override
    public String toString() {
        return String.format(
            "RequestContext{correlationId='%s', userId='%s', sessionId='%s', attributes=%s}",
            correlationId, userId, sessionId, customAttributes);
    }
}
// Usage example
/**
 * Console demo: builds a {@code RequestContext}, runs two operations inside it, and shows
 * that the thread binding is gone afterwards.
 */
public class RequestContextDemo {

    public static void main(String[] args) throws Exception {
        // Assemble the context step by step.
        RequestContext.Builder builder = new RequestContext.Builder();
        builder.correlationId("corr-123456");
        builder.userId("user-789");
        builder.sessionId("session-abc");
        builder.attribute("client-ip", "192.168.1.100");
        builder.attribute("user-agent", "Mozilla/5.0");
        RequestContext context = builder.build();

        // Both operations run while the context is bound to this thread.
        Runnable work = () -> {
            processOrder("order-001", 99.99);
            sendNotification("user-789", "Order processed");
        };
        RequestContext.withContext(context, work);

        // The binding that withContext installed has been removed again.
        System.out.println("After execution: " + RequestContext.getCurrentContext());
    }

    private static void processOrder(String orderId, double amount) {
        RequestContext active = RequestContext.getCurrentContext();
        String userId = active.getUserId();
        String correlationId = active.getCorrelationId();
        System.out.printf("Processing order %s for user %s with correlation %s%n",
            orderId, userId, correlationId);

        // Log with correlation ID
        Map<String, String> details = Map.of("orderId", orderId, "amount", String.valueOf(amount));
        logInfo("Order processing started", details);
    }

    private static void sendNotification(String userId, String message) {
        RequestContext active = RequestContext.getCurrentContext();
        Map<String, String> details =
            Map.of("userId", userId, "message", message, "correlationId", active.getCorrelationId());
        logInfo("Sending notification", details);
    }

    // Prefixes the message with the correlation ID of the currently bound context.
    private static void logInfo(String message, Map<String, String> context) {
        System.out.printf("[%s] %s - %s%n", RequestContext.getCurrentCorrelationId(), message, context);
    }
}
3. MDC (Mapped Diagnostic Context) Integration
Example 3: SLF4J MDC Integration
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
 * Helpers for keeping SLF4J {@link MDC} entries (correlation ID, user, session, ...) attached
 * to work as it moves between threads.
 *
 * <p>{@link #wrap(Runnable)} and {@link #wrap(Callable)} snapshot the caller's MDC when the
 * task is wrapped and install that snapshot around the task when it runs. The executor
 * decorators apply that wrapping to every submitted task.
 *
 * <p>{@link MDC#getCopyOfContextMap()} may return {@code null} when nothing has been put
 * into the MDC. All snapshot/restore paths here go through {@link #setContextMap(Map)},
 * which turns a null map into {@link MDC#clear()} instead of handing null to the binding.
 */
public class MDCCorrelationManager {
    private static final Logger logger = LoggerFactory.getLogger(MDCCorrelationManager.class);

    // MDC Keys
    public static final String CORRELATION_ID = "correlationId";
    public static final String USER_ID = "userId";
    public static final String SESSION_ID = "sessionId";
    public static final String REQUEST_PATH = "requestPath";
    public static final String CLIENT_IP = "clientIp";

    /**
     * Puts the correlation ID and, when non-null, the user and session IDs into the MDC.
     */
    public static void setupContext(String correlationId, String userId, String sessionId) {
        MDC.put(CORRELATION_ID, correlationId);
        if (userId != null) {
            MDC.put(USER_ID, userId);
        }
        if (sessionId != null) {
            MDC.put(SESSION_ID, sessionId);
        }
    }

    /** Puts every entry of {@code context} into the MDC, keeping existing other keys. */
    public static void setupContext(Map<String, String> context) {
        context.forEach(MDC::put);
    }

    public static void setCorrelationId(String correlationId) {
        MDC.put(CORRELATION_ID, correlationId);
    }

    public static String getCorrelationId() {
        return MDC.get(CORRELATION_ID);
    }

    public static void clear() {
        MDC.clear();
    }

    /** @return a copy of the calling thread's MDC; may be {@code null} when it is empty */
    public static Map<String, String> getCopyOfContextMap() {
        return MDC.getCopyOfContextMap();
    }

    /**
     * Replaces the calling thread's MDC with {@code context}.
     *
     * @param context the entries to install; {@code null} clears the MDC
     */
    public static void setContextMap(Map<String, String> context) {
        if (context == null) {
            // A null snapshot means "the MDC was empty"; not every binding accepts null here.
            MDC.clear();
        } else {
            MDC.setContextMap(context);
        }
    }

    /**
     * Wraps {@code task} so it runs with the MDC of the thread that called {@code wrap}.
     * The executing thread's own MDC is restored once the task finishes or throws.
     */
    public static Runnable wrap(Runnable task) {
        Map<String, String> captured = getCopyOfContextMap();
        return () -> {
            Map<String, String> previous = getCopyOfContextMap();
            try {
                setContextMap(captured);
                task.run();
            } finally {
                setContextMap(previous);
            }
        };
    }

    /**
     * Wraps {@code task} so it runs with the MDC of the thread that called {@code wrap}.
     * The executing thread's own MDC is restored once the task finishes or throws.
     */
    public static <T> Callable<T> wrap(Callable<T> task) {
        Map<String, String> captured = getCopyOfContextMap();
        return () -> {
            Map<String, String> previous = getCopyOfContextMap();
            try {
                setContextMap(captured);
                return task.call();
            } finally {
                setContextMap(previous);
            }
        };
    }

    /** Decorates {@code delegate} so every submitted task is passed through {@code wrap}. */
    public static ExecutorService newMDCAwareExecutorService(ExecutorService delegate) {
        return new MDCAwareExecutorService(delegate);
    }

    /** Creates a scheduled executor whose tasks are passed through {@code wrap}. */
    public static ScheduledExecutorService newMDCAwareScheduledExecutorService(int corePoolSize) {
        return new MDCAwareScheduledThreadPoolExecutor(corePoolSize);
    }

    // Decorator: wraps task-accepting calls, forwards lifecycle calls untouched.
    private static class MDCAwareExecutorService implements ExecutorService {
        private final ExecutorService delegate;

        public MDCAwareExecutorService(ExecutorService delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(wrap(command));
        }

        @Override
        public Future<?> submit(Runnable task) {
            return delegate.submit(wrap(task));
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return delegate.submit(wrap(task), result);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return delegate.submit(wrap(task));
        }

        // Delegate all other methods
        @Override
        public void shutdown() {
            delegate.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            return delegate.shutdownNow();
        }

        @Override
        public boolean isShutdown() {
            return delegate.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return delegate.isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            return delegate.invokeAll(wrapCollection(tasks));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
                throws InterruptedException {
            return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            return delegate.invokeAny(wrapCollection(tasks));
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
        }

        // Wraps each task individually; all of them capture the caller's MDC here.
        private <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
            Collection<Callable<T>> wrapped = new ArrayList<>(tasks.size());
            for (Callable<T> task : tasks) {
                wrapped.add(wrap(task));
            }
            return wrapped;
        }
    }

    // Scheduled variant. For the periodic methods the MDC is captured once, when the task
    // is scheduled, and that same snapshot is installed for every run.
    private static class MDCAwareScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
        public MDCAwareScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        @Override
        public void execute(Runnable command) {
            super.execute(MDCCorrelationManager.wrap(command));
        }

        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(MDCCorrelationManager.wrap(task));
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return super.submit(MDCCorrelationManager.wrap(task), result);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return super.submit(MDCCorrelationManager.wrap(task));
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return super.schedule(MDCCorrelationManager.wrap(command), delay, unit);
        }

        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return super.schedule(MDCCorrelationManager.wrap(callable), delay, unit);
        }

        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                      long period, TimeUnit unit) {
            return super.scheduleAtFixedRate(MDCCorrelationManager.wrap(command),
                initialDelay, period, unit);
        }

        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                         long delay, TimeUnit unit) {
            return super.scheduleWithFixedDelay(MDCCorrelationManager.wrap(command),
                initialDelay, delay, unit);
        }
    }
}
// Service using MDC correlation
/**
 * Example service that processes orders on a background pool. The pool is obtained from
 * {@code MDCCorrelationManager.newMDCAwareExecutorService}, so submitted tasks are wrapped
 * by that class before they run.
 */
public class OrderService {
    private static final Logger logger = LoggerFactory.getLogger(OrderService.class);

    private final ExecutorService executorService;

    public OrderService() {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        this.executorService = MDCCorrelationManager.newMDCAwareExecutorService(workers);
    }

    /**
     * Logs the hand-off and submits the order for background processing. Failures are
     * logged inside the task; nothing is reported back to the caller.
     */
    public void processOrderAsync(Order order) {
        logger.info("Processing order asynchronously: {}", order.getId());
        Runnable job = () -> processAndReport(order);
        executorService.submit(job);
    }

    // Body of the background task: process, then log success or failure.
    private void processAndReport(Order order) {
        try {
            processOrder(order);
            logger.info("Order processed successfully: {}", order.getId());
        } catch (Exception e) {
            logger.error("Failed to process order: {}", order.getId(), e);
        }
    }

    private void processOrder(Order order) {
        logger.info("Starting order processing with correlation: {}",
            MDCCorrelationManager.getCorrelationId());

        // Simulated processing steps, in order.
        validateOrder(order);
        processPayment(order);
        updateInventory(order);

        logger.info("Order processing completed: {}", order.getId());
    }

    private void validateOrder(Order order) {
        // Validation logic
        logger.debug("Validating order: {}", order.getId());
    }

    private void processPayment(Order order) {
        // Payment processing logic
        logger.debug("Processing payment for order: {}", order.getId());
    }

    private void updateInventory(Order order) {
        // Inventory update logic
        logger.debug("Updating inventory for order: {}", order.getId());
    }

    /** Initiates shutdown of the background pool. */
    public void shutdown() {
        executorService.shutdown();
    }

    /** Minimal order value holder used by this example. */
    public static class Order {
        private final String id;
        private final String userId;
        private final double amount;

        public Order(String id, String userId, double amount) {
            this.id = id;
            this.userId = userId;
            this.amount = amount;
        }

        public String getId() {
            return id;
        }

        public String getUserId() {
            return userId;
        }

        public double getAmount() {
            return amount;
        }
    }
}
4. Spring Boot Integration
Example 4: Spring Boot Configuration
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.UUID;
/**
 * Registers a servlet filter that gives every HTTP request a correlation ID.
 *
 * <p>The filter takes the ID from the {@code X-Correlation-ID} request header when it looks
 * sane, otherwise generates one; stores it in the MDC; echoes it on the response; and
 * clears the MDC when the request finishes.
 */
@Configuration
public class CorrelationIdConfig {
    private static final Logger logger = LoggerFactory.getLogger(CorrelationIdConfig.class);

    public static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    public static final String CORRELATION_ID_MDC_KEY = "correlationId";

    /**
     * Registers {@link CorrelationIdFilter} for all URLs.
     *
     * <p>The order is set on the registration itself. {@code FilterRegistrationBean}
     * carries its own order value (settable via {@code setOrder}); an {@code @Order}
     * annotation on this factory method does not change that value.
     */
    @Bean
    public FilterRegistrationBean<CorrelationIdFilter> correlationIdFilter() {
        FilterRegistrationBean<CorrelationIdFilter> registrationBean = new FilterRegistrationBean<>();
        registrationBean.setFilter(new CorrelationIdFilter());
        registrationBean.addUrlPatterns("/*");
        registrationBean.setName("correlationIdFilter");
        registrationBean.setOrder(1);
        return registrationBean;
    }

    /** Filter that establishes the correlation ID for the duration of one request. */
    public static class CorrelationIdFilter extends OncePerRequestFilter {

        // Upper bound for a caller-supplied ID; longer values are replaced.
        private static final int MAX_CORRELATION_ID_LENGTH = 128;

        @Override
        protected void doFilterInternal(HttpServletRequest request,
                                        HttpServletResponse response,
                                        FilterChain filterChain)
                throws ServletException, IOException {
            // Get correlation ID from header or generate new one
            String correlationId = getCorrelationIdFromHeader(request);

            // Set up MDC context
            MDC.put(CORRELATION_ID_MDC_KEY, correlationId);

            // Add correlation ID to response header
            response.setHeader(CORRELATION_ID_HEADER, correlationId);

            logger.info("Incoming request: {} {} [Correlation: {}]",
                request.getMethod(), request.getRequestURI(), correlationId);
            try {
                filterChain.doFilter(request, response);
            } finally {
                // Log completion
                logger.info("Request completed: {} {} [Status: {}, Correlation: {}]",
                    request.getMethod(), request.getRequestURI(),
                    response.getStatus(), correlationId);
                // Clear MDC context so nothing from this request stays on the thread
                MDC.clear();
            }
        }

        /**
         * Returns the caller-supplied correlation ID when it is acceptable, otherwise a
         * newly generated one.
         *
         * <p>The header is untrusted input that ends up in log lines and in a response
         * header, so it is only used when it is non-blank, at most
         * {@value #MAX_CORRELATION_ID_LENGTH} characters and consists solely of visible
         * ASCII characters.
         */
        private String getCorrelationIdFromHeader(HttpServletRequest request) {
            String supplied = request.getHeader(CORRELATION_ID_HEADER);
            if (supplied != null) {
                String candidate = supplied.trim();
                if (isAcceptable(candidate)) {
                    return candidate;
                }
            }
            String correlationId = generateCorrelationId();
            logger.debug("Generated new correlation ID: {}", correlationId);
            return correlationId;
        }

        // True when the value is 1..MAX chars of visible ASCII (no spaces, no control chars).
        private static boolean isAcceptable(String candidate) {
            if (candidate.isEmpty() || candidate.length() > MAX_CORRELATION_ID_LENGTH) {
                return false;
            }
            for (int i = 0; i < candidate.length(); i++) {
                char c = candidate.charAt(i);
                if (c <= ' ' || c >= 0x7F) {
                    return false;
                }
            }
            return true;
        }

        private String generateCorrelationId() {
            return "corr-" + UUID.randomUUID().toString().substring(0, 8);
        }
    }
}
// Spring Service using correlation ID
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
/**
 * Example Spring service that runs an order through validation, payment, inventory and
 * notification, logging each step together with the correlation ID read from the MDC.
 */
@Service
public class OrderProcessingService {
    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingService.class);

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private NotificationService notificationService;

    /**
     * Processes {@code order} and returns it. Any exception raised by a step is logged
     * with the correlation ID and rethrown unchanged.
     */
    public Order processOrder(Order order) {
        final String correlationId = MDC.get(CorrelationIdConfig.CORRELATION_ID_MDC_KEY);
        logger.info("Processing order: {} for user: {} [Correlation: {}]",
            order.getId(), order.getUserId(), correlationId);

        try {
            runSteps(order);
            logger.info("Order processed successfully: {} [Correlation: {}]",
                order.getId(), correlationId);
            return order;
        } catch (Exception e) {
            logger.error("Failed to process order: {} [Correlation: {}]",
                order.getId(), correlationId, e);
            throw e;
        }
    }

    // Executes the four processing steps in order, logging after each one completes.
    private void runSteps(Order order) {
        // Step 1: Validate order
        validateOrder(order);
        logger.debug("Order validated: {}", order.getId());

        // Step 2: Process payment
        processPayment(order);
        logger.debug("Payment processed for order: {}", order.getId());

        // Step 3: Update inventory
        inventoryService.updateInventory(order);
        logger.debug("Inventory updated for order: {}", order.getId());

        // Step 4: Send notification
        notificationService.sendOrderConfirmation(order);
        logger.debug("Notification sent for order: {}", order.getId());
    }

    private void validateOrder(Order order) {
        double amount = order.getAmount();
        if (amount <= 0) {
            throw new IllegalArgumentException("Invalid order amount");
        }
    }

    private void processPayment(Order order) {
        // Payment processing is only simulated here; the step just logs.
        logger.debug("Processing payment of ${} for order: {}",
            order.getAmount(), order.getId());
    }
}
// REST Controller with correlation ID
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private static final Logger logger = LoggerFactory.getLogger(OrderController.class);
@Autowired
private OrderProcessingService orderProcessingService;
@PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
String correlationId = MDC.get(CorrelationIdConfig.CORRELATION_ID_MDC_KEY);
logger.info("Creating order for user: {} [Correlation: {}]",
request.getUserId(), correlationId);
try {
Order order = new Order(
generateOrderId(),
request.getUserId(),
request.getAmount(),
request.getItems()
);
Order processedOrder = orderProcessingService.processOrder(order);
OrderResponse response = new OrderResponse(
processedOrder.getId(),
"SUCCESS",
"Order processed successfully"
);
logger.info("Order created successfully: {} [Correlation: {}]",
processedOrder.getId(), correlationId);
return ResponseEntity.ok()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER, correlationId)
.body(response);
} catch (Exception e) {
logger.error("Failed to create order [Correlation: {}]", correlationId, e);
OrderResponse response = new OrderResponse(
null,
"ERROR",
"Failed to process order: " + e.getMessage()
);
return ResponseEntity.badRequest()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER, correlationId)
.body(response);
}
}
@GetMapping("/{orderId}")
public ResponseEntity<Order> getOrder(@PathVariable String orderId) {
String correlationId = MDC.get(CorrelationIdConfig.CORRELATION_ID_MDC_KEY);
logger.info("Fetching order: {} [Correlation: {}]", orderId, correlationId);
// Simulate order retrieval
Order order = new Order(orderId, "user-123", 99.99, List.of("item1", "item2"));
return ResponseEntity.ok()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER, correlationId)
.body(order);
}
private String generateOrderId() {
return "order-" + UUID.randomUUID().toString().substring(0, 8);
}
// Request/Response DTOs
public static class CreateOrderRequest {
private String userId;
private double amount;
private List<String> items;
// Constructors, getters, setters
public CreateOrderRequest() {}
public CreateOrderRequest(String userId, double amount, List<String> items) {
this.userId = userId;
this.amount = amount;
this.items = items;
}
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
public List<String> getItems() { return items; }
public void setItems(List<String> items) { this.items = items; }
}
public static class OrderResponse {
private String orderId;
private String status;
private String message;
public OrderResponse(String orderId, String status, String message) {
this.orderId = orderId;
this.status = status;
this.message = message;
}
// Getters and setters
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
}
}
5. Web Filter for HTTP Requests
Example 5: Advanced Web Filter with Feign Client Support
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
/**
 * Servlet filter that derives a correlation context (correlation, user and session IDs plus
 * a few request attributes) from request headers, publishes it through the MDC and the
 * response headers, and logs request start/completion with timing.
 *
 * <p>Requests whose URI is one of the excluded paths, or lies beneath one, bypass the filter.
 */
public class AdvancedCorrelationFilter extends OncePerRequestFilter {
    private static final Logger logger = LoggerFactory.getLogger(AdvancedCorrelationFilter.class);

    public static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    public static final String USER_ID_HEADER = "X-User-ID";
    public static final String SESSION_ID_HEADER = "X-Session-ID";

    private final List<String> excludedPaths = Arrays.asList("/health", "/metrics", "/info");

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain filterChain)
            throws ServletException, IOException {
        // Skip correlation for excluded paths
        if (shouldSkipCorrelation(request)) {
            filterChain.doFilter(request, response);
            return;
        }

        // Extract or generate correlation context
        CorrelationContext context = extractCorrelationContext(request);

        // Set up MDC
        setupMDC(context);

        // Add correlation headers to response
        addCorrelationHeaders(response, context);

        long startTime = System.currentTimeMillis();
        try {
            logger.info("Request started: {} {} [User: {}, Correlation: {}]",
                request.getMethod(), request.getRequestURI(),
                context.getUserId(), context.getCorrelationId());
            filterChain.doFilter(request, response);
        } finally {
            long duration = System.currentTimeMillis() - startTime;
            logger.info("Request completed: {} {} [Status: {}, Duration: {}ms, Correlation: {}]",
                request.getMethod(), request.getRequestURI(),
                response.getStatus(), duration, context.getCorrelationId());
            MDC.clear();
        }
    }

    /**
     * True when the request URI equals an excluded path or is nested under it. A plain
     * prefix test would also exclude unrelated paths such as {@code /information} for the
     * {@code /info} entry, so the match is anchored on a path-segment boundary.
     */
    private boolean shouldSkipCorrelation(HttpServletRequest request) {
        String path = request.getRequestURI();
        for (String excluded : excludedPaths) {
            if (path.equals(excluded) || path.startsWith(excluded + "/")) {
                return true;
            }
        }
        return false;
    }

    // Reads the three ID headers (with defaults) and collects request attributes.
    private CorrelationContext extractCorrelationContext(HttpServletRequest request) {
        String correlationId = getHeaderOrDefault(request, CORRELATION_ID_HEADER,
            () -> "corr-" + UUID.randomUUID().toString().substring(0, 12));
        String userId = getHeaderOrDefault(request, USER_ID_HEADER, () -> "unknown");
        String sessionId = getHeaderOrDefault(request, SESSION_ID_HEADER, () -> null);

        Map<String, String> attributes = new HashMap<>();
        attributes.put("clientIp", getClientIp(request));
        attributes.put("userAgent", request.getHeader("User-Agent")); // may be null
        attributes.put("requestPath", request.getRequestURI());
        attributes.put("httpMethod", request.getMethod());
        return new CorrelationContext(correlationId, userId, sessionId, attributes);
    }

    // Supplier is fully qualified because this snippet's imports (java.util.*) do not
    // bring java.util.function.Supplier into scope.
    private String getHeaderOrDefault(HttpServletRequest request, String headerName,
                                      java.util.function.Supplier<String> defaultValueSupplier) {
        String value = request.getHeader(headerName);
        return (value != null && !value.trim().isEmpty()) ? value : defaultValueSupplier.get();
    }

    /**
     * Returns the first entry of {@code X-Forwarded-For} when it is non-blank, otherwise
     * the remote address. The header is supplied by the client or an intermediary, so the
     * result is informational only.
     *
     * <p>The first entry is cut out with {@code indexOf} rather than {@code split}:
     * splitting a value such as {@code ","} yields an empty array, and indexing it throws.
     */
    private String getClientIp(HttpServletRequest request) {
        String xForwardedFor = request.getHeader("X-Forwarded-For");
        if (xForwardedFor != null) {
            int comma = xForwardedFor.indexOf(',');
            String first = (comma >= 0 ? xForwardedFor.substring(0, comma) : xForwardedFor).trim();
            if (!first.isEmpty()) {
                return first;
            }
        }
        return request.getRemoteAddr();
    }

    // Publishes the context to the MDC. Null attribute values (e.g. a missing User-Agent)
    // are skipped because MDC bindings are not required to accept null values.
    private void setupMDC(CorrelationContext context) {
        MDC.put("correlationId", context.getCorrelationId());
        MDC.put("userId", context.getUserId());
        if (context.getSessionId() != null) {
            MDC.put("sessionId", context.getSessionId());
        }
        context.getAttributes().forEach((key, value) -> {
            if (value != null) {
                MDC.put(key, value);
            }
        });
    }

    private void addCorrelationHeaders(HttpServletResponse response, CorrelationContext context) {
        response.setHeader(CORRELATION_ID_HEADER, context.getCorrelationId());
        if (context.getUserId() != null) {
            response.setHeader(USER_ID_HEADER, context.getUserId());
        }
        if (context.getSessionId() != null) {
            response.setHeader(SESSION_ID_HEADER, context.getSessionId());
        }
    }

    /**
     * Feign interceptor that copies the correlation, user and session IDs found in the
     * MDC onto outgoing requests; absent values are not sent.
     */
    @Bean
    public RequestInterceptor correlationIdRequestInterceptor() {
        return template -> {
            // Propagate correlation headers to downstream services
            String correlationId = MDC.get("correlationId");
            if (correlationId != null) {
                template.header(CORRELATION_ID_HEADER, correlationId);
            }
            String userId = MDC.get("userId");
            if (userId != null) {
                template.header(USER_ID_HEADER, userId);
            }
            String sessionId = MDC.get("sessionId");
            if (sessionId != null) {
                template.header(SESSION_ID_HEADER, sessionId);
            }
        };
    }

    /** Value holder for the IDs and attributes extracted from one request. */
    public static class CorrelationContext {
        private final String correlationId;
        private final String userId;
        private final String sessionId;
        private final Map<String, String> attributes;

        public CorrelationContext(String correlationId, String userId, String sessionId,
                                  Map<String, String> attributes) {
            this.correlationId = correlationId;
            this.userId = userId;
            this.sessionId = sessionId;
            this.attributes = new HashMap<>(attributes); // defensive copy
        }

        public String getCorrelationId() { return correlationId; }

        public String getUserId() { return userId; }

        public String getSessionId() { return sessionId; }

        /** @return a read-only view of the attributes; values may be null */
        public Map<String, String> getAttributes() {
            return Collections.unmodifiableMap(attributes);
        }
    }
}
6. Async and Reactive Support
Example 6: Reactive Correlation ID Support
import org.slf4j.MDC;
import reactor.core.publisher.Signal;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
import java.util.function.Consumer;
/**
 * Helpers for carrying SLF4J {@link MDC} entries through Reactor pipelines and onto
 * scheduler threads.
 */
public class ReactiveCorrelationContext {

    /**
     * Decorates {@code mono} so that the MDC captured at subscription time is
     * <ul>
     *   <li>written into the Reactor context under the key {@code "mdcContext"}, and</li>
     *   <li>re-installed into the MDC of whichever thread delivers each next, error or
     *       complete signal.</li>
     * </ul>
     *
     * <p>{@link MDC#getCopyOfContextMap()} may return {@code null}; an empty map is used
     * in that case because the Reactor context does not accept null values.
     */
    public static <T> Mono<T> withCorrelationContext(Mono<T> mono) {
        return Mono.deferContextual(ignored -> {
            // Capture MDC context at subscription time
            Map<String, String> captured = MDC.getCopyOfContextMap();
            Map<String, String> snapshot = (captured != null) ? captured : Map.of();
            return mono
                .contextWrite(context -> context.put("mdcContext", snapshot))
                .doOnEach(handleOnEach(snapshot));
        });
    }

    // Installs the snapshot into the MDC for each signal passed to doOnEach (next, error,
    // complete). Other signal types are ignored.
    private static <T> Consumer<Signal<T>> handleOnEach(Map<String, String> originalContext) {
        return signal -> {
            switch (signal.getType()) {
                case ON_NEXT:
                case ON_ERROR:
                case ON_COMPLETE:
                    applyContext(originalContext);
                    break;
                default:
                    break;
            }
        };
    }

    // Replaces the calling thread's MDC with the snapshot; null or empty means "nothing".
    private static void applyContext(Map<String, String> snapshot) {
        if (snapshot != null && !snapshot.isEmpty()) {
            MDC.setContextMap(snapshot);
        } else {
            MDC.clear();
        }
    }

    /**
     * Wraps {@code runnable} so it runs with the MDC of the thread that called
     * {@code wrap}. When that MDC was empty the executing thread's MDC is cleared for the
     * duration of the task, so entries left on a pooled thread do not show up in the
     * task's logs. The executing thread's own MDC is restored afterwards.
     */
    public static Runnable wrap(Runnable runnable) {
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        return () -> {
            Map<String, String> previous = MDC.getCopyOfContextMap();
            try {
                applyContext(mdcContext);
                runnable.run();
            } finally {
                applyContext(previous);
            }
        };
    }

    /** Factory for a Reactor scheduler backed by an MDC-aware executor service. */
    public static class MDCAwareScheduler {
        public static reactor.core.scheduler.Scheduler create() {
            return Schedulers.fromExecutorService(
                MDCCorrelationManager.newMDCAwareExecutorService(
                    java.util.concurrent.Executors.newCachedThreadPool()
                )
            );
        }
    }
}
// Reactive service using correlation IDs
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
public class ReactiveOrderHandler {
private static final Logger logger = LoggerFactory.getLogger(ReactiveOrderHandler.class);
private final ReactiveOrderService orderService;
public ReactiveOrderHandler(ReactiveOrderService orderService) {
this.orderService = orderService;
}
public Mono<ServerResponse> createOrder(ServerRequest request) {
return ReactiveCorrelationContext.withCorrelationContext(
request.bodyToMono(CreateOrderRequest.class)
.flatMap(orderService::processOrder)
.doOnSubscribe(subscription ->
logger.info("Starting reactive order processing"))
.doOnSuccess(order ->
logger.info("Reactive order processing completed: {}", order.getId()))
.doOnError(error ->
logger.error("Reactive order processing failed", error))
.flatMap(order -> ServerResponse.ok()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER,
MDC.get("correlationId"))
.bodyValue(order))
);
}
}
// Reactive service implementation
/**
 * Example reactive service: builds an {@code Order} from the request on a bounded-elastic
 * scheduler thread and logs around it.
 */
@Service
public class ReactiveOrderService {
    private static final Logger logger = LoggerFactory.getLogger(ReactiveOrderService.class);

    /**
     * Returns a {@link Mono} that, when subscribed, creates the order for {@code request}.
     */
    public Mono<Order> processOrder(CreateOrderRequest request) {
        Mono<Order> creation = Mono.fromCallable(() -> buildOrder(request));
        return creation
            .subscribeOn(Schedulers.boundedElastic()) // Use MDC-aware scheduler in production
            .doOnNext(created -> logger.debug("Order created: {}", created.getId()));
    }

    // Logs the user and assembles the order with a freshly generated ID.
    private Order buildOrder(CreateOrderRequest request) {
        logger.info("Processing order for user: {}", request.getUserId());
        String orderId = generateOrderId();
        return new Order(orderId, request.getUserId(), request.getAmount());
    }

    private String generateOrderId() {
        String uuidText = UUID.randomUUID().toString();
        return "order-" + uuidText.substring(0, 8);
    }
}
7. Database and Messaging Integration
Example 7: Database and Message Correlation
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.support.TransactionTemplate;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
/**
 * Example service that stamps the current correlation ID (read from the MDC)
 * onto persisted orders and published order events, and puts it back into the
 * MDC when those events are consumed.
 */
public class CorrelatedDataService {
    private static final Logger logger = LoggerFactory.getLogger(CorrelatedDataService.class);

    /** MDC key, and optional Kafka header name, holding the correlation ID. */
    private static final String CORRELATION_ID_KEY = "correlationId";
    /** Topic used for both publishing and consuming order-created events. */
    private static final String ORDER_CREATED_TOPIC = "order-created";
    /** Status written for every newly inserted order. */
    private static final String STATUS_CREATED = "CREATED";

    private final JdbcTemplate jdbcTemplate;
    private final TransactionTemplate transactionTemplate;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public CorrelatedDataService(JdbcTemplate jdbcTemplate,
                                 TransactionTemplate transactionTemplate,
                                 KafkaTemplate<String, Object> kafkaTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.transactionTemplate = transactionTemplate;
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Inserts an order and publishes an order-created event, both tagged with
     * the correlation ID currently stored in the MDC (which may be {@code null}).
     *
     * @param request the order to create
     * @return the created order
     * @throws RuntimeException wrapping any failure; the transaction is marked rollback-only first
     */
    public Order createOrderWithCorrelation(CreateOrderRequest request) {
        String correlationId = MDC.get(CORRELATION_ID_KEY);
        String userId = MDC.get("userId");
        logger.info("Creating order with correlation: {}", correlationId);
        return transactionTemplate.execute(status -> {
            try {
                // Insert order with correlation ID
                Order order = insertOrder(request, correlationId);
                logger.debug("Order inserted into database: {}", order.getId());
                // NOTE(review): the event is handed to Kafka from inside the transaction
                // callback, i.e. before the surrounding transaction has committed - confirm
                // that consumers tolerate events for orders that are later rolled back.
                publishOrderCreatedEvent(order, correlationId, userId);
                logger.debug("Order created event published: {}", order.getId());
                return order;
            } catch (Exception e) {
                logger.error("Failed to create order with correlation: {}", correlationId, e);
                status.setRollbackOnly();
                throw new RuntimeException("Order creation failed", e);
            }
        });
    }

    /**
     * Inserts a row into {@code orders} with a generated order ID.
     * The ID is supplied explicitly, so no generated keys are requested.
     */
    private Order insertOrder(CreateOrderRequest request, String correlationId) {
        String sql = """
                INSERT INTO orders (id, user_id, amount, status, correlation_id, created_at)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                """;
        String orderId = generateOrderId();
        jdbcTemplate.update(sql, ps -> {
            ps.setString(1, orderId);
            ps.setString(2, request.getUserId());
            ps.setDouble(3, request.getAmount());
            ps.setString(4, STATUS_CREATED);
            ps.setString(5, correlationId);
        });
        return new Order(orderId, request.getUserId(), request.getAmount(), STATUS_CREATED, correlationId);
    }

    /**
     * Sends an {@link OrderCreatedEvent} keyed by order ID and logs the send outcome.
     */
    private void publishOrderCreatedEvent(Order order, String correlationId, String userId) {
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getUserId(),
            order.getAmount(),
            correlationId,
            System.currentTimeMillis()
        );
        // The correlation ID travels inside the event payload. No Kafka headers are set by
        // this send, which is why handleOrderCreated treats the header as optional.
        // NOTE(review): userId is currently not propagated with the event - confirm intent.
        kafkaTemplate.send(ORDER_CREATED_TOPIC, order.getId(), event)
            .addCallback(
                result -> logger.debug("Order event sent successfully: {}", order.getId()),
                failure -> logger.error("Failed to send order event: {}", order.getId(), failure)
            );
    }

    /**
     * Returns all orders stored with the given correlation ID, newest first.
     *
     * @param correlationId the correlation ID to search for
     * @return matching orders; empty if none match
     */
    public List<Order> findOrdersByCorrelationId(String correlationId) {
        logger.info("Searching orders by correlation ID: {}", correlationId);
        String sql = "SELECT * FROM orders WHERE correlation_id = ? ORDER BY created_at DESC";
        // Varargs overload; the (sql, Object[], RowMapper) form is deprecated.
        return jdbcTemplate.query(sql, (rs, rowNum) ->
            new Order(
                rs.getString("id"),
                rs.getString("user_id"),
                rs.getDouble("amount"),
                rs.getString("status"),
                rs.getString("correlation_id")
            ),
            correlationId
        );
    }

    /**
     * Kafka listener that restores the correlation ID into the MDC while the
     * event is processed.
     *
     * @param event         the consumed event
     * @param correlationId value of the {@code correlationId} header, or {@code null} when the
     *                      producer did not set one; the ID in the event payload is used instead
     */
    @KafkaListener(topics = ORDER_CREATED_TOPIC)
    public void handleOrderCreated(OrderCreatedEvent event,
                                   @Header(value = CORRELATION_ID_KEY, required = false) String correlationId) {
        // Prefer the header, fall back to the payload: a mandatory header would reject
        // every record produced without it.
        String effectiveCorrelationId =
            correlationId != null ? correlationId : event.getCorrelationId();
        // Not every MDC backend accepts null values, so only store a real ID.
        if (effectiveCorrelationId != null) {
            MDC.put(CORRELATION_ID_KEY, effectiveCorrelationId);
        }
        try {
            logger.info("Processing order created event: {} [Correlation: {}]",
                event.getOrderId(), effectiveCorrelationId);
            // Process the event
            processOrderEvent(event);
            logger.info("Order event processed successfully: {} [Correlation: {}]",
                event.getOrderId(), effectiveCorrelationId);
        } finally {
            MDC.clear();
        }
    }

    private void processOrderEvent(OrderCreatedEvent event) {
        // Event processing logic
        logger.debug("Processing event for order: {}", event.getOrderId());
    }

    /**
     * Returns {@code "order-"} plus the first eight characters of a random UUID.
     * NOTE(review): eight hex characters are only 32 bits; this value is used as the
     * {@code orders.id} column, so confirm collisions are acceptable or handled.
     */
    private String generateOrderId() {
        return "order-" + UUID.randomUUID().toString().substring(0, 8);
    }

    /** Immutable payload published when an order has been created. */
    public static class OrderCreatedEvent {
        private final String orderId;
        private final String userId;
        private final double amount;
        private final String correlationId;
        private final long timestamp;

        public OrderCreatedEvent(String orderId, String userId, double amount,
                                 String correlationId, long timestamp) {
            this.orderId = orderId;
            this.userId = userId;
            this.amount = amount;
            this.correlationId = correlationId;
            this.timestamp = timestamp;
        }

        // Getters
        public String getOrderId() { return orderId; }
        public String getUserId() { return userId; }
        public double getAmount() { return amount; }
        public String getCorrelationId() { return correlationId; }
        public long getTimestamp() { return timestamp; }
    }

    /** Immutable order as stored in, and read back from, the {@code orders} table. */
    public static class Order {
        private final String id;
        private final String userId;
        private final double amount;
        private final String status;
        private final String correlationId;

        public Order(String id, String userId, double amount, String status, String correlationId) {
            this.id = id;
            this.userId = userId;
            this.amount = amount;
            this.status = status;
            this.correlationId = correlationId;
        }

        // Getters
        public String getId() { return id; }
        public String getUserId() { return userId; }
        public double getAmount() { return amount; }
        public String getStatus() { return status; }
        public String getCorrelationId() { return correlationId; }
    }
}
8. Distributed Tracing
Example 8: OpenTelemetry Integration
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
public class DistributedTracingService {
private final Tracer tracer;
public DistributedTracingService(Tracer tracer) {
this.tracer = tracer;
}
public void processOrderWithTracing(Order order) {
// Start a new span
Span span = tracer.spanBuilder("processOrder")
.setAttribute("order.id", order.getId())
.setAttribute("order.amount", order.getAmount())
.setAttribute("user.id", order.getUserId())
.startSpan();
try (Scope scope = span.makeCurrent()) {
// Set up correlation ID from trace ID
String traceId = span.getSpanContext().getTraceId();
MDC.put("correlationId", traceId);
MDC.put("traceId", traceId);
MDC.put("spanId", span.getSpanContext().getSpanId());
logger.info("Starting order processing with trace: {}", traceId);
// Process order steps
validateOrderWithSpan(order, span);
processPaymentWithSpan(order, span);
updateInventoryWithSpan(order, span);
logger.info("Order processing completed with trace: {}", traceId);
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
logger.error("Order processing failed with trace: {}",
span.getSpanContext().getTraceId(), e);
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
MDC.clear();
}
}
private void validateOrderWithSpan(Order order, Span parentSpan) {
Span span = tracer.spanBuilder("validateOrder")
.setParent(Context.current().with(parentSpan))
.startSpan();
try (Scope scope = span.makeCurrent()) {
logger.debug("Validating order: {}", order.getId());
// Validation logic
if (order.getAmount() <= 0) {
throw new IllegalArgumentException("Invalid order amount");
}
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
}
}
private void processPaymentWithSpan(Order order, Span parentSpan) {
Span span = tracer.spanBuilder("processPayment")
.setParent(Context.current().with(parentSpan))
.setAttribute("payment.amount", order.getAmount())
.startSpan();
try (Scope scope = span.makeCurrent()) {
logger.debug("Processing payment for order: {}", order.getId());
// Payment processing logic
// Simulate processing
Thread.sleep(100);
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
}
}
private void updateInventoryWithSpan(Order order, Span parentSpan) {
Span span = tracer.spanBuilder("updateInventory")
.setParent(Context.current().with(parentSpan))
.startSpan();
try (Scope scope = span.makeCurrent()) {
logger.debug("Updating inventory for order: {}", order.getId());
// Inventory update logic
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
}
}
}
Logback Configuration Example
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Human-readable output with correlationId and userId taken from the MDC.
         Defined for convenience; it is not attached to the root logger below.
         Swap the appender-ref to CONSOLE to use it. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%X{correlationId}] [%X{userId}] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <!-- Structured JSON output. A <providers> list is only supported by the composite
         encoder, not by LogstashEncoder, and the MDC provider selects keys with
         <includeMdcKeyName>. -->
    <appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp/>
                <logLevel/>
                <threadName/>
                <loggerName/>
                <message/>
                <mdc>
                    <includeMdcKeyName>correlationId</includeMdcKeyName>
                    <includeMdcKeyName>userId</includeMdcKeyName>
                    <includeMdcKeyName>sessionId</includeMdcKeyName>
                    <includeMdcKeyName>traceId</includeMdcKeyName>
                    <includeMdcKeyName>spanId</includeMdcKeyName>
                </mdc>
                <stackTrace/>
            </providers>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="JSON" />
    </root>
</configuration>
Key Benefits and Best Practices
Benefits:
- Request Tracing: Track requests across service boundaries
- Debugging: Correlate logs for specific transactions
- Monitoring: Track performance and errors per request
- Auditing: Maintain audit trails for compliance
Best Practices:
- Generate Early: Create correlation IDs at entry points
- Propagate Everywhere: Pass correlation IDs across all service calls
- Include in Logs: Always log correlation IDs
- Clear Context: Always clear ThreadLocal and MDC after request completion
- Use Standards: Follow OpenTelemetry trace ID formats when possible
- Monitor Performance: Ensure correlation ID handling doesn't impact performance
Together, these examples cover the building blocks needed to add correlation IDs to Java applications, from basic ThreadLocal usage to advanced distributed tracing scenarios.