Elastic APM Integration and Correlation in Java

Introduction

Elastic APM (Application Performance Monitoring) provides distributed tracing, performance monitoring, and error tracking for applications. Correlation in Elastic APM enables you to connect related events across different services and systems, providing end-to-end visibility into your application's performance and behavior.

This guide covers Elastic APM integration, trace correlation, custom spans, and monitoring patterns in Java applications, using a Spring Boot order service as the running example.


Core Concepts

1. Elastic APM Components

  • APM Agent: Auto-instrumentation for Java applications
  • APM Server: Collects and processes monitoring data
  • Elasticsearch: Stores APM data
  • Kibana: Visualizes APM data

2. Correlation Concepts

  • Distributed Tracing: Track requests across service boundaries
  • Span Correlation: Connect related operations within a trace
  • Service Maps: Visualize service dependencies
  • Error Correlation: Link errors to specific traces and transactions
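
Distributed tracing works because every outgoing call carries the trace ID and the caller's span ID. The Elastic agents propagate this in the W3C Trace Context `traceparent` header. The agent reads and writes the header for you; the sketch below only illustrates its layout. The class name `TraceParent` and its methods are hypothetical and are not part of the Elastic API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Minimal reader/writer for the W3C Trace Context "traceparent" header (version 00). */
final class TraceParent {
    // Layout: version "00", 32 hex chars of trace ID, 16 hex chars of parent span ID, 2 hex chars of flags
    private static final Pattern FORMAT =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    final String traceId;
    final String parentId;
    final boolean sampled;

    TraceParent(String traceId, String parentId, boolean sampled) {
        this.traceId = traceId;
        this.parentId = parentId;
        this.sampled = sampled;
    }

    /** Returns the parsed header, or null when it is missing or malformed. */
    static TraceParent parse(String header) {
        if (header == null) {
            return null;
        }
        Matcher m = FORMAT.matcher(header.trim());
        if (!m.matches()) {
            return null;
        }
        String traceId = m.group(1);
        String parentId = m.group(2);
        // The specification treats all-zero IDs as invalid
        if (traceId.matches("0+") || parentId.matches("0+")) {
            return null;
        }
        boolean sampled = (Integer.parseInt(m.group(3), 16) & 0x01) != 0;
        return new TraceParent(traceId, parentId, sampled);
    }

    /** Same trace, new parent: what a service sends downstream from one of its own spans. */
    TraceParent withParent(String spanId) {
        return new TraceParent(traceId, spanId, sampled);
    }

    /** Only the sampled bit is written back; other flag bits are dropped in this sketch. */
    String toHeader() {
        return "00-" + traceId + "-" + parentId + "-" + (sampled ? "01" : "00");
    }
}
```

The trace ID stays constant across every hop, which is what lets Kibana stitch transactions from different services into one trace; only the parent ID changes from call to call.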

Project Setup and Dependencies

1. Maven Dependencies

<dependencies>
<!-- Elastic APM Agent (usually attached at runtime) -->
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Database -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- HTTP Client -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Elastic APM API -->
<dependency>
<groupId>co.elastic.apm</groupId>
<artifactId>apm-agent-api</artifactId>
<version>1.45.0</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

2. Application Configuration

# application.yml
elastic:
  apm:
    server-url: http://localhost:8200
    service-name: order-service
    environment: development
    application-packages: com.example.elasticapm
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/orders
    username: postgres
    password: password
  jpa:
    hibernate:
      ddl-auto: update
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,prometheus
  endpoint:
    health:
      show-details: always
logging:
  level:
    co.elastic.apm: DEBUG
    com.example.elasticapm: INFO
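
The `elastic.apm` keys above are ordinary Spring properties. The agent itself takes its settings from `elasticapm.properties`, from `-Delastic.apm.*` system properties, or from `ELASTIC_APM_*` environment variables, and its option names use underscores (`server_url`, `service_name`). The following sketch shows one way to derive those names from the YAML keys; the helper class `ApmOptionNames` is hypothetical and only does string conversion.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Converts the kebab-case keys used in application.yml to the agent's option names. */
final class ApmOptionNames {
    private ApmOptionNames() {
    }

    /** "server-url" becomes "server_url". */
    static String optionName(String yamlKey) {
        return yamlKey.trim().toLowerCase(Locale.ROOT).replace('-', '_');
    }

    /** "server-url" becomes "elastic.apm.server_url", usable as -Delastic.apm.server_url=... */
    static String systemProperty(String yamlKey) {
        return "elastic.apm." + optionName(yamlKey);
    }

    /** "server-url" becomes "ELASTIC_APM_SERVER_URL". */
    static String environmentVariable(String yamlKey) {
        return "ELASTIC_APM_" + optionName(yamlKey).toUpperCase(Locale.ROOT);
    }

    /** Maps a whole block of YAML key/value pairs to system property names. */
    static Map<String, String> toSystemProperties(Map<String, String> yamlValues) {
        Map<String, String> result = new LinkedHashMap<>();
        yamlValues.forEach((key, value) -> result.put(systemProperty(key), value));
        return result;
    }
}
```

Whichever mechanism is used, the values have to be in place before the agent starts, so they are typically passed on the command line or in the container environment rather than set from application code.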

Elastic APM Configuration

1. APM Agent Configuration

@Configuration
@Slf4j
public class ElasticApmConfig {
    // Note: the agent does not read these Spring properties. It takes its settings from
    // elasticapm.properties, -Delastic.apm.* system properties, or ELASTIC_APM_* environment
    // variables. This bean only makes the same values available to application code.
    @Bean
    @ConditionalOnMissingBean
    public ApmAgentConfiguration apmAgentConfiguration(
            @Value("${elastic.apm.server-url}") String serverUrl,
            @Value("${elastic.apm.service-name}") String serviceName,
            @Value("${elastic.apm.environment}") String environment,
            @Value("${elastic.apm.application-packages}") String applicationPackages) {
        return ApmAgentConfiguration.builder()
                .serverUrl(serverUrl)
                .serviceName(serviceName)
                .environment(environment)
                .applicationPackages(applicationPackages)
                .build();
    }

    // ElasticApmTracer and DefaultElasticApmTracer are application-defined placeholder
    // types, not part of Elastic's API. In a real deployment the attached agent does the
    // tracing; this bean only demonstrates where custom tracing could be wired in.
    @Bean
    public ElasticApmTracer elasticApmTracer(ApmAgentConfiguration config) {
        log.info("Elastic APM configured for service: {} in environment: {}",
                config.getServiceName(), config.getEnvironment());
        return new DefaultElasticApmTracer();
    }
}

@Data
@Builder
public class ApmAgentConfiguration {
    private String serverUrl;
    private String serviceName;
    private String environment;
    private String applicationPackages;
    // Lombok's @Builder ignores field initializers unless they are marked @Builder.Default
    @Builder.Default
    private double transactionSampleRate = 1.0;
    @Builder.Default
    private int transactionMaxSpans = 500;
    @Builder.Default
    private long metricsInterval = 30L;
}

2. Custom Tracer Service

import co.elastic.apm.api.ElasticApm;
import co.elastic.apm.api.Outcome;
import co.elastic.apm.api.Span;
import co.elastic.apm.api.Transaction;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ElasticApmTracerService {
    // ElasticApm exposes only static methods. When no agent is attached, every call
    // returns a no-op object rather than null, so no null checks are needed.

    /** Get the transaction that is active on the current thread. */
    public Transaction getCurrentTransaction() {
        return ElasticApm.currentTransaction();
    }

    /** Start a new custom transaction. The caller is responsible for calling end(). */
    public Transaction startTransaction(String name, String type, String subtype) {
        Transaction transaction = ElasticApm.startTransaction();
        transaction.setName(name);
        transaction.setType(type);
        if (subtype != null) {
            // Transactions have no subtype in the public API, so keep the value as a label
            transaction.setLabel("subtype", subtype);
        }
        return transaction;
    }

    /**
     * Start a child span of the active span (or transaction). The returned span is not
     * activated, so ElasticApm.currentSpan() keeps returning its parent. Set labels and
     * the outcome on the returned object, and call end() when the work is done.
     */
    public Span startSpan(String name, String type, String subtype) {
        Span span = ElasticApm.currentSpan().startSpan(type, subtype, null);
        span.setName(name);
        return span;
    }

    /** Add a label to the current transaction (setLabel replaces the deprecated addLabel). */
    public void addLabel(String key, String value) {
        getCurrentTransaction().setLabel(key, value);
    }

    /** Add a label to the currently active span. */
    public void addSpanLabel(String key, String value) {
        ElasticApm.currentSpan().setLabel(key, value);
    }

    /** Capture an exception on the current transaction. */
    public void captureException(Throwable throwable) {
        getCurrentTransaction().captureException(throwable);
    }

    /** Set the outcome of the current transaction. */
    public void setTransactionOutcome(boolean success) {
        getCurrentTransaction().setOutcome(success ? Outcome.SUCCESS : Outcome.FAILURE);
    }

    /** Set the outcome of the currently active span (see the note on startSpan). */
    public void setSpanOutcome(boolean success) {
        ElasticApm.currentSpan().setOutcome(success ? Outcome.SUCCESS : Outcome.FAILURE);
    }

    /** Get the current trace ID for correlation, or null when no transaction is active. */
    public String getCurrentTraceId() {
        String traceId = getCurrentTransaction().getTraceId();
        return traceId.isEmpty() ? null : traceId;
    }

    /** Get the current transaction ID for correlation, or null when no transaction is active. */
    public String getCurrentTransactionId() {
        String id = getCurrentTransaction().getId();
        return id.isEmpty() ? null : id;
    }

    /** Inject the active span's trace context into outgoing headers. */
    public void injectTraceContext(Map<String, String> headers) {
        // HeaderInjector is a two-argument callback: addHeader(name, value)
        ElasticApm.currentSpan().injectTraceHeaders(headers::put);
    }

    /**
     * Start a transaction as a child of the remote caller described by the headers.
     * The lookup is only as case-insensitive as the map passed in, so use lower-case
     * keys (such as "traceparent") or a case-insensitive map.
     */
    public Transaction extractTraceContext(Map<String, String> headers) {
        // HeaderExtractor is a one-argument callback: getFirstHeader(name)
        return ElasticApm.startTransactionWithRemoteParent(headers::get);
    }
}
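
HTTP header names are case-insensitive, but a plain `HashMap` is not, so a trace header that arrives as `Traceparent` would be missed by a lookup for `traceparent`. A minimal sketch of a case-insensitive carrier follows. The class `HeaderCarrier` is hypothetical; its two methods are shaped like the `addHeader(name, value)` and `getFirstHeader(name)` callbacks of the agent's `HeaderInjector` and `HeaderExtractor` interfaces, so method references such as `carrier::addHeader` and `carrier::getFirstHeader` could be handed to the agent.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Header container whose lookups ignore the case of the header name. */
final class HeaderCarrier {
    private final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

    /** Stores a header; null names or values are ignored because the TreeMap rejects null keys. */
    void addHeader(String name, String value) {
        if (name != null && value != null) {
            headers.put(name, value);
        }
    }

    /** Returns the header value regardless of case, or null when it is absent. */
    String getFirstHeader(String name) {
        return name == null ? null : headers.get(name);
    }

    /** Read-only view, for example to copy the headers onto an outgoing request. */
    Map<String, String> asMap() {
        return Collections.unmodifiableMap(headers);
    }
}
```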

HTTP Correlation

1. HTTP Client with APM Correlation

@Component
@Slf4j
public class ApmCorrelatedHttpClient {
private final WebClient webClient;
private final ElasticApmTracerService tracerService;
private final ObjectMapper objectMapper;
public ApmCorrelatedHttpClient(WebClient.Builder webClientBuilder,
ElasticApmTracerService tracerService,
ObjectMapper objectMapper) {
this.tracerService = tracerService;
this.objectMapper = objectMapper;
this.webClient = webClientBuilder.build();
}
/**
* Make HTTP GET request with APM correlation
*/
public <T> Mono<T> getWithApmCorrelation(String url, Class<T> responseType) {
return Mono.fromCallable(() -> {
co.elastic.apm.api.Span span = tracerService.startSpan("GET " + url, "external", "http");
try {
span.addLabel("http.url", url);
span.addLabel("http.method", "GET");
// Inject this span's trace context so the downstream transaction becomes its child
Map<String, String> traceHeaders = new HashMap<>();
span.injectTraceHeaders(traceHeaders::put);
// Make the HTTP call. block() is only safe when the Mono is subscribed
// on a thread that is allowed to block (not a Netty event-loop thread)
ResponseEntity<String> response = webClient.get()
.uri(url)
.headers(headers -> traceHeaders.forEach(headers::add))
.retrieve()
.toEntity(String.class)
.block();
// Record the status that was actually returned instead of assuming 200
span.addLabel("http.status_code", String.valueOf(response.getStatusCode().value()));
// Set the outcome on this span; the tracer service would target the active (parent) span
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
return objectMapper.readValue(response.getBody(), responseType);
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw new HttpClientException("HTTP request failed: " + url, e);
} finally {
span.end();
}
});
}
/**
* Make HTTP POST request with APM correlation
*/
public <T, R> Mono<T> postWithApmCorrelation(String url, R body, Class<T> responseType) {
return Mono.fromCallable(() -> {
co.elastic.apm.api.Span span = tracerService.startSpan("POST " + url, "external", "http");
try {
span.addLabel("http.url", url);
span.addLabel("http.method", "POST");
// Inject this span's trace context so the downstream transaction becomes its child
Map<String, String> traceHeaders = new HashMap<>();
span.injectTraceHeaders(traceHeaders::put);
ResponseEntity<String> response = webClient.post()
.uri(url)
.headers(headers -> traceHeaders.forEach(headers::add))
.bodyValue(body)
.retrieve()
.toEntity(String.class)
.block();
// Record the status that was actually returned instead of assuming 200
span.addLabel("http.status_code", String.valueOf(response.getStatusCode().value()));
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
return objectMapper.readValue(response.getBody(), responseType);
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw new HttpClientException("HTTP POST request failed: " + url, e);
} finally {
span.end();
}
});
}
/**
* Make correlated call to another service
*/
public <T> Mono<T> callServiceWithCorrelation(String serviceName, String endpoint,
Object request, Class<T> responseType) {
String url = buildServiceUrl(serviceName, endpoint);
// A Mono is lazy: a span started and ended around the method body would close before
// the request is sent. Create it on subscription and end it when the Mono terminates
return Mono.defer(() -> {
co.elastic.apm.api.Span span = tracerService.startSpan("Service Call", "external", "http");
span.addLabel("service.name", serviceName);
span.addLabel("service.endpoint", endpoint);
return postWithApmCorrelation(url, request, responseType)
.doOnSuccess(response -> {
span.addLabel("service.response", "success");
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
})
.doOnError(error -> {
span.captureException(error);
span.addLabel("service.response", "error");
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
})
.doFinally(signal -> span.end());
});
}
private String buildServiceUrl(String serviceName, String endpoint) {
// In real implementation, use service discovery
Map<String, String> serviceUrls = Map.of(
"inventory-service", "http://inventory-service:8080",
"payment-service", "http://payment-service:8080",
"shipping-service", "http://shipping-service:8080"
);
String baseUrl = serviceUrls.get(serviceName);
if (baseUrl == null) {
throw new IllegalArgumentException("Unknown service: " + serviceName);
}
return baseUrl + endpoint;
}
}

2. HTTP Server Correlation Filter

@Component
@Slf4j
public class ApmCorrelationFilter implements Filter {
private final ElasticApmTracerService tracerService;
public ApmCorrelationFilter(ElasticApmTracerService tracerService) {
this.tracerService = tracerService;
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;
// Note: the agent's servlet instrumentation already creates a transaction per request.
// A filter like this is only needed when that instrumentation is disabled.
// Continue the caller's trace when trace headers are present. The agent never returns
// null: without valid headers it starts a new root transaction instead
co.elastic.apm.api.Transaction transaction = extractTransactionFromHeaders(httpRequest);
transaction.setType("request");
// Activate the transaction so that currentTransaction() and currentSpan()
// resolve to it for the rest of the request
try (co.elastic.apm.api.Scope scope = transaction.activate()) {
// Set transaction context
configureTransaction(transaction, httpRequest);
// Continue with the filter chain
chain.doFilter(request, response);
// Set transaction outcome based on response
setTransactionOutcome(transaction, httpResponse);
} catch (Exception e) {
transaction.captureException(e);
transaction.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw e;
} finally {
transaction.end();
}
}
private co.elastic.apm.api.Transaction extractTransactionFromHeaders(HttpServletRequest request) {
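// Header names are case-insensitive, so use a map whose lookups ignore case
Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);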
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.put(headerName, request.getHeader(headerName));
}
return tracerService.extractTraceContext(headers);
}
private void configureTransaction(co.elastic.apm.api.Transaction transaction, HttpServletRequest request) {
transaction.setName(request.getMethod() + " " + request.getRequestURI());
// Add request context as labels
transaction.addLabel("http.method", request.getMethod());
transaction.addLabel("http.url", request.getRequestURL().toString());
transaction.addLabel("http.query_string", request.getQueryString());
transaction.addLabel("http.user_agent", request.getHeader("User-Agent"));
transaction.addLabel("http.client_ip", getClientIp(request));
transaction.addLabel("service.instance.id", getInstanceId());
// Add business context if available
addBusinessContext(transaction, request);
}
private void setTransactionOutcome(co.elastic.apm.api.Transaction transaction, HttpServletResponse response) {
int status = response.getStatus();
transaction.addLabel("http.status_code", String.valueOf(status));
// Set the outcome on the transaction passed in rather than on whatever is currently active
transaction.setOutcome(status >= 400
? co.elastic.apm.api.Outcome.FAILURE
: co.elastic.apm.api.Outcome.SUCCESS);
}
private String getClientIp(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0].trim();
}
return request.getRemoteAddr();
}
private String getInstanceId() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
return "unknown";
}
}
private void addBusinessContext(co.elastic.apm.api.Transaction transaction, HttpServletRequest request) {
// Extract business context from headers or request attributes
String userId = request.getHeader("X-User-ID");
String tenantId = request.getHeader("X-Tenant-ID");
String correlationId = request.getHeader("X-Correlation-ID");
if (userId != null) {
transaction.addLabel("user.id", userId);
}
if (tenantId != null) {
transaction.addLabel("tenant.id", tenantId);
}
if (correlationId != null) {
transaction.addLabel("correlation.id", correlationId);
}
}
}
// Register the filter
@Configuration
public class FilterConfig {
@Bean
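public FilterRegistrationBean<ApmCorrelationFilter> apmCorrelationFilterRegistration(ApmCorrelationFilter filter) {
FilterRegistrationBean<ApmCorrelationFilter> registrationBean = new FilterRegistrationBean<>();
// Reuse the @Component instance: a second filter created with new would run twice per
// request, and a bean method named apmCorrelationFilter would clash with the component's bean name
registrationBean.setFilter(filter);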
registrationBean.addUrlPatterns("/*");
registrationBean.setOrder(1);
return registrationBean;
}
}
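
The filter above marks every response with a status of 400 or higher as a failure. Elastic's own HTTP instrumentation, as far as I am aware, distinguishes the two sides of a call: a server transaction only fails on 5xx (a 404 is a correct answer to a bad request), while a client span fails on anything from 400 up. A minimal sketch of that rule, using a local stand-in enum so the example has no dependencies; `HttpOutcomes` and its methods are hypothetical names.

```java
/** Maps HTTP status codes to an outcome, separately for the server and the client side. */
final class HttpOutcomes {
    /** Local stand-in for the agent's outcome values. */
    enum Outcome { SUCCESS, FAILURE }

    private HttpOutcomes() {
    }

    /** Server side: only 5xx means this service failed to do its job. */
    static Outcome forServer(int status) {
        return status >= 500 ? Outcome.FAILURE : Outcome.SUCCESS;
    }

    /** Client side: any 4xx or 5xx means the outgoing call did not succeed. */
    static Outcome forClient(int status) {
        return status >= 400 ? Outcome.FAILURE : Outcome.SUCCESS;
    }

    /** Coarse result string such as "HTTP 2xx", handy for grouping in dashboards. */
    static String result(int status) {
        return "HTTP " + (status / 100) + "xx";
    }
}
```

Which rule fits depends on what the failure rate should mean in your dashboards; the stricter rule in the filter counts client mistakes against the service.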

Database Correlation

1. JPA/Hibernate Correlation

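@Component
@Slf4j
// Listener interfaces and EventType come from org.hibernate.event.spi;
// EventListenerRegistry comes from org.hibernate.event.service.spi
public class ApmJpaCorrelationInterceptor implements PreInsertEventListener,
PreUpdateEventListener, PreDeleteEventListener, PostLoadEventListener {
private final ElasticApmTracerService tracerService;
private final EntityManagerFactory entityManagerFactory;
public ApmJpaCorrelationInterceptor(ElasticApmTracerService tracerService,
EntityManagerFactory entityManagerFactory) {
this.tracerService = tracerService;
this.entityManagerFactory = entityManagerFactory;
}
// Hibernate entity events are not Spring application events, so @EventListener methods
// would never be called. Register this object with Hibernate's listener registry instead
@PostConstruct
public void registerListeners() {
EventListenerRegistry registry = entityManagerFactory
.unwrap(SessionFactoryImplementor.class)
.getServiceRegistry()
.getService(EventListenerRegistry.class);
registry.appendListeners(EventType.PRE_INSERT, this);
registry.appendListeners(EventType.PRE_UPDATE, this);
registry.appendListeners(EventType.PRE_DELETE, this);
registry.appendListeners(EventType.POST_LOAD, this);
}
@Override
public boolean onPreInsert(PreInsertEvent event) {
trackDatabaseOperation("INSERT", event.getEntity());
return false; // false means: do not veto the operation
}
@Override
public boolean onPreUpdate(PreUpdateEvent event) {
trackDatabaseOperation("UPDATE", event.getEntity());
return false;
}
@Override
public boolean onPreDelete(PreDeleteEvent event) {
trackDatabaseOperation("DELETE", event.getEntity());
return false;
}
@Override
public void onPostLoad(PostLoadEvent event) {
trackDatabaseOperation("SELECT", event.getEntity());
}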
private void trackDatabaseOperation(String operation, Object entity) {
co.elastic.apm.api.Span span = tracerService.startSpan("DB " + operation, "db", "postgresql");
if (span != null) {
try {
String entityName = entity.getClass().getSimpleName();
span.setName(operation + " " + entityName);
span.addLabel("db.operation", operation);
span.addLabel("db.entity", entityName);
span.addLabel("db.system", "postgresql");
// Add entity-specific context
addEntityContext(span, entity, operation);
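// Set the outcome on this span; the tracer service would target the active (parent) span
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
} catch (Exception e) {
log.warn("Failed to track database operation", e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);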
} finally {
span.end();
}
}
}
private void addEntityContext(co.elastic.apm.api.Span span, Object entity, String operation) {
if (entity instanceof Identifiable) {
String entityId = ((Identifiable) entity).getId().toString();
span.addLabel("db.entity.id", entityId);
}
if (entity instanceof TrackableEntity) {
TrackableEntity trackable = (TrackableEntity) entity;
span.addLabel("business.domain", trackable.getBusinessDomain());
span.addLabel("tenant.id", trackable.getTenantId());
}
// Add trace correlation
String traceId = tracerService.getCurrentTraceId();
if (traceId != null) {
span.addLabel("correlation.trace_id", traceId);
}
}
}
// Marker interfaces for entity tracking
public interface Identifiable {
Object getId();
}
public interface TrackableEntity {
String getBusinessDomain();
String getTenantId();
}
// Example entity
@Entity
@Table(name = "orders")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order implements Identifiable, TrackableEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String orderNumber;
private String customerId;
private BigDecimal totalAmount;
private String status;
private String tenantId;
private LocalDateTime createdAt;
@Override
public String getBusinessDomain() {
return "order-management";
}
}

2. JDBC Template Correlation

@Component
@Slf4j
public class ApmJdbcCorrelationTemplate {
private final JdbcTemplate jdbcTemplate;
private final ElasticApmTracerService tracerService;
public ApmJdbcCorrelationTemplate(JdbcTemplate jdbcTemplate, 
ElasticApmTracerService tracerService) {
this.jdbcTemplate = jdbcTemplate;
this.tracerService = tracerService;
}
/**
* Execute query with APM correlation
*/
public <T> List<T> queryWithCorrelation(String sql, Object[] args, RowMapper<T> rowMapper) {
co.elastic.apm.api.Span span = tracerService.startSpan("DB Query", "db", "postgresql");
try {
span.setName("SQL Query");
span.addLabel("db.statement", sql);
span.addLabel("db.system", "postgresql");
span.addLabel("db.operation", extractDbOperation(sql));
// Add trace correlation
String traceId = tracerService.getCurrentTraceId();
if (traceId != null) {
span.addLabel("correlation.trace_id", traceId);
}
// Execute the query
List<T> result = jdbcTemplate.query(sql, args, rowMapper);
span.addLabel("db.result.count", String.valueOf(result.size()));
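// Set the outcome on this span; the tracer service would target the active (parent) span
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
return result;
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);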
throw new DataAccessException("Database query failed", e) {};
} finally {
span.end();
}
}
/**
* Execute update with APM correlation
*/
public int updateWithCorrelation(String sql, Object... args) {
co.elastic.apm.api.Span span = tracerService.startSpan("DB Update", "db", "postgresql");
try {
span.setName("SQL Update");
span.addLabel("db.statement", sql);
span.addLabel("db.system", "postgresql");
span.addLabel("db.operation", extractDbOperation(sql));
String traceId = tracerService.getCurrentTraceId();
if (traceId != null) {
span.addLabel("correlation.trace_id", traceId);
}
int affectedRows = jdbcTemplate.update(sql, args);
span.addLabel("db.affected.rows", String.valueOf(affectedRows));
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
return affectedRows;
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw new DataAccessException("Database update failed", e) {};
} finally {
span.end();
}
}
/**
* Execute within a database transaction span
*/
public <T> T executeInDbSpan(String operationName, Supplier<T> operation) {
co.elastic.apm.api.Span span = tracerService.startSpan("DB Transaction", "db", "postgresql");
span.setName(operationName);
try {
T result = operation.get();
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
return result;
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw e;
} finally {
span.end();
}
}
private String extractDbOperation(String sql) {
if (sql == null) return "unknown";
String normalizedSql = sql.trim().toLowerCase();
if (normalizedSql.startsWith("select")) return "SELECT";
if (normalizedSql.startsWith("insert")) return "INSERT";
if (normalizedSql.startsWith("update")) return "UPDATE";
if (normalizedSql.startsWith("delete")) return "DELETE";
if (normalizedSql.startsWith("create")) return "CREATE";
if (normalizedSql.startsWith("drop")) return "DROP";
if (normalizedSql.startsWith("alter")) return "ALTER";
return "OTHER";
}
}
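
The `extractDbOperation` helper above checks only the first characters of the statement, so a query that begins with a comment or an optimizer hint is reported as OTHER. A minimal sketch of a more tolerant variant, which skips leading whitespace and SQL comments before reading the first keyword. The class name `SqlOperation` is hypothetical, and unlike the original it returns whatever keyword it finds (for example WITH) rather than a fixed set.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Derives the operation keyword of a SQL statement, ignoring leading comments. */
final class SqlOperation {
    // Skips whitespace, "-- line" comments and "/* block */" comments, then captures the
    // first word. The possessive *+ stops the regex from backtracking into a comment and
    // mistaking commented-out text for the keyword.
    private static final Pattern FIRST_KEYWORD = Pattern.compile(
            "^(?:\\s|--[^\\n]*|/\\*.*?\\*/)*+([A-Za-z]+)", Pattern.DOTALL);

    private SqlOperation() {
    }

    static String of(String sql) {
        if (sql == null) {
            return "unknown";
        }
        Matcher m = FIRST_KEYWORD.matcher(sql);
        return m.find() ? m.group(1).toUpperCase(Locale.ROOT) : "OTHER";
    }
}
```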

Message Queue Correlation

1. RabbitMQ Correlation

@Component
@Slf4j
public class ApmMessageCorrelationService {
private final ElasticApmTracerService tracerService;
private final AmqpTemplate amqpTemplate;
private final ObjectMapper objectMapper;
public ApmMessageCorrelationService(ElasticApmTracerService tracerService,
AmqpTemplate amqpTemplate,
ObjectMapper objectMapper) {
this.tracerService = tracerService;
this.amqpTemplate = amqpTemplate;
this.objectMapper = objectMapper;
}
/**
* Send message with APM correlation
*/
public void sendMessageWithCorrelation(String exchange, String routingKey, Object message) {
co.elastic.apm.api.Span span = tracerService.startSpan("Message Publish", "messaging", "rabbitmq");
try {
span.setName("Publish to " + exchange);
span.addLabel("messaging.destination", exchange);
span.addLabel("messaging.destination_kind", "topic");
span.addLabel("messaging.rabbitmq.routing_key", routingKey);
// Create message with trace context
Message amqpMessage = createCorrelatedMessage(message);
// Send message
amqpTemplate.send(exchange, routingKey, amqpMessage);
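// Set the outcome on this span; the tracer service would target the active (parent) span
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
log.debug("Sent correlated message to {}:{}", exchange, routingKey);
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);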
throw new MessageSendException("Failed to send correlated message", e);
} finally {
span.end();
}
}
/**
* Create AMQP message with trace correlation
*/
private Message createCorrelatedMessage(Object payload) throws Exception {
MessageProperties properties = new MessageProperties();
// Inject trace context into message headers
Map<String, String> traceHeaders = new HashMap<>();
tracerService.injectTraceContext(traceHeaders);
traceHeaders.forEach((key, value) -> properties.setHeader(key, value));
// Add correlation metadata
properties.setHeader("x-correlation-trace-id", tracerService.getCurrentTraceId());
properties.setHeader("x-correlation-transaction-id", tracerService.getCurrentTransactionId());
properties.setHeader("x-correlation-timestamp", Instant.now().toString());
properties.setHeader("x-correlation-service", "order-service");
byte[] body = objectMapper.writeValueAsBytes(payload);
return new Message(body, properties);
}
/**
* Process message with correlation context
*/
public <T> void processMessageWithCorrelation(Message message, Class<T> payloadType, 
Function<T, Boolean> processor) {
// Continue the producer's trace when the message carries trace headers. The agent
// never returns null: without them it starts a new root transaction instead
co.elastic.apm.api.Transaction transaction = extractTransactionFromMessage(message);
transaction.setType("messaging");
// Activate the transaction so that spans started while processing attach to it
try (co.elastic.apm.api.Scope scope = transaction.activate()) {
configureMessageTransaction(transaction, message);
// Extract payload
T payload = extractPayload(message, payloadType);
// Process the message
boolean success = processor.apply(payload);
transaction.setOutcome(success
? co.elastic.apm.api.Outcome.SUCCESS
: co.elastic.apm.api.Outcome.FAILURE);
if (success) {
log.debug("Successfully processed correlated message");
}
} catch (Exception e) {
transaction.captureException(e);
transaction.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw new MessageProcessingException("Failed to process correlated message", e);
} finally {
transaction.end();
}
}
private co.elastic.apm.api.Transaction extractTransactionFromMessage(Message message) {
MessageProperties properties = message.getMessageProperties();
Map<String, String> headers = new HashMap<>();
if (properties.getHeaders() != null) {
properties.getHeaders().forEach((key, value) -> {
if (value instanceof String) {
headers.put(key, (String) value);
}
});
}
return tracerService.extractTraceContext(headers);
}
private void configureMessageTransaction(co.elastic.apm.api.Transaction transaction, Message message) {
MessageProperties properties = message.getMessageProperties();
transaction.setName("Process from " + properties.getConsumerQueue());
transaction.addLabel("messaging.system", "rabbitmq");
transaction.addLabel("messaging.destination", properties.getReceivedExchange());
transaction.addLabel("messaging.destination_kind", "topic");
transaction.addLabel("messaging.rabbitmq.routing_key", properties.getReceivedRoutingKey());
transaction.addLabel("messaging.rabbitmq.queue", properties.getConsumerQueue());
// Add correlation context
String correlationId = (String) properties.getHeader("x-correlation-id");
if (correlationId != null) {
transaction.addLabel("correlation.id", correlationId);
}
}
private <T> T extractPayload(Message message, Class<T> payloadType) {
try {
return objectMapper.readValue(message.getBody(), payloadType);
} catch (Exception e) {
throw new MessageProcessingException("Failed to extract message payload", e);
}
}
}

Business Transaction Correlation

1. Order Processing with Correlation

@Service
@Slf4j
public class CorrelatedOrderService {
private final ElasticApmTracerService tracerService;
private final ApmJdbcCorrelationTemplate jdbcTemplate;
private final ApmCorrelatedHttpClient httpClient;
private final ApmMessageCorrelationService messageService;
public CorrelatedOrderService(ElasticApmTracerService tracerService,
ApmJdbcCorrelationTemplate jdbcTemplate,
ApmCorrelatedHttpClient httpClient,
ApmMessageCorrelationService messageService) {
this.tracerService = tracerService;
this.jdbcTemplate = jdbcTemplate;
this.httpClient = httpClient;
this.messageService = messageService;
}
/**
* Process order with full APM correlation
*/
@Transactional
public Order processOrder(CreateOrderRequest request) {
co.elastic.apm.api.Transaction transaction = tracerService.getCurrentTransaction();
try {
// Add business context to transaction
addBusinessContext(transaction, request);
// Step 1: Validate order
validateOrder(request);
// Step 2: Check inventory
checkInventory(request);
// Step 3: Process payment
processPayment(request);
// Step 4: Create order record
Order order = createOrderRecord(request);
// Step 5: Send notifications
sendOrderNotifications(order);
tracerService.setTransactionOutcome(true);
log.info("Successfully processed order: {}", order.getOrderNumber());
return order;
} catch (Exception e) {
transaction.captureException(e);
tracerService.setTransactionOutcome(false);
log.error("Order processing failed", e);
throw new OrderProcessingException("Order processing failed", e);
}
}
private void validateOrder(CreateOrderRequest request) {
co.elastic.apm.api.Span span = tracerService.startSpan("Validate Order", "business", "validation");
try {
span.addLabel("order.customer_id", request.getCustomerId());
span.addLabel("order.item_count", String.valueOf(request.getItems().size()));
// Validation logic
if (request.getItems().isEmpty()) {
throw new ValidationException("Order must contain at least one item");
}
if (request.getTotalAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new ValidationException("Order total must be positive");
}
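// Set the outcome on this span; the tracer service would target the active (parent) span
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);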
throw e;
} finally {
span.end();
}
}
private void checkInventory(CreateOrderRequest request) {
co.elastic.apm.api.Span span = tracerService.startSpan("Check Inventory", "business", "inventory");
try {
span.addLabel("inventory.check.items", String.valueOf(request.getItems().size()));
// Call inventory service with correlation
InventoryCheckRequest inventoryRequest = createInventoryRequest(request);
InventoryCheckResponse response = httpClient.callServiceWithCorrelation(
"inventory-service", "/api/inventory/check", inventoryRequest, InventoryCheckResponse.class)
.block();
if (!response.isAvailable()) {
throw new InventoryException("Insufficient inventory for order");
}
span.addLabel("inventory.available", "true");
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw e;
} finally {
span.end();
}
}
private void processPayment(CreateOrderRequest request) {
co.elastic.apm.api.Span span = tracerService.startSpan("Process Payment", "business", "payment");
try {
span.addLabel("payment.amount", request.getTotalAmount().toString());
span.addLabel("payment.currency", request.getCurrency());
// Call payment service with correlation
PaymentRequest paymentRequest = createPaymentRequest(request);
PaymentResponse response = httpClient.callServiceWithCorrelation(
"payment-service", "/api/payments/process", paymentRequest, PaymentResponse.class)
.block();
if (!response.isSuccess()) {
throw new PaymentException("Payment processing failed: " + response.getErrorMessage());
}
span.addLabel("payment.transaction_id", response.getTransactionId());
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
throw e;
} finally {
span.end();
}
}
private Order createOrderRecord(CreateOrderRequest request) {
return jdbcTemplate.executeInDbSpan("Create Order Record", () -> {
String orderNumber = generateOrderNumber();
Order order = Order.builder()
.orderNumber(orderNumber)
.customerId(request.getCustomerId())
.totalAmount(request.getTotalAmount())
.status("CONFIRMED")
.tenantId(request.getTenantId())
.createdAt(LocalDateTime.now())
.build();
// Save order
String sql = "INSERT INTO orders (order_number, customer_id, total_amount, status, tenant_id, created_at) VALUES (?, ?, ?, ?, ?, ?)";
jdbcTemplate.updateWithCorrelation(sql, 
order.getOrderNumber(),
order.getCustomerId(),
order.getTotalAmount(),
order.getStatus(),
order.getTenantId(),
order.getCreatedAt()
);
tracerService.addLabel("order.number", orderNumber);
return order;
});
}
private void sendOrderNotifications(Order order) {
co.elastic.apm.api.Span span = tracerService.startSpan("Send Notifications", "business", "messaging");
try {
// Send order confirmation message
OrderConfirmedEvent event = OrderConfirmedEvent.builder()
.orderId(order.getId())
.orderNumber(order.getOrderNumber())
.customerId(order.getCustomerId())
.totalAmount(order.getTotalAmount())
.confirmedAt(LocalDateTime.now())
.build();
messageService.sendMessageWithCorrelation(
"order-events", 
"order.confirmed", 
event
);
span.addLabel("notification.sent", "true");
span.setOutcome(co.elastic.apm.api.Outcome.SUCCESS);
} catch (Exception e) {
span.captureException(e);
span.setOutcome(co.elastic.apm.api.Outcome.FAILURE);
log.warn("Failed to send order notifications", e);
// Don't throw - notifications are non-critical
} finally {
span.end();
}
}
private void addBusinessContext(co.elastic.apm.api.Transaction transaction, CreateOrderRequest request) {
transaction.addLabel("business.process", "order-processing");
transaction.addLabel("business.customer_id", request.getCustomerId());
transaction.addLabel("business.tenant_id", request.getTenantId());
transaction.addLabel("business.order_amount", request.getTotalAmount().toString());
transaction.addLabel("business.currency", request.getCurrency());
}
private String generateOrderNumber() {
return "ORD-" + System.currentTimeMillis() + "-" + 
ThreadLocalRandom.current().nextInt(1000, 9999);
}
private InventoryCheckRequest createInventoryRequest(CreateOrderRequest orderRequest) {
return InventoryCheckRequest.builder()
.items(orderRequest.getItems())
.tenantId(orderRequest.getTenantId())
.build();
}
private PaymentRequest createPaymentRequest(CreateOrderRequest orderRequest) {
return PaymentRequest.builder()
.amount(orderRequest.getTotalAmount())
.currency(orderRequest.getCurrency())
.customerId(orderRequest.getCustomerId())
.paymentMethod(orderRequest.getPaymentMethod())
.build();
}
}

Custom Correlation Service

1. Correlation ID Service

@Component
@Slf4j
public class CorrelationIdService {
private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
private static final String CORRELATION_ID_LABEL = "correlation.id";
// Holds the correlation ID for the request being handled on this thread
private static final ThreadLocal<String> CURRENT_CORRELATION_ID = new ThreadLocal<>();
private final ElasticApmTracerService tracerService;
public CorrelationIdService(ElasticApmTracerService tracerService) {
this.tracerService = tracerService;
}
/**
* Generate new correlation ID
*/
public String generateCorrelationId() {
return "corr-" + UUID.randomUUID();
}
/**
* Get the correlation ID bound to the current thread, creating one if none is set
*/
public String getCurrentCorrelationId() {
// Labels cannot be read back from a transaction without custom code,
// so the ID is kept in a ThreadLocal instead of being regenerated on every call
String correlationId = CURRENT_CORRELATION_ID.get();
if (correlationId == null) {
correlationId = generateCorrelationId();
CURRENT_CORRELATION_ID.set(correlationId);
}
return correlationId;
}
/**
* Set correlation ID on the current thread and in the current transaction
*/
public void setCorrelationId(String correlationId) {
CURRENT_CORRELATION_ID.set(correlationId);
tracerService.addLabel(CORRELATION_ID_LABEL, correlationId);
}
/**
* Clear the correlation ID when a request ends so pooled threads do not reuse it
*/
public void clearCorrelationId() {
CURRENT_CORRELATION_ID.remove();
}
/**
* Inject correlation ID into headers
*/
public void injectCorrelationId(Map<String, String> headers) {
String correlationId = getCurrentCorrelationId();
headers.put(CORRELATION_ID_HEADER, correlationId);
// Also ensure it's set in the transaction
setCorrelationId(correlationId);
}
/**
* Extract correlation ID from headers
*/
public String extractCorrelationId(Map<String, String> headers) {
String correlationId = headers.get(CORRELATION_ID_HEADER);
if (correlationId == null || correlationId.isEmpty()) {
correlationId = generateCorrelationId();
}
setCorrelationId(correlationId);
return correlationId;
}
/**
* Create correlated business transaction
*/
public CorrelatedTransaction startCorrelatedTransaction(String name, String type, Map<String, String> context) {
co.elastic.apm.api.Transaction transaction = tracerService.startTransaction(name, type, null);
String correlationId = getCurrentCorrelationId();
// Add context labels
if (context != null) {
context.forEach(tracerService::addLabel);
}
tracerService.addLabel(CORRELATION_ID_LABEL, correlationId);
return CorrelatedTransaction.builder()
.transaction(transaction)
.correlationId(correlationId)
.traceId(tracerService.getCurrentTraceId())
.build();
}
@Data
@Builder
public static class CorrelatedTransaction {
private co.elastic.apm.api.Transaction transaction;
private String correlationId;
private String traceId;
public void end(boolean success) {
if (transaction != null) {
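// setOutcome takes the Outcome enum, not a String
transaction.setOutcome(success ? co.elastic.apm.api.Outcome.SUCCESS : co.elastic.apm.api.Outcome.FAILURE);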
transaction.end();
}
}
}
}
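
A correlation ID held per thread does not follow work that is handed to an executor or another thread. The following is a minimal sketch of one way to carry it across that hand-off, using only the JDK. The class name CorrelationContext and all of its methods are hypothetical and are not part of the Elastic APM API or of the services shown in this article.

```java
import java.util.UUID;

/** Hypothetical per-thread holder for a correlation ID (an assumption, not an Elastic APM class). */
final class CorrelationContext {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private CorrelationContext() {}

    /** Returns the ID bound to this thread, creating and binding one if absent. */
    static String getOrCreate() {
        String id = CURRENT.get();
        if (id == null) {
            id = "corr-" + UUID.randomUUID();
            CURRENT.set(id);
        }
        return id;
    }

    /** Returns the ID bound to this thread, or null if none is bound. */
    static String peek() {
        return CURRENT.get();
    }

    static void set(String id) {
        CURRENT.set(id);
    }

    static void clear() {
        CURRENT.remove();
    }

    /** Captures the caller's ID now and rebinds it on whichever thread runs the task. */
    static Runnable wrap(Runnable task) {
        final String captured = getOrCreate();
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                task.run();
            } finally {
                // Restore the previous value so pooled threads do not leak IDs between tasks.
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }
}
```

A caller would submit CorrelationContext.wrap(task) to its executor instead of the bare task. The ID is captured when wrap is called, not when the task runs, which is usually the point at which the originating request is still on the stack.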

2. Business Context Propagator

@Component
@Slf4j
public class BusinessContextPropagator {
private final ElasticApmTracerService tracerService;
private final CorrelationIdService correlationIdService;
public BusinessContextPropagator(ElasticApmTracerService tracerService,
CorrelationIdService correlationIdService) {
this.tracerService = tracerService;
this.correlationIdService = correlationIdService;
}
/**
* Add user context to current transaction
*/
public void addUserContext(String userId, String sessionId, String userRole) {
tracerService.addLabel("user.id", userId);
tracerService.addLabel("user.session_id", sessionId);
tracerService.addLabel("user.role", userRole);
log.debug("Added user context: userId={}, role={}", userId, userRole);
}
/**
* Add tenant context to current transaction
*/
public void addTenantContext(String tenantId, String tenantName, String environment) {
tracerService.addLabel("tenant.id", tenantId);
tracerService.addLabel("tenant.name", tenantName);
tracerService.addLabel("tenant.environment", environment);
log.debug("Added tenant context: tenantId={}, environment={}", tenantId, environment);
}
/**
* Add business process context
*/
public void addBusinessProcessContext(String processName, String processId, String step) {
tracerService.addLabel("business.process", processName);
tracerService.addLabel("business.process_id", processId);
tracerService.addLabel("business.process_step", step);
log.debug("Added business process context: process={}, step={}", processName, step);
}
/**
* Add financial context
*/
public void addFinancialContext(String currency, BigDecimal amount, String transactionType) {
tracerService.addLabel("financial.currency", currency);
// toPlainString avoids scientific notation for very large or very small amounts
tracerService.addLabel("financial.amount", amount.toPlainString());
tracerService.addLabel("financial.transaction_type", transactionType);
}
/**
* Propagate business context to outgoing requests
*/
public void propagateBusinessContext(Map<String, String> headers) {
// Inject correlation ID
correlationIdService.injectCorrelationId(headers);
// Add business context headers
co.elastic.apm.api.Transaction transaction = tracerService.getCurrentTransaction();
if (transaction != null) {
// In real implementation, you would extract labels from transaction
// and add them as headers
headers.put("X-Business-Context", "active");
}
log.debug("Propagated business context to headers");
}
/**
* Extract business context from incoming requests
*/
public void extractBusinessContext(Map<String, String> headers) {
// Extract correlation ID
String correlationId = correlationIdService.extractCorrelationId(headers);
// Extract other business context from headers
String userId = headers.get("X-User-ID");
String tenantId = headers.get("X-Tenant-ID");
String userRole = headers.get("X-User-Role");
if (userId != null) {
addUserContext(userId, headers.get("X-Session-ID"), userRole);
}
if (tenantId != null) {
addTenantContext(tenantId, headers.get("X-Tenant-Name"), headers.get("X-Environment"));
}
log.debug("Extracted business context: correlationId={}, userId={}", correlationId, userId);
}
/**
* Create business transaction with full context
*/
public CorrelationIdService.CorrelatedTransaction startBusinessTransaction(
String name, BusinessContext context) {
Map<String, String> labels = new HashMap<>();
labels.put("business.domain", context.getBusinessDomain());
labels.put("business.operation", context.getOperation());
labels.put("business.criticality", context.getCriticality());
if (context.getUserId() != null) {
labels.put("user.id", context.getUserId());
}
if (context.getTenantId() != null) {
labels.put("tenant.id", context.getTenantId());
}
return correlationIdService.startCorrelatedTransaction(name, "business", labels);
}
@Data
@Builder
public static class BusinessContext {
private String businessDomain;
private String operation;
private String criticality; // LOW, MEDIUM, HIGH, CRITICAL
private String userId;
private String tenantId;
private String sessionId;
private String userRole;
}
}
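
propagateBusinessContext currently sends only a marker header. One possible way to forward the actual values is for the caller to keep its own map of labels and translate it to headers. The sketch below assumes that approach. The class BusinessContextHeaders is hypothetical, and the label and header names are simply the ones this article already uses in addUserContext, addTenantContext, and extractBusinessContext.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (an assumption); maps this article's label keys to its header names. */
final class BusinessContextHeaders {
    private static final Map<String, String> LABEL_TO_HEADER = new LinkedHashMap<>();
    static {
        LABEL_TO_HEADER.put("user.id", "X-User-ID");
        LABEL_TO_HEADER.put("user.session_id", "X-Session-ID");
        LABEL_TO_HEADER.put("user.role", "X-User-Role");
        LABEL_TO_HEADER.put("tenant.id", "X-Tenant-ID");
        LABEL_TO_HEADER.put("tenant.name", "X-Tenant-Name");
        LABEL_TO_HEADER.put("tenant.environment", "X-Environment");
    }

    private BusinessContextHeaders() {}

    /** Copies known, non-empty labels into the outgoing headers; unknown labels are skipped. */
    static void inject(Map<String, String> labels, Map<String, String> headers) {
        for (Map.Entry<String, String> entry : LABEL_TO_HEADER.entrySet()) {
            String value = labels.get(entry.getKey());
            if (value != null && !value.isEmpty()) {
                headers.put(entry.getValue(), value);
            }
        }
    }

    /** Reads known headers back into label keys on the receiving side. */
    static Map<String, String> extract(Map<String, String> headers) {
        Map<String, String> labels = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : LABEL_TO_HEADER.entrySet()) {
            String value = headers.get(entry.getValue());
            if (value != null && !value.isEmpty()) {
                labels.put(entry.getKey(), value);
            }
        }
        return labels;
    }
}
```

Using a fixed allow-list keeps internal labels from leaking into outgoing requests. Which values are safe to forward between services is a decision for each deployment.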

REST Controller with APM Correlation

@RestController
@RequestMapping("/api/orders")
@Slf4j
public class CorrelatedOrderController {
private final CorrelatedOrderService orderService;
private final ElasticApmTracerService tracerService;
private final BusinessContextPropagator contextPropagator;
private final CorrelationIdService correlationIdService;
public CorrelatedOrderController(CorrelatedOrderService orderService,
ElasticApmTracerService tracerService,
BusinessContextPropagator contextPropagator,
CorrelationIdService correlationIdService) {
this.orderService = orderService;
this.tracerService = tracerService;
this.contextPropagator = contextPropagator;
this.correlationIdService = correlationIdService;
}
@PostMapping
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request,
HttpServletRequest httpRequest) {
// Extract business context from headers
Map<String, String> headers = extractHeaders(httpRequest);
contextPropagator.extractBusinessContext(headers);
CorrelationIdService.CorrelatedTransaction transaction = 
contextPropagator.startBusinessTransaction(
"Create Order",
BusinessContextPropagator.BusinessContext.builder()
.businessDomain("order-management")
.operation("create-order")
.criticality("HIGH")
.userId(request.getCustomerId())
.tenantId(request.getTenantId())
.build()
);
try {
// Process the order
Order order = orderService.processOrder(request);
OrderResponse response = OrderResponse.builder()
.orderId(order.getId())
.orderNumber(order.getOrderNumber())
.status(order.getStatus())
.correlationId(transaction.getCorrelationId())
.traceId(transaction.getTraceId())
.message("Order created successfully")
.build();
transaction.end(true);
return ResponseEntity.ok(response);
} catch (Exception e) {
transaction.end(false);
throw e;
}
}
@GetMapping("/{orderId}")
public ResponseEntity<OrderResponse> getOrder(@PathVariable Long orderId,
HttpServletRequest httpRequest) {
// Extract business context
Map<String, String> headers = extractHeaders(httpRequest);
contextPropagator.extractBusinessContext(headers);
CorrelationIdService.CorrelatedTransaction transaction = 
contextPropagator.startBusinessTransaction(
"Get Order",
BusinessContextPropagator.BusinessContext.builder()
.businessDomain("order-management")
.operation("get-order")
.criticality("LOW")
.build()
);
try {
// In real implementation, fetch order from service
Order order = findOrderById(orderId);
if (order == null) {
transaction.end(false);
return ResponseEntity.notFound().build();
}
OrderResponse response = OrderResponse.builder()
.orderId(order.getId())
.orderNumber(order.getOrderNumber())
.status(order.getStatus())
.correlationId(transaction.getCorrelationId())
.traceId(transaction.getTraceId())
.build();
transaction.end(true);
return ResponseEntity.ok(response);
} catch (Exception e) {
transaction.end(false);
throw e;
}
}
@GetMapping("/correlation/current")
public ResponseEntity<CorrelationInfo> getCurrentCorrelation() {
CorrelationInfo info = CorrelationInfo.builder()
.correlationId(correlationIdService.getCurrentCorrelationId())
.traceId(tracerService.getCurrentTraceId())
.transactionId(tracerService.getCurrentTransactionId())
.serviceName("order-service")
.timestamp(Instant.now())
.build();
return ResponseEntity.ok(info);
}
private Map<String, String> extractHeaders(HttpServletRequest request) {
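// HTTP header names are case-insensitive, so use a map that ignores case on lookup
Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);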
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.put(headerName, request.getHeader(headerName));
}
return headers;
}
private Order findOrderById(Long orderId) {
// Mock implementation
return Order.builder()
.id(orderId)
.orderNumber("ORD-12345")
.status("CONFIRMED")
.build();
}
@Data
@Builder
public static class OrderResponse {
private Long orderId;
private String orderNumber;
private String status;
private String correlationId;
private String traceId;
private String message;
}
@Data
@Builder
public static class CorrelationInfo {
private String correlationId;
private String traceId;
private String transactionId;
private String serviceName;
private Instant timestamp;
}
}
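
The controller copies request headers into a map and later looks them up by names such as X-Correlation-ID. HTTP header names are case-insensitive, and a container or proxy may report them in a different case than the sender used, so a case-sensitive map can miss a header that is present. A small JDK-only sketch of a case-insensitive copy follows. HeaderMaps is a hypothetical helper name.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (an assumption) for header lookups that ignore case. */
final class HeaderMaps {
    private HeaderMaps() {}

    /** Returns a copy of the given headers whose keys match regardless of case. */
    static Map<String, String> caseInsensitiveCopy(Map<String, String> source) {
        // A TreeMap with this comparator rejects null keys; header names are never null here.
        Map<String, String> copy = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        copy.putAll(source);
        return copy;
    }
}
```

If two keys differ only by case, the later one overwrites the earlier value in the copy, which matches treating them as the same header.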

Testing APM Correlation

1. Correlation Test Suite

@SpringBootTest
@ActiveProfiles("test")
class ElasticApmCorrelationTest {
@Autowired
private ElasticApmTracerService tracerService;
@Autowired
private CorrelationIdService correlationIdService;
@Autowired
private BusinessContextPropagator contextPropagator;
@MockBean
private ApmCorrelatedHttpClient httpClient;
@Test
void testCorrelationIdGeneration() {
// Given
String correlationId = correlationIdService.generateCorrelationId();
// When
correlationIdService.setCorrelationId(correlationId);
String retrievedId = correlationIdService.getCurrentCorrelationId();
// Then
assertNotNull(correlationId);
assertTrue(correlationId.startsWith("corr-"));
assertEquals(correlationId, retrievedId);
}
@Test
void testTraceContextInjection() {
// Given
Map<String, String> headers = new HashMap<>();
// When
tracerService.injectTraceContext(headers);
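// Then (assumes the APM agent is attached and a transaction is active; without the agent the API is likely a no-op and no headers are written)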
assertNotNull(headers.get("traceparent"));
assertNotNull(headers.get("tracestate"));
}
@Test
void testBusinessContextPropagation() {
// Given
Map<String, String> headers = new HashMap<>();
// When
contextPropagator.addUserContext("user-123", "session-456", "ADMIN");
contextPropagator.addTenantContext("tenant-789", "Acme Corp", "production");
contextPropagator.propagateBusinessContext(headers);
// Then
assertNotNull(headers.get("X-Correlation-ID"));
assertEquals("active", headers.get("X-Business-Context"));
}
@Test
void testCorrelatedTransaction() {
// Given
BusinessContextPropagator.BusinessContext context = 
BusinessContextPropagator.BusinessContext.builder()
.businessDomain("order-management")
.operation("test-operation")
.criticality("HIGH")
.userId("test-user")
.build();
// When
CorrelationIdService.CorrelatedTransaction transaction = 
contextPropagator.startBusinessTransaction("Test Transaction", context);
// Then
assertNotNull(transaction);
assertNotNull(transaction.getCorrelationId());
assertNotNull(transaction.getTraceId());
// Cleanup
transaction.end(true);
}
}
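
The trace context test checks only that a traceparent header is present. The header itself follows the W3C Trace Context layout: version, 32 hex characters of trace ID, 16 hex characters of parent span ID, and 2 hex characters of flags, separated by hyphens. The sketch below parses version 00 of that layout with the JDK alone, which can help when asserting on header contents. TraceParent is a hypothetical class, not an Elastic APM type, and it deliberately rejects any version other than 00.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser (an assumption) for version 00 of the W3C traceparent header. */
final class TraceParent {
    private static final Pattern VERSION_00 =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");
    private static final Pattern ALL_ZEROS = Pattern.compile("0+");

    final String traceId;
    final String parentId;
    final boolean sampled;

    private TraceParent(String traceId, String parentId, boolean sampled) {
        this.traceId = traceId;
        this.parentId = parentId;
        this.sampled = sampled;
    }

    /** Returns the parsed header, or null if it is missing or malformed. */
    static TraceParent parse(String header) {
        if (header == null) {
            return null;
        }
        Matcher matcher = VERSION_00.matcher(header.trim());
        if (!matcher.matches()) {
            return null;
        }
        String traceId = matcher.group(1);
        String parentId = matcher.group(2);
        // All-zero IDs are defined as invalid by the specification.
        if (ALL_ZEROS.matcher(traceId).matches() || ALL_ZEROS.matcher(parentId).matches()) {
            return null;
        }
        // The lowest bit of the flags byte is the "sampled" flag.
        boolean sampled = (Integer.parseInt(matcher.group(3), 16) & 0x01) == 1;
        return new TraceParent(traceId, parentId, sampled);
    }
}
```

In a test, TraceParent.parse(headers.get("traceparent")) returning a non-null value is a stronger check than assertNotNull on the raw string, since it also rejects malformed values.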

Monitoring and Metrics

1. APM Metrics Collector

@Component
@Slf4j
public class ApmMetricsCollector {
private final MeterRegistry meterRegistry;
private final ElasticApmTracerService tracerService;
public ApmMetricsCollector(MeterRegistry meterRegistry, 
ElasticApmTracerService tracerService) {
this.meterRegistry = meterRegistry;
this.tracerService = tracerService;
}
/**
* Record business transaction metrics
*/
public void recordBusinessTransaction(String transactionName, boolean success, Duration duration) {
Counter.builder("business.transactions")
.tag("name", transactionName)
.tag("outcome", success ? "success" : "failure")
.register(meterRegistry)
.increment();
Timer.builder("business.transaction.duration")
.tag("name", transactionName)
.register(meterRegistry)
.record(duration);
log.debug("Recorded business transaction: {}, success: {}, duration: {}ms", 
transactionName, success, duration.toMillis());
}
/**
* Record external service call metrics
*/
public void recordExternalServiceCall(String serviceName, String endpoint, 
boolean success, Duration duration) {
Counter.builder("external.service.calls")
.tag("service", serviceName)
.tag("endpoint", endpoint)
.tag("success", String.valueOf(success))
.register(meterRegistry)
.increment();
Timer.builder("external.service.duration")
.tag("service", serviceName)
.tag("endpoint", endpoint)
.register(meterRegistry)
.record(duration);
}
/**
* Record database operation metrics
*/
public void recordDatabaseOperation(String operation, String table, 
boolean success, Duration duration) {
Counter.builder("database.operations")
.tag("operation", operation)
.tag("table", table)
.tag("success", String.valueOf(success))
.register(meterRegistry)
.increment();
Timer.builder("database.operation.duration")
.tag("operation", operation)
.tag("table", table)
.register(meterRegistry)
.record(duration);
}
/**
* Record correlation metrics
*/
public void recordCorrelationEvent(String eventType, String source, String target) {
Counter.builder("correlation.events")
.tag("event_type", eventType)
.tag("source", source)
.tag("target", target)
.register(meterRegistry)
.increment();
log.debug("Recorded correlation event: {} from {} to {}", eventType, source, target);
}
}
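
Each record method on the collector takes a success flag and a Duration, which the caller has to measure. A minimal sketch of a wrapper that times an operation and reports the result on both the success and the exception path is shown below. TimedCall and its Recorder interface are hypothetical names introduced for illustration. The Recorder signature is chosen to match recordBusinessTransaction(String, boolean, Duration).

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical timing wrapper (an assumption, not part of Micrometer or Elastic APM). */
final class TimedCall {
    /** Receives the operation name, whether it completed normally, and how long it took. */
    interface Recorder {
        void record(String name, boolean success, Duration duration);
    }

    private TimedCall() {}

    /** Runs the body, then reports its outcome and elapsed time, even if it throws. */
    static <T> T run(String name, Recorder recorder, Supplier<T> body) {
        long start = System.nanoTime();
        boolean success = false;
        try {
            T result = body.get();
            success = true;
            return result;
        } finally {
            // Runs on both paths; success stays false when body.get() threw.
            recorder.record(name, success, Duration.ofNanos(System.nanoTime() - start));
        }
    }
}
```

With the collector above, a call could look like TimedCall.run("Create Order", metricsCollector::recordBusinessTransaction, () -> orderService.processOrder(request)), assuming a metricsCollector field of type ApmMetricsCollector is available at the call site.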

Conclusion

Elastic APM correlation links the transactions, spans, and errors that a single request produces across services, databases, and message queues, so you can follow that request end to end and see where time is spent or where it fails.

Key Implementation Patterns:

  • HTTP Correlation: Automatic trace context propagation for REST APIs
  • Database Correlation: Span creation for SQL operations and JPA events
  • Message Queue Correlation: Trace context propagation through messaging systems
  • Business Transaction Correlation: Custom spans for business processes
  • Context Propagation: Business context and correlation ID management

Best Practices:

  • Consistent Naming: Use meaningful names for transactions and spans
  • Context Enrichment: Add business context to traces for better debugging
  • Performance Awareness: Be mindful of span creation overhead
  • Error Handling: Properly capture and correlate exceptions
  • Testing: Comprehensive testing of correlation across service boundaries

Use Cases:

  • Microservices Architecture: End-to-end request tracing
  • Complex Business Processes: Correlation of multi-step transactions
  • Performance Monitoring: Identifying bottlenecks across services
  • Error Analysis: Correlating errors with specific traces and contexts
  • Capacity Planning: Understanding service dependencies and load patterns

With trace context, correlation IDs, and business labels propagated consistently across these boundaries, you can monitor, debug, and optimize distributed Java applications using the full path of each request.
