Correlation IDs in Logs: Complete Implementation Guide

Correlation IDs (also called trace IDs or request IDs) are essential for tracking requests across distributed systems, microservices, and asynchronous processing. They help correlate logs, metrics, and traces for a single business transaction.


Table of Contents

  1. Basic Correlation ID Implementation
  2. ThreadLocal-Based Context Management
  3. MDC (Mapped Diagnostic Context) Integration
  4. Spring Boot Integration
  5. Web Filter for HTTP Requests
  6. Async and Reactive Support
  7. Database and Messaging Integration
  8. Distributed Tracing

1. Basic Correlation ID Implementation

Example 1: Foundation Correlation ID Manager

import java.util.UUID;
import java.util.concurrent.Callable;
public class CorrelationId {
private static final ThreadLocal<String> currentCorrelationId = new ThreadLocal<>();
private CorrelationId() {
// Utility class
}
public static String generateCorrelationId() {
return "corr-" + UUID.randomUUID().toString().substring(0, 8);
}
public static String getCurrentCorrelationId() {
return currentCorrelationId.get();
}
public static void setCorrelationId(String correlationId) {
currentCorrelationId.set(correlationId);
}
public static void clear() {
currentCorrelationId.remove();
}
public static void withCorrelationId(String correlationId, Runnable task) {
String previousId = getCurrentCorrelationId();
try {
setCorrelationId(correlationId);
task.run();
} finally {
if (previousId != null) {
setCorrelationId(previousId);
} else {
clear();
}
}
}
public static <T> T withCorrelationId(String correlationId, Callable<T> task) throws Exception {
String previousId = getCurrentCorrelationId();
try {
setCorrelationId(correlationId);
return task.call();
} finally {
if (previousId != null) {
setCorrelationId(previousId);
} else {
clear();
}
}
}
// Auto-generating version
public static void withNewCorrelationId(Runnable task) {
withCorrelationId(generateCorrelationId(), task);
}
public static <T> T withNewCorrelationId(Callable<T> task) throws Exception {
return withCorrelationId(generateCorrelationId(), task);
}
}
// Usage examples
public class CorrelationIdDemo {
public static void main(String[] args) {
// Basic usage
CorrelationId.setCorrelationId("test-123");
System.out.println("Current correlation ID: " + CorrelationId.getCurrentCorrelationId());
// Scoped usage
CorrelationId.withCorrelationId("scoped-456", () -> {
System.out.println("Inside scope: " + CorrelationId.getCurrentCorrelationId());
performBusinessOperation();
});
// Auto-generated ID
CorrelationId.withNewCorrelationId(() -> {
System.out.println("Auto-generated: " + CorrelationId.getCurrentCorrelationId());
});
CorrelationId.clear();
}
private static void performBusinessOperation() {
System.out.println("Operation with correlation: " + CorrelationId.getCurrentCorrelationId());
}
}

2. ThreadLocal-Based Context Management

Example 2: Comprehensive Request Context

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class RequestContext {
private static final ThreadLocal<RequestContext> currentContext = new ThreadLocal<>();
private final String correlationId;
private final String userId;
private final String sessionId;
private final Map<String, String> customAttributes;
private final long startTime;
private RequestContext(Builder builder) {
this.correlationId = builder.correlationId;
this.userId = builder.userId;
this.sessionId = builder.sessionId;
this.customAttributes = new ConcurrentHashMap<>(builder.customAttributes);
this.startTime = System.currentTimeMillis();
}
public static class Builder {
private String correlationId;
private String userId;
private String sessionId;
private Map<String, String> customAttributes = new HashMap<>();
public Builder correlationId(String correlationId) {
this.correlationId = correlationId;
return this;
}
public Builder userId(String userId) {
this.userId = userId;
return this;
}
public Builder sessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}
public Builder attribute(String key, String value) {
this.customAttributes.put(key, value);
return this;
}
public RequestContext build() {
if (correlationId == null) {
correlationId = generateCorrelationId();
}
return new RequestContext(this);
}
}
// Getters
public String getCorrelationId() { return correlationId; }
public String getUserId() { return userId; }
public String getSessionId() { return sessionId; }
public Map<String, String> getCustomAttributes() { 
return Collections.unmodifiableMap(customAttributes); 
}
public long getStartTime() { return startTime; }
public long getDuration() { return System.currentTimeMillis() - startTime; }
public String getAttribute(String key) {
return customAttributes.get(key);
}
public void setAttribute(String key, String value) {
customAttributes.put(key, value);
}
// Static context management
public static void setCurrentContext(RequestContext context) {
currentContext.set(context);
}
public static RequestContext getCurrentContext() {
return currentContext.get();
}
public static String getCurrentCorrelationId() {
RequestContext context = getCurrentContext();
return context != null ? context.getCorrelationId() : null;
}
public static void clearContext() {
currentContext.remove();
}
public static <T> T withContext(RequestContext context, Callable<T> task) throws Exception {
RequestContext previous = getCurrentContext();
try {
setCurrentContext(context);
return task.call();
} finally {
if (previous != null) {
setCurrentContext(previous);
} else {
clearContext();
}
}
}
public static void withContext(RequestContext context, Runnable task) {
RequestContext previous = getCurrentContext();
try {
setCurrentContext(context);
task.run();
} finally {
if (previous != null) {
setCurrentContext(previous);
} else {
clearContext();
}
}
}
private static String generateCorrelationId() {
return "req-" + UUID.randomUUID().toString().substring(0, 12);
}
@Override
public String toString() {
return String.format(
"RequestContext{correlationId='%s', userId='%s', sessionId='%s', attributes=%s}",
correlationId, userId, sessionId, customAttributes);
}
}
// Usage example
public class RequestContextDemo {
public static void main(String[] args) throws Exception {
// Create a request context
RequestContext context = new RequestContext.Builder()
.correlationId("corr-123456")
.userId("user-789")
.sessionId("session-abc")
.attribute("client-ip", "192.168.1.100")
.attribute("user-agent", "Mozilla/5.0")
.build();
// Execute with context
RequestContext.withContext(context, () -> {
processOrder("order-001", 99.99);
sendNotification("user-789", "Order processed");
});
// Verify context is cleared
System.out.println("After execution: " + RequestContext.getCurrentContext());
}
private static void processOrder(String orderId, double amount) {
RequestContext context = RequestContext.getCurrentContext();
System.out.printf("Processing order %s for user %s with correlation %s%n",
orderId, context.getUserId(), context.getCorrelationId());
// Log with correlation ID
logInfo("Order processing started", Map.of("orderId", orderId, "amount", String.valueOf(amount)));
}
private static void sendNotification(String userId, String message) {
RequestContext context = RequestContext.getCurrentContext();
logInfo("Sending notification", 
Map.of("userId", userId, "message", message, "correlationId", context.getCorrelationId()));
}
private static void logInfo(String message, Map<String, String> context) {
String correlationId = RequestContext.getCurrentCorrelationId();
System.out.printf("[%s] %s - %s%n", correlationId, message, context);
}
}

3. MDC (Mapped Diagnostic Context) Integration

Example 3: SLF4J MDC Integration

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
public class MDCCorrelationManager {
private static final Logger logger = LoggerFactory.getLogger(MDCCorrelationManager.class);
// MDC Keys
public static final String CORRELATION_ID = "correlationId";
public static final String USER_ID = "userId";
public static final String SESSION_ID = "sessionId";
public static final String REQUEST_PATH = "requestPath";
public static final String CLIENT_IP = "clientIp";
public static void setupContext(String correlationId, String userId, String sessionId) {
MDC.put(CORRELATION_ID, correlationId);
if (userId != null) MDC.put(USER_ID, userId);
if (sessionId != null) MDC.put(SESSION_ID, sessionId);
}
public static void setupContext(Map<String, String> context) {
context.forEach(MDC::put);
}
public static void setCorrelationId(String correlationId) {
MDC.put(CORRELATION_ID, correlationId);
}
public static String getCorrelationId() {
return MDC.get(CORRELATION_ID);
}
public static void clear() {
MDC.clear();
}
public static Map<String, String> getCopyOfContextMap() {
return MDC.getCopyOfContextMap();
}
public static void setContextMap(Map<String, String> context) {
MDC.setContextMap(context);
}
// Wrapper for Runnable that preserves MDC context
public static Runnable wrap(Runnable task) {
Map<String, String> context = getCopyOfContextMap();
return () -> {
Map<String, String> previous = getCopyOfContextMap();
try {
setContextMap(context);
task.run();
} finally {
setContextMap(previous);
}
};
}
// Wrapper for Callable that preserves MDC context
public static <T> Callable<T> wrap(Callable<T> task) {
Map<String, String> context = getCopyOfContextMap();
return () -> {
Map<String, String> previous = getCopyOfContextMap();
try {
setContextMap(context);
return task.call();
} finally {
setContextMap(previous);
}
};
}
// MDC-aware executor service
public static ExecutorService newMDCAwareExecutorService(ExecutorService delegate) {
return new MDCAwareExecutorService(delegate);
}
public static ScheduledExecutorService newMDCAwareScheduledExecutorService(int corePoolSize) {
return new MDCAwareScheduledThreadPoolExecutor(corePoolSize);
}
// MDC-aware thread pool executor
private static class MDCAwareExecutorService implements ExecutorService {
private final ExecutorService delegate;
public MDCAwareExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
@Override
public void execute(Runnable command) {
delegate.execute(wrap(command));
}
@Override
public Future<?> submit(Runnable task) {
return delegate.submit(wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(wrap(task), result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(wrap(task));
}
// Delegate all other methods
@Override public void shutdown() { delegate.shutdown(); }
@Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
@Override public boolean isShutdown() { return delegate.isShutdown(); }
@Override public boolean isTerminated() { return delegate.isTerminated(); }
@Override public boolean awaitTermination(long timeout, TimeUnit unit) 
throws InterruptedException { return delegate.awaitTermination(timeout, unit); }
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
throws InterruptedException {
return delegate.invokeAll(wrapCollection(tasks));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 
long timeout, TimeUnit unit) 
throws InterruptedException {
return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
throws InterruptedException, ExecutionException {
return delegate.invokeAny(wrapCollection(tasks));
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 
long timeout, TimeUnit unit) 
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
}
private <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
Collection<Callable<T>> wrapped = new ArrayList<>();
for (Callable<T> task : tasks) {
wrapped.add(wrap(task));
}
return wrapped;
}
}
// MDC-aware scheduled executor
private static class MDCAwareScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
public MDCAwareScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
@Override
public void execute(Runnable command) {
super.execute(MDCCorrelationManager.wrap(command));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(MDCCorrelationManager.wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(MDCCorrelationManager.wrap(task), result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(MDCCorrelationManager.wrap(task));
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return super.schedule(MDCCorrelationManager.wrap(command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return super.schedule(MDCCorrelationManager.wrap(callable), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 
long period, TimeUnit unit) {
return super.scheduleAtFixedRate(MDCCorrelationManager.wrap(command), 
initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 
long delay, TimeUnit unit) {
return super.scheduleWithFixedDelay(MDCCorrelationManager.wrap(command), 
initialDelay, delay, unit);
}
}
}
// Service using MDC correlation
public class OrderService {
private static final Logger logger = LoggerFactory.getLogger(OrderService.class);
private final ExecutorService executorService;
public OrderService() {
this.executorService = MDCCorrelationManager.newMDCAwareExecutorService(
Executors.newFixedThreadPool(4)
);
}
public void processOrderAsync(Order order) {
logger.info("Processing order asynchronously: {}", order.getId());
executorService.submit(() -> {
// This will have the MDC context from the calling thread
try {
processOrder(order);
logger.info("Order processed successfully: {}", order.getId());
} catch (Exception e) {
logger.error("Failed to process order: {}", order.getId(), e);
}
});
}
private void processOrder(Order order) {
String correlationId = MDCCorrelationManager.getCorrelationId();
logger.info("Starting order processing with correlation: {}", correlationId);
// Simulate processing steps
validateOrder(order);
processPayment(order);
updateInventory(order);
logger.info("Order processing completed: {}", order.getId());
}
private void validateOrder(Order order) {
logger.debug("Validating order: {}", order.getId());
// Validation logic
}
private void processPayment(Order order) {
logger.debug("Processing payment for order: {}", order.getId());
// Payment processing logic
}
private void updateInventory(Order order) {
logger.debug("Updating inventory for order: {}", order.getId());
// Inventory update logic
}
public void shutdown() {
executorService.shutdown();
}
// Simple Order class
public static class Order {
private String id;
private String userId;
private double amount;
public Order(String id, String userId, double amount) {
this.id = id;
this.userId = userId;
this.amount = amount;
}
public String getId() { return id; }
public String getUserId() { return userId; }
public double getAmount() { return amount; }
}
}

4. Spring Boot Integration

Example 4: Spring Boot Configuration

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.UUID;
@Configuration
public class CorrelationIdConfig {
private static final Logger logger = LoggerFactory.getLogger(CorrelationIdConfig.class);
public static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
public static final String CORRELATION_ID_MDC_KEY = "correlationId";
@Bean
@Order(1)
public FilterRegistrationBean<CorrelationIdFilter> correlationIdFilter() {
FilterRegistrationBean<CorrelationIdFilter> registrationBean = new FilterRegistrationBean<>();
registrationBean.setFilter(new CorrelationIdFilter());
registrationBean.addUrlPatterns("/*");
registrationBean.setName("correlationIdFilter");
return registrationBean;
}
public static class CorrelationIdFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, 
HttpServletResponse response, 
FilterChain filterChain) 
throws ServletException, IOException {
// Get correlation ID from header or generate new one
String correlationId = getCorrelationIdFromHeader(request);
// Set up MDC context
MDC.put(CORRELATION_ID_MDC_KEY, correlationId);
// Add correlation ID to response header
response.setHeader(CORRELATION_ID_HEADER, correlationId);
logger.info("Incoming request: {} {} [Correlation: {}]", 
request.getMethod(), request.getRequestURI(), correlationId);
try {
filterChain.doFilter(request, response);
} finally {
// Log completion
logger.info("Request completed: {} {} [Status: {}, Correlation: {}]", 
request.getMethod(), request.getRequestURI(), 
response.getStatus(), correlationId);
// Clear MDC context
MDC.clear();
}
}
private String getCorrelationIdFromHeader(HttpServletRequest request) {
String correlationId = request.getHeader(CORRELATION_ID_HEADER);
if (correlationId == null || correlationId.trim().isEmpty()) {
correlationId = generateCorrelationId();
logger.debug("Generated new correlation ID: {}", correlationId);
}
return correlationId;
}
private String generateCorrelationId() {
return "corr-" + UUID.randomUUID().toString().substring(0, 8);
}
}
}
// Spring Service using correlation ID
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
@Service
public class OrderProcessingService {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingService.class);
@Autowired
private RestTemplate restTemplate;
@Autowired
private InventoryService inventoryService;
@Autowired
private NotificationService notificationService;
public Order processOrder(Order order) {
String correlationId = MDC.get(CorrelationIdConfig.CORRELATION_ID_MDC_KEY);
logger.info("Processing order: {} for user: {} [Correlation: {}]", 
order.getId(), order.getUserId(), correlationId);
try {
// Step 1: Validate order
validateOrder(order);
logger.debug("Order validated: {}", order.getId());
// Step 2: Process payment
processPayment(order);
logger.debug("Payment processed for order: {}", order.getId());
// Step 3: Update inventory
inventoryService.updateInventory(order);
logger.debug("Inventory updated for order: {}", order.getId());
// Step 4: Send notification
notificationService.sendOrderConfirmation(order);
logger.debug("Notification sent for order: {}", order.getId());
logger.info("Order processed successfully: {} [Correlation: {}]", 
order.getId(), correlationId);
return order;
} catch (Exception e) {
logger.error("Failed to process order: {} [Correlation: {}]", 
order.getId(), correlationId, e);
throw e;
}
}
private void validateOrder(Order order) {
if (order.getAmount() <= 0) {
throw new IllegalArgumentException("Invalid order amount");
}
}
private void processPayment(Order order) {
// Payment processing logic
logger.debug("Processing payment of ${} for order: {}", 
order.getAmount(), order.getId());
// Simulate payment processing
}
}
// REST Controller with correlation ID
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private static final Logger logger = LoggerFactory.getLogger(OrderController.class);
@Autowired
private OrderProcessingService orderProcessingService;
@PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
String correlationId = MDC.get(CorrelationIdConfig.CORRELATION_ID_MDC_KEY);
logger.info("Creating order for user: {} [Correlation: {}]", 
request.getUserId(), correlationId);
try {
Order order = new Order(
generateOrderId(),
request.getUserId(),
request.getAmount(),
request.getItems()
);
Order processedOrder = orderProcessingService.processOrder(order);
OrderResponse response = new OrderResponse(
processedOrder.getId(),
"SUCCESS",
"Order processed successfully"
);
logger.info("Order created successfully: {} [Correlation: {}]", 
processedOrder.getId(), correlationId);
return ResponseEntity.ok()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER, correlationId)
.body(response);
} catch (Exception e) {
logger.error("Failed to create order [Correlation: {}]", correlationId, e);
OrderResponse response = new OrderResponse(
null,
"ERROR",
"Failed to process order: " + e.getMessage()
);
return ResponseEntity.badRequest()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER, correlationId)
.body(response);
}
}
@GetMapping("/{orderId}")
public ResponseEntity<Order> getOrder(@PathVariable String orderId) {
String correlationId = MDC.get(CorrelationIdConfig.CORRELATION_ID_MDC_KEY);
logger.info("Fetching order: {} [Correlation: {}]", orderId, correlationId);
// Simulate order retrieval
Order order = new Order(orderId, "user-123", 99.99, List.of("item1", "item2"));
return ResponseEntity.ok()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER, correlationId)
.body(order);
}
private String generateOrderId() {
return "order-" + UUID.randomUUID().toString().substring(0, 8);
}
// Request/Response DTOs
public static class CreateOrderRequest {
private String userId;
private double amount;
private List<String> items;
// Constructors, getters, setters
public CreateOrderRequest() {}
public CreateOrderRequest(String userId, double amount, List<String> items) {
this.userId = userId;
this.amount = amount;
this.items = items;
}
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
public List<String> getItems() { return items; }
public void setItems(List<String> items) { this.items = items; }
}
public static class OrderResponse {
private String orderId;
private String status;
private String message;
public OrderResponse(String orderId, String status, String message) {
this.orderId = orderId;
this.status = status;
this.message = message;
}
// Getters and setters
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
}
}

5. Web Filter for HTTP Requests

Example 5: Advanced Web Filter with Feign Client Support

import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
public class AdvancedCorrelationFilter extends OncePerRequestFilter {
private static final Logger logger = LoggerFactory.getLogger(AdvancedCorrelationFilter.class);
public static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
public static final String USER_ID_HEADER = "X-User-ID";
public static final String SESSION_ID_HEADER = "X-Session-ID";
private final List<String> excludedPaths = Arrays.asList("/health", "/metrics", "/info");
@Override
protected void doFilterInternal(HttpServletRequest request, 
HttpServletResponse response, 
FilterChain filterChain) 
throws ServletException, IOException {
// Skip correlation for excluded paths
if (shouldSkipCorrelation(request)) {
filterChain.doFilter(request, response);
return;
}
// Extract or generate correlation context
CorrelationContext context = extractCorrelationContext(request);
// Set up MDC
setupMDC(context);
// Add correlation headers to response
addCorrelationHeaders(response, context);
long startTime = System.currentTimeMillis();
try {
logger.info("Request started: {} {} [User: {}, Correlation: {}]", 
request.getMethod(), request.getRequestURI(), 
context.getUserId(), context.getCorrelationId());
filterChain.doFilter(request, response);
} finally {
long duration = System.currentTimeMillis() - startTime;
logger.info("Request completed: {} {} [Status: {}, Duration: {}ms, Correlation: {}]", 
request.getMethod(), request.getRequestURI(), 
response.getStatus(), duration, context.getCorrelationId());
MDC.clear();
}
}
private boolean shouldSkipCorrelation(HttpServletRequest request) {
String path = request.getRequestURI();
return excludedPaths.stream().anyMatch(path::startsWith);
}
private CorrelationContext extractCorrelationContext(HttpServletRequest request) {
String correlationId = getHeaderOrDefault(request, CORRELATION_ID_HEADER, 
() -> "corr-" + UUID.randomUUID().toString().substring(0, 12));
String userId = getHeaderOrDefault(request, USER_ID_HEADER, () -> "unknown");
String sessionId = getHeaderOrDefault(request, SESSION_ID_HEADER, () -> null);
Map<String, String> attributes = new HashMap<>();
attributes.put("clientIp", getClientIp(request));
attributes.put("userAgent", request.getHeader("User-Agent"));
attributes.put("requestPath", request.getRequestURI());
attributes.put("httpMethod", request.getMethod());
return new CorrelationContext(correlationId, userId, sessionId, attributes);
}
private String getHeaderOrDefault(HttpServletRequest request, String headerName, 
Supplier<String> defaultValueSupplier) {
String value = request.getHeader(headerName);
return (value != null && !value.trim().isEmpty()) ? value : defaultValueSupplier.get();
}
private String getClientIp(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0].trim();
}
return request.getRemoteAddr();
}
private void setupMDC(CorrelationContext context) {
MDC.put("correlationId", context.getCorrelationId());
MDC.put("userId", context.getUserId());
if (context.getSessionId() != null) {
MDC.put("sessionId", context.getSessionId());
}
context.getAttributes().forEach(MDC::put);
}
private void addCorrelationHeaders(HttpServletResponse response, CorrelationContext context) {
response.setHeader(CORRELATION_ID_HEADER, context.getCorrelationId());
if (context.getUserId() != null) {
response.setHeader(USER_ID_HEADER, context.getUserId());
}
if (context.getSessionId() != null) {
response.setHeader(SESSION_ID_HEADER, context.getSessionId());
}
}
// Feign Client interceptor to propagate correlation headers
@Bean
public RequestInterceptor correlationIdRequestInterceptor() {
return template -> {
// Propagate correlation headers to downstream services
String correlationId = MDC.get("correlationId");
if (correlationId != null) {
template.header(CORRELATION_ID_HEADER, correlationId);
}
String userId = MDC.get("userId");
if (userId != null) {
template.header(USER_ID_HEADER, userId);
}
String sessionId = MDC.get("sessionId");
if (sessionId != null) {
template.header(SESSION_ID_HEADER, sessionId);
}
};
}
// Correlation context holder
public static class CorrelationContext {
private final String correlationId;
private final String userId;
private final String sessionId;
private final Map<String, String> attributes;
public CorrelationContext(String correlationId, String userId, String sessionId, 
Map<String, String> attributes) {
this.correlationId = correlationId;
this.userId = userId;
this.sessionId = sessionId;
this.attributes = new HashMap<>(attributes);
}
public String getCorrelationId() { return correlationId; }
public String getUserId() { return userId; }
public String getSessionId() { return sessionId; }
public Map<String, String> getAttributes() { 
return Collections.unmodifiableMap(attributes); 
}
}
}

6. Async and Reactive Support

Example 6: Reactive Correlation ID Support

import org.slf4j.MDC;
import reactor.core.publisher.Signal;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
import java.util.function.Consumer;
public class ReactiveCorrelationContext {
public static <T> Mono<T> withCorrelationContext(Mono<T> mono) {
return Mono.deferContextual(contextView -> {
// Capture MDC context at subscription time
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return mono
.contextWrite(context -> context.put("mdcContext", mdcContext))
.doOnEach(handleOnEach(mdcContext));
});
}
private static <T> Consumer<Signal<T>> handleOnEach(Map<String, String> originalContext) {
return signal -> {
switch (signal.getType()) {
case ON_NEXT:
case ON_ERROR:
case ON_COMPLETE:
// Restore MDC context for terminal events
if (originalContext != null) {
MDC.setContextMap(originalContext);
} else {
MDC.clear();
}
break;
case ON_SUBSCRIBE:
// Set up MDC context at the beginning of the chain
Map<String, String> contextFromReactor = signal.getContextView().get("mdcContext");
if (contextFromReactor != null) {
MDC.setContextMap(contextFromReactor);
}
break;
}
};
}
// Wrapper for Runnable to use with Reactor schedulers
public static Runnable wrap(Runnable runnable) {
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
try {
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
runnable.run();
} finally {
if (previous != null) {
MDC.setContextMap(previous);
} else {
MDC.clear();
}
}
};
}
// Custom scheduler that preserves MDC context
public static class MDCAwareScheduler {
public static reactor.core.scheduler.Scheduler create() {
return Schedulers.fromExecutorService(
MDCCorrelationManager.newMDCAwareExecutorService(
java.util.concurrent.Executors.newCachedThreadPool()
)
);
}
}
}
// Reactive service using correlation IDs
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
public class ReactiveOrderHandler {
private static final Logger logger = LoggerFactory.getLogger(ReactiveOrderHandler.class);
private final ReactiveOrderService orderService;
public ReactiveOrderHandler(ReactiveOrderService orderService) {
this.orderService = orderService;
}
public Mono<ServerResponse> createOrder(ServerRequest request) {
return ReactiveCorrelationContext.withCorrelationContext(
request.bodyToMono(CreateOrderRequest.class)
.flatMap(orderService::processOrder)
.doOnSubscribe(subscription -> 
logger.info("Starting reactive order processing"))
.doOnSuccess(order -> 
logger.info("Reactive order processing completed: {}", order.getId()))
.doOnError(error -> 
logger.error("Reactive order processing failed", error))
.flatMap(order -> ServerResponse.ok()
.header(CorrelationIdConfig.CORRELATION_ID_HEADER, 
MDC.get("correlationId"))
.bodyValue(order))
);
}
}
// Reactive service implementation
@Service
public class ReactiveOrderService {
private static final Logger logger = LoggerFactory.getLogger(ReactiveOrderService.class);
public Mono<Order> processOrder(CreateOrderRequest request) {
return Mono.fromCallable(() -> {
logger.info("Processing order for user: {}", request.getUserId());
return new Order(
generateOrderId(),
request.getUserId(),
request.getAmount()
);
})
.subscribeOn(Schedulers.boundedElastic()) // Use MDC-aware scheduler in production
.doOnNext(order -> logger.debug("Order created: {}", order.getId()));
}
private String generateOrderId() {
return "order-" + UUID.randomUUID().toString().substring(0, 8);
}
}

7. Database and Messaging Integration

Example 7: Database and Message Correlation

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.support.TransactionTemplate;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
public class CorrelatedDataService {
private static final Logger logger = LoggerFactory.getLogger(CorrelatedDataService.class);
private final JdbcTemplate jdbcTemplate;
private final TransactionTemplate transactionTemplate;
private final KafkaTemplate<String, Object> kafkaTemplate;
public CorrelatedDataService(JdbcTemplate jdbcTemplate,
TransactionTemplate transactionTemplate,
KafkaTemplate<String, Object> kafkaTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.transactionTemplate = transactionTemplate;
this.kafkaTemplate = kafkaTemplate;
}
public Order createOrderWithCorrelation(CreateOrderRequest request) {
String correlationId = MDC.get("correlationId");
String userId = MDC.get("userId");
logger.info("Creating order with correlation: {}", correlationId);
return transactionTemplate.execute(status -> {
try {
// Insert order with correlation ID
Order order = insertOrder(request, correlationId);
logger.debug("Order inserted into database: {}", order.getId());
// Publish order created event with correlation ID
publishOrderCreatedEvent(order, correlationId, userId);
logger.debug("Order created event published: {}", order.getId());
return order;
} catch (Exception e) {
logger.error("Failed to create order with correlation: {}", correlationId, e);
status.setRollbackOnly();
throw new RuntimeException("Order creation failed", e);
}
});
}
private Order insertOrder(CreateOrderRequest request, String correlationId) {
String sql = """
INSERT INTO orders (id, user_id, amount, status, correlation_id, created_at) 
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""";
String orderId = generateOrderId();
KeyHolder keyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(new PreparedStatementCreator() {
@Override
public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
ps.setString(1, orderId);
ps.setString(2, request.getUserId());
ps.setDouble(3, request.getAmount());
ps.setString(4, "CREATED");
ps.setString(5, correlationId);
return ps;
}
}, keyHolder);
return new Order(orderId, request.getUserId(), request.getAmount(), "CREATED", correlationId);
}
private void publishOrderCreatedEvent(Order order, String correlationId, String userId) {
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getUserId(),
order.getAmount(),
correlationId,
System.currentTimeMillis()
);
// Include correlation context in message headers
kafkaTemplate.send("order-created", order.getId(), event)
.addCallback(
result -> logger.debug("Order event sent successfully: {}", order.getId()),
failure -> logger.error("Failed to send order event: {}", order.getId(), failure)
);
}
public List<Order> findOrdersByCorrelationId(String correlationId) {
logger.info("Searching orders by correlation ID: {}", correlationId);
String sql = "SELECT * FROM orders WHERE correlation_id = ? ORDER BY created_at DESC";
return jdbcTemplate.query(sql, new Object[]{correlationId}, (rs, rowNum) -> 
new Order(
rs.getString("id"),
rs.getString("user_id"),
rs.getDouble("amount"),
rs.getString("status"),
rs.getString("correlation_id")
)
);
}
// Kafka listener with correlation ID propagation
@KafkaListener(topics = "order-created")
public void handleOrderCreated(OrderCreatedEvent event, 
@Header("correlationId") String correlationId) {
// Set up MDC context from message headers
MDC.put("correlationId", correlationId);
try {
logger.info("Processing order created event: {} [Correlation: {}]", 
event.getOrderId(), correlationId);
// Process the event
processOrderEvent(event);
logger.info("Order event processed successfully: {} [Correlation: {}]", 
event.getOrderId(), correlationId);
} finally {
MDC.clear();
}
}
private void processOrderEvent(OrderCreatedEvent event) {
// Event processing logic
logger.debug("Processing event for order: {}", event.getOrderId());
}
private String generateOrderId() {
return "order-" + UUID.randomUUID().toString().substring(0, 8);
}
// Event classes
public static class OrderCreatedEvent {
private final String orderId;
private final String userId;
private final double amount;
private final String correlationId;
private final long timestamp;
public OrderCreatedEvent(String orderId, String userId, double amount, 
String correlationId, long timestamp) {
this.orderId = orderId;
this.userId = userId;
this.amount = amount;
this.correlationId = correlationId;
this.timestamp = timestamp;
}
// Getters
public String getOrderId() { return orderId; }
public String getUserId() { return userId; }
public double getAmount() { return amount; }
public String getCorrelationId() { return correlationId; }
public long getTimestamp() { return timestamp; }
}
public static class Order {
private final String id;
private final String userId;
private final double amount;
private final String status;
private final String correlationId;
public Order(String id, String userId, double amount, String status, String correlationId) {
this.id = id;
this.userId = userId;
this.amount = amount;
this.status = status;
this.correlationId = correlationId;
}
// Getters
public String getId() { return id; }
public String getUserId() { return userId; }
public double getAmount() { return amount; }
public String getStatus() { return status; }
public String getCorrelationId() { return correlationId; }
}
}

8. Distributed Tracing

Example 8: OpenTelemetry Integration

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.slf4j.MDC;
public class DistributedTracingService {
private final Tracer tracer;
public DistributedTracingService(Tracer tracer) {
this.tracer = tracer;
}
public void processOrderWithTracing(Order order) {
// Start a new span
Span span = tracer.spanBuilder("processOrder")
.setAttribute("order.id", order.getId())
.setAttribute("order.amount", order.getAmount())
.setAttribute("user.id", order.getUserId())
.startSpan();
try (Scope scope = span.makeCurrent()) {
// Set up correlation ID from trace ID
String traceId = span.getSpanContext().getTraceId();
MDC.put("correlationId", traceId);
MDC.put("traceId", traceId);
MDC.put("spanId", span.getSpanContext().getSpanId());
logger.info("Starting order processing with trace: {}", traceId);
// Process order steps
validateOrderWithSpan(order, span);
processPaymentWithSpan(order, span);
updateInventoryWithSpan(order, span);
logger.info("Order processing completed with trace: {}", traceId);
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
logger.error("Order processing failed with trace: {}", 
span.getSpanContext().getTraceId(), e);
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
MDC.clear();
}
}
private void validateOrderWithSpan(Order order, Span parentSpan) {
Span span = tracer.spanBuilder("validateOrder")
.setParent(Context.current().with(parentSpan))
.startSpan();
try (Scope scope = span.makeCurrent()) {
logger.debug("Validating order: {}", order.getId());
// Validation logic
if (order.getAmount() <= 0) {
throw new IllegalArgumentException("Invalid order amount");
}
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
}
}
private void processPaymentWithSpan(Order order, Span parentSpan) {
Span span = tracer.spanBuilder("processPayment")
.setParent(Context.current().with(parentSpan))
.setAttribute("payment.amount", order.getAmount())
.startSpan();
try (Scope scope = span.makeCurrent()) {
logger.debug("Processing payment for order: {}", order.getId());
// Payment processing logic
// Simulate processing
Thread.sleep(100);
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
}
}
private void updateInventoryWithSpan(Order order, Span parentSpan) {
Span span = tracer.spanBuilder("updateInventory")
.setParent(Context.current().with(parentSpan))
.startSpan();
try (Scope scope = span.makeCurrent()) {
logger.debug("Updating inventory for order: {}", order.getId());
// Inventory update logic
span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
} catch (Exception e) {
span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR);
span.recordException(e);
throw e;
} finally {
span.end();
}
}
}

Logback Configuration Example

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%X{correlationId}] [%X{userId}] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<providers>
<timestamp/>
<logLevel/>
<threadName/>
<loggerName/>
<message/>
<mdc>
<key>correlationId</key>
<key>userId</key>
<key>sessionId</key>
<key>traceId</key>
<key>spanId</key>
</mdc>
<stackTrace/>
</providers>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="JSON" />
</root>
</configuration>

Key Benefits and Best Practices

Benefits:

  1. Request Tracing: Track requests across service boundaries
  2. Debugging: Correlate logs for specific transactions
  3. Monitoring: Track performance and errors per request
  4. Auditing: Maintain audit trails for compliance

Best Practices:

  1. Generate Early: Create correlation IDs at entry points
  2. Propagate Everywhere: Pass correlation IDs across all service calls
  3. Include in Logs: Always log correlation IDs
  4. Clear Context: Always clear ThreadLocal and MDC after request completion
  5. Use Standards: Follow OpenTelemetry trace ID formats when possible
  6. Monitor Performance: Ensure correlation ID handling doesn't impact performance

This comprehensive implementation provides everything needed to implement correlation IDs in Java applications, from basic usage to advanced distributed tracing scenarios.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper