CloudEvents is a specification for describing event data in a common way, providing interoperability across services, platforms, and systems. The CloudEvents SDK for Java helps create, send, receive, and process CloudEvents.
Core Concepts
What are CloudEvents?
- Standardized format for event data
- Platform-agnostic event description
- Enables interoperability between event producers and consumers
- Supports multiple transport protocols (HTTP, Kafka, AMQP, etc.)
CloudEvents Structure:
- Required Attributes: specversion, id, source, type
- Optional Attributes: datacontenttype, dataschema, subject, time
- Data: the event payload itself, carried alongside the context attributes rather than as one of them
- Extensions: Custom attributes for specific use cases
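To make the attribute list concrete, here is a minimal sketch of the context attributes as a flat JSON object. `MinimalEvent` is a hypothetical illustration type, not part of the CloudEvents SDK, and the JSON escaping is deliberately simplistic; real code would use the SDK builder shown later in this guide.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration type; not part of the CloudEvents SDK.
final class MinimalEvent {
    private final Map<String, String> attributes = new LinkedHashMap<>();

    MinimalEvent(String id, String source, String type) {
        // The four required context attributes.
        attributes.put("specversion", "1.0");
        attributes.put("id", requireNonEmpty(id, "id"));
        attributes.put("source", requireNonEmpty(source, "source"));
        attributes.put("type", requireNonEmpty(type, "type"));
    }

    // Optional attributes and extensions live in the same flat namespace.
    MinimalEvent with(String name, String value) {
        attributes.put(name, Objects.requireNonNull(value, name));
        return this;
    }

    // Renders the context attributes as a flat JSON object (no data payload).
    String toJson() {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(escape(e.getKey())).append("\":\"")
                .append(escape(e.getValue())).append('"');
        }
        return json.append('}').toString();
    }

    private static String requireNonEmpty(String value, String name) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " is required");
        }
        return value;
    }

    // Escapes only backslash and double quote, which is enough for this sketch.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(
            new MinimalEvent("42", "https://example.com/app", "com.example.user.created")
                .with("subject", "user-1")
                .toJson());
    }
}
```

The constructor rejects missing required attributes up front, which is the same guarantee the SDK builder gives when `build()` is called.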
Dependencies and Setup
1. Maven Dependencies
<properties>
<cloudevents.version>3.0.0</cloudevents.version>
<jackson.version>2.15.2</jackson.version>
<spring-boot.version>3.1.0</spring-boot.version>
<kafka.version>3.4.0</kafka.version>
</properties>
<dependencies>
<!-- CloudEvents SDK -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<!-- CloudEvents HTTP Transport -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-vertx</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<!-- CloudEvents Kafka Transport -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<!-- CloudEvents Jackson JSON Binding -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<!-- Spring Boot (Optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>${spring-boot.version}</version>
<optional>true</optional>
</dependency>
<!-- Kafka (Optional) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
<optional>true</optional>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
Core Implementation
1. CloudEvents Factory and Builder
package com.example.cloudevents.core;
import com.example.cloudevents.model.*;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.PojoCloudEventData;
import io.cloudevents.jackson.PojoCloudEventDataMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;
public class CloudEventFactory {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventFactory.class);
private final String eventSource;
private final ObjectMapper objectMapper;
public CloudEventFactory(String eventSource, ObjectMapper objectMapper) {
this.eventSource = eventSource;
this.objectMapper = objectMapper;
}
public CloudEvent createEvent(String eventType, Object data) {
return createEvent(eventType, data, null, null);
}
public CloudEvent createEvent(String eventType, Object data, String subject) {
return createEvent(eventType, data, subject, null);
}
public CloudEvent createEvent(String eventType, Object data, String subject,
OffsetDateTime eventTime) {
LOG.debug("Creating CloudEvent of type: {} from source: {}", eventType, eventSource);
try {
CloudEventBuilder builder = CloudEventBuilder.v1()
.withId(generateEventId())
.withSource(URI.create(eventSource))
.withType(eventType)
.withDataContentType("application/json");
if (subject != null) {
builder.withSubject(subject);
}
if (eventTime != null) {
builder.withTime(eventTime);
} else {
builder.withTime(OffsetDateTime.now());
}
// Add data
if (data != null) {
PojoCloudEventData<Object> eventData = PojoCloudEventData.wrap(
data,
objectMapper::writeValueAsBytes
);
builder.withData(eventData);
}
return builder.build();
} catch (Exception e) {
LOG.error("Failed to create CloudEvent: {}", e.getMessage(), e);
throw new CloudEventException("Failed to create CloudEvent", e);
}
}
public <T> T extractData(CloudEvent cloudEvent, Class<T> dataType) {
try {
if (cloudEvent.getData() == null) {
return null;
}
PojoCloudEventData<T> eventData = PojoCloudEventDataMapper.from(
objectMapper,
dataType
).map(cloudEvent.getData());
return eventData.getValue();
} catch (Exception e) {
LOG.error("Failed to extract data from CloudEvent: {}", e.getMessage(), e);
throw new CloudEventException("Failed to extract event data", e);
}
}
public EventMetadata extractMetadata(CloudEvent cloudEvent) {
return EventMetadata.builder()
.id(cloudEvent.getId())
.source(cloudEvent.getSource().toString())
.type(cloudEvent.getType())
.subject(cloudEvent.getSubject())
.time(cloudEvent.getTime())
.dataContentType(cloudEvent.getDataContentType())
.build();
}
private String generateEventId() {
return UUID.randomUUID().toString();
}
// Domain-specific event creators
public CloudEvent createUserCreatedEvent(UserCreatedData data) {
return createEvent("com.example.user.created", data, data.getUserId());
}
public CloudEvent createUserUpdatedEvent(UserUpdatedData data) {
return createEvent("com.example.user.updated", data, data.getUserId());
}
public CloudEvent createOrderPlacedEvent(OrderPlacedData data) {
return createEvent("com.example.order.placed", data, data.getOrderId());
}
public CloudEvent createPaymentProcessedEvent(PaymentProcessedData data) {
return createEvent("com.example.payment.processed", data, data.getPaymentId());
}
}
class CloudEventException extends RuntimeException {
public CloudEventException(String message) {
super(message);
}
public CloudEventException(String message, Throwable cause) {
super(message, cause);
}
}
2. Event Data Models
package com.example.cloudevents.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
// Base event data class
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class BaseEventData {
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
private LocalDateTime timestamp;
private String correlationId;
private Map<String, Object> metadata;
public BaseEventData() {
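// The pattern above appends a literal 'Z', so the value must actually be UTC
this.timestamp = LocalDateTime.now(java.time.ZoneOffset.UTC);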
}
// Getters and setters
public LocalDateTime getTimestamp() { return timestamp; }
public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
public String getCorrelationId() { return correlationId; }
public void setCorrelationId(String correlationId) { this.correlationId = correlationId; }
public Map<String, Object> getMetadata() { return metadata; }
public void setMetadata(Map<String, Object> metadata) { this.metadata = metadata; }
}
// Domain-specific event data models (in a real project, each public class goes in its own file)
public class UserCreatedData extends BaseEventData {
private String userId;
private String email;
private String firstName;
private String lastName;
private UserStatus status;
// Constructors
public UserCreatedData() {}
public UserCreatedData(String userId, String email, String firstName, String lastName) {
this.userId = userId;
this.email = email;
this.firstName = firstName;
this.lastName = lastName;
this.status = UserStatus.ACTIVE;
}
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public UserStatus getStatus() { return status; }
public void setStatus(UserStatus status) { this.status = status; }
}
public class UserUpdatedData extends BaseEventData {
private String userId;
private String email;
private String firstName;
private String lastName;
private UserStatus previousStatus;
private UserStatus newStatus;
private Map<String, Object> updatedFields;
// Constructors, getters, and setters
public UserUpdatedData() {}
public UserUpdatedData(String userId, Map<String, Object> updatedFields) {
this.userId = userId;
this.updatedFields = updatedFields;
}
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public UserStatus getPreviousStatus() { return previousStatus; }
public void setPreviousStatus(UserStatus previousStatus) { this.previousStatus = previousStatus; }
public UserStatus getNewStatus() { return newStatus; }
public void setNewStatus(UserStatus newStatus) { this.newStatus = newStatus; }
public Map<String, Object> getUpdatedFields() { return updatedFields; }
public void setUpdatedFields(Map<String, Object> updatedFields) { this.updatedFields = updatedFields; }
}
public class OrderPlacedData extends BaseEventData {
private String orderId;
private String customerId;
private BigDecimal totalAmount;
private List<OrderItem> items;
private String currency;
private ShippingAddress shippingAddress;
// Constructors, getters, and setters
public OrderPlacedData() {}
public OrderPlacedData(String orderId, String customerId, BigDecimal totalAmount) {
this.orderId = orderId;
this.customerId = customerId;
this.totalAmount = totalAmount;
this.currency = "USD";
}
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public BigDecimal getTotalAmount() { return totalAmount; }
public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
public List<OrderItem> getItems() { return items; }
public void setItems(List<OrderItem> items) { this.items = items; }
public String getCurrency() { return currency; }
public void setCurrency(String currency) { this.currency = currency; }
public ShippingAddress getShippingAddress() { return shippingAddress; }
public void setShippingAddress(ShippingAddress shippingAddress) { this.shippingAddress = shippingAddress; }
}
public class PaymentProcessedData extends BaseEventData {
private String paymentId;
private String orderId;
private BigDecimal amount;
private String currency;
private PaymentStatus status;
private String gatewayTransactionId;
private LocalDateTime processedAt;
// Constructors, getters, and setters
public PaymentProcessedData() {}
public PaymentProcessedData(String paymentId, String orderId, BigDecimal amount, PaymentStatus status) {
this.paymentId = paymentId;
this.orderId = orderId;
this.amount = amount;
this.status = status;
this.currency = "USD";
this.processedAt = LocalDateTime.now();
}
public String getPaymentId() { return paymentId; }
public void setPaymentId(String paymentId) { this.paymentId = paymentId; }
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public BigDecimal getAmount() { return amount; }
public void setAmount(BigDecimal amount) { this.amount = amount; }
public String getCurrency() { return currency; }
public void setCurrency(String currency) { this.currency = currency; }
public PaymentStatus getStatus() { return status; }
public void setStatus(PaymentStatus status) { this.status = status; }
public String getGatewayTransactionId() { return gatewayTransactionId; }
public void setGatewayTransactionId(String gatewayTransactionId) { this.gatewayTransactionId = gatewayTransactionId; }
public LocalDateTime getProcessedAt() { return processedAt; }
public void setProcessedAt(LocalDateTime processedAt) { this.processedAt = processedAt; }
}
// Supporting models
public class OrderItem {
private String productId;
private String productName;
private int quantity;
private BigDecimal unitPrice;
// Constructors, getters, and setters
public OrderItem() {}
public OrderItem(String productId, String productName, int quantity, BigDecimal unitPrice) {
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
public String getProductId() { return productId; }
public void setProductId(String productId) { this.productId = productId; }
public String getProductName() { return productName; }
public void setProductName(String productName) { this.productName = productName; }
public int getQuantity() { return quantity; }
public void setQuantity(int quantity) { this.quantity = quantity; }
public BigDecimal getUnitPrice() { return unitPrice; }
public void setUnitPrice(BigDecimal unitPrice) { this.unitPrice = unitPrice; }
}
public class ShippingAddress {
private String street;
private String city;
private String state;
private String zipCode;
private String country;
// Constructors, getters, and setters
public ShippingAddress() {}
public ShippingAddress(String street, String city, String state, String zipCode, String country) {
this.street = street;
this.city = city;
this.state = state;
this.zipCode = zipCode;
this.country = country;
}
public String getStreet() { return street; }
public void setStreet(String street) { this.street = street; }
public String getCity() { return city; }
public void setCity(String city) { this.city = city; }
public String getState() { return state; }
public void setState(String state) { this.state = state; }
public String getZipCode() { return zipCode; }
public void setZipCode(String zipCode) { this.zipCode = zipCode; }
public String getCountry() { return country; }
public void setCountry(String country) { this.country = country; }
}
enum UserStatus {
ACTIVE, INACTIVE, SUSPENDED, PENDING_VERIFICATION
}
enum PaymentStatus {
SUCCEEDED, FAILED, PENDING, REFUNDED, CANCELLED
}
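The `timestamp` field in `BaseEventData` is serialized with a pattern that ends in a literal `'Z'`, which readers will interpret as UTC. The following sketch shows one way to produce a string in that pattern that really is UTC, using only `java.time`. `UtcTimestamps` is a hypothetical helper name, and switching the model field itself to `Instant` would be a separate design decision.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; not part of the models above.
final class UtcTimestamps {
    // Same pattern as BaseEventData. The trailing 'Z' is a literal, so the
    // formatter is pinned to UTC to keep the output truthful.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    static String format(Instant instant) {
        return FORMAT.format(instant);
    }

    public static void main(String[] args) {
        System.out.println(format(Instant.now()));
    }
}
```

Because an `Instant` has no time zone of its own, formatting it through a UTC-pinned formatter gives the same string on every machine, regardless of the JVM default zone.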
3. Event Metadata and Validation
package com.example.cloudevents.model;
import java.time.OffsetDateTime;
public class EventMetadata {
private final String id;
private final String source;
private final String type;
private final String subject;
private final OffsetDateTime time;
private final String dataContentType;
private EventMetadata(Builder builder) {
this.id = builder.id;
this.source = builder.source;
this.type = builder.type;
this.subject = builder.subject;
this.time = builder.time;
this.dataContentType = builder.dataContentType;
}
// Getters
public String getId() { return id; }
public String getSource() { return source; }
public String getType() { return type; }
public String getSubject() { return subject; }
public OffsetDateTime getTime() { return time; }
public String getDataContentType() { return dataContentType; }
public static Builder builder() {
return new Builder();
}
public static class Builder {
private String id;
private String source;
private String type;
private String subject;
private OffsetDateTime time;
private String dataContentType;
public Builder id(String id) { this.id = id; return this; }
public Builder source(String source) { this.source = source; return this; }
public Builder type(String type) { this.type = type; return this; }
public Builder subject(String subject) { this.subject = subject; return this; }
public Builder time(OffsetDateTime time) { this.time = time; return this; }
public Builder dataContentType(String dataContentType) { this.dataContentType = dataContentType; return this; }
public EventMetadata build() {
return new EventMetadata(this);
}
}
}
package com.example.cloudevents.core;
import com.example.cloudevents.model.ValidationResult;
import io.cloudevents.CloudEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
public class CloudEventValidator {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventValidator.class);
public ValidationResult validate(CloudEvent cloudEvent) {
List<String> errors = new ArrayList<>();
List<String> warnings = new ArrayList<>();
// Required field validation
if (cloudEvent.getId() == null || cloudEvent.getId().trim().isEmpty()) {
errors.add("Event ID is required");
}
if (cloudEvent.getSource() == null) {
errors.add("Event source is required");
} else if (!cloudEvent.getSource().isAbsolute()) {
// getSource() already returns a parsed URI, so only the absolute-URI recommendation is checked
warnings.add("Event source should be an absolute URI");
}
if (cloudEvent.getType() == null || cloudEvent.getType().trim().isEmpty()) {
errors.add("Event type is required");
}
if (cloudEvent.getSpecVersion() == null) {
errors.add("Spec version is required");
}
// Time validation
if (cloudEvent.getTime() != null) {
OffsetDateTime eventTime = cloudEvent.getTime();
OffsetDateTime now = OffsetDateTime.now();
if (eventTime.isAfter(now.plusMinutes(5))) {
warnings.add("Event time is in the future");
}
if (eventTime.isBefore(now.minusDays(7))) {
warnings.add("Event time is more than 7 days in the past");
}
}
// Data validation
if (cloudEvent.getData() != null && cloudEvent.getDataContentType() == null) {
warnings.add("Data content type should be specified when data is present");
}
boolean isValid = errors.isEmpty();
String message = isValid ? "Validation successful" : "Validation failed with " + errors.size() + " errors";
return ValidationResult.builder()
.valid(isValid)
.message(message)
.errors(errors)
.warnings(warnings)
.build();
}
public boolean isValid(CloudEvent cloudEvent) {
return validate(cloudEvent).isValid();
}
}
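The validator returns a `ValidationResult`, which is not shown in this part of the guide. A minimal sketch of what it could look like follows, with the shape inferred purely from the calls the validator and its callers make (`builder()`, `valid`, `message`, `errors`, `warnings`, `isValid()`, `getMessage()`). The `getErrors()` and `getWarnings()` accessors are assumptions added for completeness.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Assumed shape, inferred from how CloudEventValidator uses it. Declare it
// public in its own file when callers live in other packages.
final class ValidationResult {
    private final boolean valid;
    private final String message;
    private final List<String> errors;
    private final List<String> warnings;

    private ValidationResult(Builder builder) {
        this.valid = builder.valid;
        this.message = builder.message;
        // Copy the lists so later changes by the caller do not leak in.
        this.errors = Collections.unmodifiableList(new ArrayList<>(builder.errors));
        this.warnings = Collections.unmodifiableList(new ArrayList<>(builder.warnings));
    }

    public boolean isValid() { return valid; }
    public String getMessage() { return message; }
    public List<String> getErrors() { return errors; }
    public List<String> getWarnings() { return warnings; }

    public static Builder builder() {
        return new Builder();
    }

    public static final class Builder {
        private boolean valid;
        private String message = "";
        private List<String> errors = new ArrayList<>();
        private List<String> warnings = new ArrayList<>();

        public Builder valid(boolean valid) { this.valid = valid; return this; }
        public Builder message(String message) { this.message = message; return this; }
        public Builder errors(List<String> errors) { this.errors = errors; return this; }
        public Builder warnings(List<String> warnings) { this.warnings = warnings; return this; }

        public ValidationResult build() {
            return new ValidationResult(this);
        }
    }
}
```

Keeping errors and warnings separate lets callers reject an event only on hard failures while still logging the softer findings.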
4. HTTP Transport Implementation
package com.example.cloudevents.transport.http;
import io.cloudevents.CloudEvent;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.cloudevents.core.message.MessageWriter;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.WebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class CloudEventHttpClient {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventHttpClient.class);
private final WebClient webClient;
private final String baseUrl;
public CloudEventHttpClient(Vertx vertx, String baseUrl) {
this.webClient = WebClient.create(vertx);
this.baseUrl = baseUrl;
}
public CompletableFuture<HttpResponse> sendEvent(CloudEvent cloudEvent, String path) {
return sendEvent(cloudEvent, path, Map.of());
}
public CompletableFuture<HttpResponse> sendEvent(CloudEvent cloudEvent, String path,
Map<String, String> headers) {
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
try {
HttpRequest<Buffer> request = webClient.postAbs(baseUrl + path);
// Add custom headers
headers.forEach(request::putHeader);
// Write CloudEvent to HTTP request
VertxMessageFactory.createWriter(request)
.writeBinary(cloudEvent)
.onSuccess(response -> {
LOG.debug("CloudEvent sent successfully to {}: {}", path, response.statusCode());
future.complete(new HttpResponse(response.statusCode(), response.bodyAsString()));
})
.onFailure(error -> {
LOG.error("Failed to send CloudEvent to {}: {}", path, error.getMessage());
future.completeExceptionally(new CloudEventTransportException(
"Failed to send CloudEvent", error));
});
} catch (Exception e) {
LOG.error("Error preparing CloudEvent HTTP request: {}", e.getMessage());
future.completeExceptionally(e);
}
return future;
}
// VertxMessageFactory.createReader(HttpServerRequest) is asynchronous: it completes once the
// request body has been read, so the event is returned as a Vert.x Future.
public io.vertx.core.Future<CloudEvent> receiveEvent(io.vertx.core.http.HttpServerRequest request) {
return VertxMessageFactory.createReader(request)
.map(reader -> reader.toEvent())
.recover(error -> {
LOG.error("Failed to receive CloudEvent: {}", error.getMessage());
return io.vertx.core.Future.failedFuture(
new CloudEventTransportException("Failed to receive CloudEvent", error));
});
}
}
// Public (in its own file) because the Kafka transport in another package also throws it
public class CloudEventTransportException extends RuntimeException {
public CloudEventTransportException(String message) {
super(message);
}
public CloudEventTransportException(String message, Throwable cause) {
super(message, cause);
}
}
// Public (in its own file) because sendEvent exposes it to callers in other packages
public class HttpResponse {
private final int statusCode;
private final String body;
public HttpResponse(int statusCode, String body) {
this.statusCode = statusCode;
this.body = body;
}
public int getStatusCode() { return statusCode; }
public String getBody() { return body; }
public boolean isSuccess() {
return statusCode >= 200 && statusCode < 300;
}
}
5. Kafka Transport Implementation
package com.example.cloudevents.transport.kafka;
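import com.example.cloudevents.transport.http.CloudEventTransportException; // must be declared public there
import io.cloudevents.CloudEvent;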
import io.cloudevents.kafka.CloudEventSerializer;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class CloudEventKafkaTransport {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventKafkaTransport.class);
private final KafkaProducer<String, CloudEvent> producer;
private final KafkaConsumer<String, CloudEvent> consumer;
private final String topic;
public CloudEventKafkaTransport(String bootstrapServers, String topic) {
this.topic = topic;
// Producer configuration
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer", StringSerializer.class.getName());
producerProps.put("value.serializer", CloudEventSerializer.class.getName());
producerProps.put("acks", "all");
producerProps.put("retries", 3);
this.producer = new KafkaProducer<>(producerProps);
// Consumer configuration
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "cloudevents-consumer");
consumerProps.put("key.deserializer", StringDeserializer.class.getName());
consumerProps.put("value.deserializer", CloudEventDeserializer.class.getName());
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("enable.auto.commit", "false");
this.consumer = new KafkaConsumer<>(consumerProps);
}
public Future<Void> sendEvent(CloudEvent cloudEvent) {
return sendEvent(cloudEvent, null);
}
public Future<Void> sendEvent(CloudEvent cloudEvent, String key) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
ProducerRecord<String, CloudEvent> record = new ProducerRecord<>(
topic, key, cloudEvent
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
LOG.error("Failed to send CloudEvent to Kafka: {}", exception.getMessage());
future.completeExceptionally(
new CloudEventTransportException("Kafka send failed", exception));
} else {
LOG.debug("CloudEvent sent to Kafka topic {} [partition: {}, offset: {}]",
metadata.topic(), metadata.partition(), metadata.offset());
future.complete(null);
}
});
} catch (Exception e) {
LOG.error("Error sending CloudEvent to Kafka: {}", e.getMessage());
future.completeExceptionally(e);
}
return future;
}
public void subscribe() {
consumer.subscribe(Collections.singletonList(topic));
LOG.info("Subscribed to Kafka topic: {}", topic);
}
public ConsumerRecords<String, CloudEvent> pollEvents(Duration timeout) {
return consumer.poll(timeout);
}
public void commitSync() {
consumer.commitSync();
}
public void close() {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
LOG.info("Kafka transport closed");
}
}
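The HTTP client above sends events with `writeBinary`, meaning the context attributes travel as message headers and the payload as the body. The sketch below illustrates that mapping as defined by the CloudEvents protocol bindings: HTTP uses a `ce-` prefix, Kafka uses `ce_`, and `datacontenttype` becomes the content type header in both. `BinaryModeHeaders` is a hypothetical helper for illustration only; the SDK performs this mapping internally, and the percent-encoding that HTTP header values require is omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the binary-mode attribute mapping.
final class BinaryModeHeaders {

    static Map<String, String> forHttp(Map<String, String> attributes) {
        return toHeaders(attributes, "ce-", "Content-Type");
    }

    static Map<String, String> forKafka(Map<String, String> attributes) {
        return toHeaders(attributes, "ce_", "content-type");
    }

    private static Map<String, String> toHeaders(Map<String, String> attributes,
                                                 String prefix,
                                                 String contentTypeHeader) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            if (attribute.getKey().equals("datacontenttype")) {
                // The data content type is carried by the transport's own header.
                headers.put(contentTypeHeader, attribute.getValue());
            } else {
                headers.put(prefix + attribute.getKey(), attribute.getValue());
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("specversion", "1.0");
        attributes.put("id", "42");
        attributes.put("type", "com.example.user.created");
        attributes.put("datacontenttype", "application/json");
        System.out.println(forHttp(attributes));
        System.out.println(forKafka(attributes));
    }
}
```

Knowing the header names is mostly useful when debugging: a request captured on the wire should show the event attributes as `ce-` headers and only the JSON payload in the body.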
Spring Boot Integration
1. Spring Boot Configuration
package com.example.cloudevents.config;
import com.example.cloudevents.core.CloudEventFactory;
import com.example.cloudevents.core.CloudEventValidator;
import com.example.cloudevents.transport.http.CloudEventHttpClient;
import com.example.cloudevents.transport.kafka.CloudEventKafkaTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.vertx.core.Vertx;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@Configuration
@EnableKafka
public class CloudEventsConfig {
@Value("${app.events.source:https://example.com/app}")
private String eventSource;
@Value("${app.kafka.bootstrap-servers:localhost:9092}")
private String kafkaBootstrapServers;
@Value("${app.kafka.topic:cloudevents}")
private String kafkaTopic;
@Value("${app.http.event-endpoint:http://localhost:8080/events}")
private String httpEventEndpoint;
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
return mapper;
}
@Bean
public CloudEventFactory cloudEventFactory(ObjectMapper objectMapper) {
return new CloudEventFactory(eventSource, objectMapper);
}
@Bean
public CloudEventValidator cloudEventValidator() {
return new CloudEventValidator();
}
@Bean
public Vertx vertx() {
return Vertx.vertx();
}
@Bean
public CloudEventHttpClient cloudEventHttpClient(Vertx vertx) {
return new CloudEventHttpClient(vertx, httpEventEndpoint);
}
@Bean
public CloudEventKafkaTransport cloudEventKafkaTransport() {
return new CloudEventKafkaTransport(kafkaBootstrapServers, kafkaTopic);
}
}
2. Application Properties
# application.yml
app:
  events:
    source: "https://user-service.example.com"
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    topic: "user-events"
    consumer-group: "user-service-consumer"
  http:
    event-endpoint: ${EVENT_ENDPOINT:http://event-broker:8080/events}
spring:
  kafka:
    bootstrap-servers: ${app.kafka.bootstrap-servers}
    properties:
      security.protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
    consumer:
      group-id: ${app.kafka.consumer-group}
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.cloudevents.kafka.CloudEventDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.cloudevents.kafka.CloudEventSerializer
    listener:
      # Needed so that @KafkaListener methods can receive an Acknowledgment argument
      ack-mode: manual
server:
  port: 8080
logging:
  level:
    com.example.cloudevents: DEBUG
3. Spring REST Controller for CloudEvents
package com.example.cloudevents.controller;
import com.example.cloudevents.core.CloudEventFactory;
import com.example.cloudevents.core.CloudEventValidator;
import com.example.cloudevents.model.*;
import com.example.cloudevents.service.UserService;
import com.example.cloudevents.transport.http.CloudEventHttpClient;
import io.cloudevents.CloudEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/api/events")
public class CloudEventController {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventController.class);
private final CloudEventFactory eventFactory;
private final CloudEventValidator eventValidator;
private final CloudEventHttpClient httpClient;
private final UserService userService;
public CloudEventController(CloudEventFactory eventFactory,
CloudEventValidator eventValidator,
CloudEventHttpClient httpClient,
UserService userService) {
this.eventFactory = eventFactory;
this.eventValidator = eventValidator;
this.httpClient = httpClient;
this.userService = userService;
}
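// Binding a CloudEvent with @RequestBody needs an HttpMessageConverter that understands CloudEvents
// (for example CloudEventHttpMessageConverter from the cloudevents-spring module); Spring MVC has none by default.
@PostMapping("/receive")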
public ResponseEntity<EventReceipt> receiveEvent(@RequestBody CloudEvent cloudEvent) {
LOG.info("Received CloudEvent: {} from {}", cloudEvent.getType(), cloudEvent.getSource());
// Validate the event
ValidationResult validation = eventValidator.validate(cloudEvent);
if (!validation.isValid()) {
LOG.warn("Invalid CloudEvent received: {}", validation.getMessage());
return ResponseEntity.badRequest()
.body(EventReceipt.failure("Invalid event: " + validation.getMessage()));
}
try {
// Process based on event type
String eventType = cloudEvent.getType();
switch (eventType) {
case "com.example.user.created":
UserCreatedData userCreated = eventFactory.extractData(cloudEvent, UserCreatedData.class);
userService.handleUserCreated(userCreated);
break;
case "com.example.user.updated":
UserUpdatedData userUpdated = eventFactory.extractData(cloudEvent, UserUpdatedData.class);
userService.handleUserUpdated(userUpdated);
break;
case "com.example.order.placed":
OrderPlacedData orderPlaced = eventFactory.extractData(cloudEvent, OrderPlacedData.class);
userService.handleOrderPlaced(orderPlaced);
break;
default:
LOG.warn("Unhandled event type: {}", eventType);
return ResponseEntity.status(HttpStatus.NOT_IMPLEMENTED)
.body(EventReceipt.failure("Unhandled event type: " + eventType));
}
LOG.info("Successfully processed CloudEvent: {}", cloudEvent.getId());
return ResponseEntity.ok(EventReceipt.success(cloudEvent.getId()));
} catch (Exception e) {
LOG.error("Failed to process CloudEvent {}: {}", cloudEvent.getId(), e.getMessage(), e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(EventReceipt.failure("Processing failed: " + e.getMessage()));
}
}
@PostMapping("/users/{userId}/created")
public CompletableFuture<ResponseEntity<EventReceipt>> publishUserCreated(
@PathVariable String userId,
@RequestBody UserCreatedData userData) {
LOG.info("Publishing user created event for: {}", userId);
CloudEvent event = eventFactory.createUserCreatedEvent(userData);
return httpClient.sendEvent(event, "/api/events/receive")
.thenApply(response -> {
if (response.isSuccess()) {
return ResponseEntity.ok(EventReceipt.success(event.getId()));
} else {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(EventReceipt.failure("Failed to publish event: " + response.getBody()));
}
})
.exceptionally(throwable -> {
LOG.error("Failed to publish user created event: {}", throwable.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(EventReceipt.failure("Publishing failed: " + throwable.getMessage()));
});
}
@GetMapping("/health")
public ResponseEntity<String> health() {
return ResponseEntity.ok("CloudEvents service is healthy");
}
}
class EventReceipt {
private final boolean success;
private final String eventId;
private final String message;
private final long timestamp;
private EventReceipt(boolean success, String eventId, String message) {
this.success = success;
this.eventId = eventId;
this.message = message;
this.timestamp = System.currentTimeMillis();
}
public static EventReceipt success(String eventId) {
return new EventReceipt(true, eventId, "Event processed successfully");
}
public static EventReceipt failure(String message) {
return new EventReceipt(false, null, message);
}
// Getters
public boolean isSuccess() { return success; }
public String getEventId() { return eventId; }
public String getMessage() { return message; }
public long getTimestamp() { return timestamp; }
}
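The `switch` in `receiveEvent` gains a new branch for every event type. One possible alternative, sketched here with the standard library only, is a small registry that maps each type string to a handler. `EventTypeDispatcher` is a hypothetical name and not part of the controller above; the generic parameter `E` stands in for `CloudEvent`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry that routes events to handlers by their type attribute.
final class EventTypeDispatcher<E> {
    private final Map<String, Consumer<E>> handlers = new HashMap<>();

    // Registers a handler for one event type and returns this for chaining.
    EventTypeDispatcher<E> on(String eventType, Consumer<E> handler) {
        handlers.put(eventType, handler);
        return this;
    }

    // Returns false when no handler is registered, which corresponds to the
    // default branch of the switch in the controller.
    boolean dispatch(String eventType, E event) {
        Consumer<E> handler = handlers.get(eventType);
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }

    public static void main(String[] args) {
        EventTypeDispatcher<String> dispatcher = new EventTypeDispatcher<String>()
            .on("com.example.user.created", e -> System.out.println("created: " + e))
            .on("com.example.user.updated", e -> System.out.println("updated: " + e));
        System.out.println(dispatcher.dispatch("com.example.user.created", "evt-1"));
        System.out.println(dispatcher.dispatch("com.example.unknown", "evt-2"));
    }
}
```

With this approach the controller would translate a `false` return value into the 501 response it already produces for unhandled types, and new event types could be registered without touching the endpoint method.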
4. Kafka Event Consumer
package com.example.cloudevents.consumer;
import com.example.cloudevents.core.CloudEventFactory;
import com.example.cloudevents.core.CloudEventValidator;
import com.example.cloudevents.model.*;
import com.example.cloudevents.service.UserService;
import io.cloudevents.CloudEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class CloudEventKafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventKafkaConsumer.class);
private final CloudEventFactory eventFactory;
private final CloudEventValidator eventValidator;
private final UserService userService;
public CloudEventKafkaConsumer(CloudEventFactory eventFactory,
CloudEventValidator eventValidator,
UserService userService) {
this.eventFactory = eventFactory;
this.eventValidator = eventValidator;
this.userService = userService;
}
@KafkaListener(topics = "${app.kafka.topic:cloudevents}")
public void consumeCloudEvent(ConsumerRecord<String, CloudEvent> record, Acknowledgment ack) {
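CloudEvent cloudEvent = record.value();
if (cloudEvent == null) {
// Tombstone record or empty payload: nothing to process
LOG.warn("Received Kafka record without a CloudEvent payload [key: {}, partition: {}, offset: {}], skipping",
record.key(), record.partition(), record.offset());
ack.acknowledge();
return;
}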
LOG.info("Received CloudEvent from Kafka: {} [key: {}, partition: {}, offset: {}]",
cloudEvent.getType(), record.key(), record.partition(), record.offset());
try {
// Validate event
if (!eventValidator.isValid(cloudEvent)) {
LOG.warn("Invalid CloudEvent received from Kafka, skipping: {}", cloudEvent.getId());
ack.acknowledge();
return;
}
// Process event based on type
processCloudEvent(cloudEvent);
// Acknowledge message
ack.acknowledge();
LOG.debug("Successfully processed CloudEvent: {}", cloudEvent.getId());
} catch (Exception e) {
LOG.error("Failed to process CloudEvent from Kafka {}: {}",
cloudEvent.getId(), e.getMessage(), e);
// The record is not acknowledged on failure. In production, hand the event to
// retry or dead-letter queue handling instead of only logging the error
}
}
private void processCloudEvent(CloudEvent cloudEvent) {
String eventType = cloudEvent.getType();
switch (eventType) {
case "com.example.user.created":
UserCreatedData userCreated = eventFactory.extractData(cloudEvent, UserCreatedData.class);
userService.handleUserCreated(userCreated);
break;
case "com.example.user.updated":
UserUpdatedData userUpdated = eventFactory.extractData(cloudEvent, UserUpdatedData.class);
userService.handleUserUpdated(userUpdated);
break;
case "com.example.order.placed":
OrderPlacedData orderPlaced = eventFactory.extractData(cloudEvent, OrderPlacedData.class);
userService.handleOrderPlaced(orderPlaced);
break;
case "com.example.payment.processed":
PaymentProcessedData paymentProcessed = eventFactory.extractData(cloudEvent, PaymentProcessedData.class);
userService.handlePaymentProcessed(paymentProcessed);
break;
default:
LOG.warn("Unhandled CloudEvent type: {}", eventType);
}
}
}
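The switch in processCloudEvent maps each event type to exactly one handler. As the number of types grows, the same mapping can be kept in a lookup table. The following is a minimal sketch rather than part of the implementation above: `EventTypeRouter` and its `on` and `dispatch` methods are hypothetical names, and the event is a generic type parameter so the example compiles without the CloudEvents SDK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: routes an event to the handler registered for its type.
final class EventTypeRouter<E> {
    private final Map<String, Consumer<E>> handlers = new ConcurrentHashMap<>();

    // Registers (or replaces) the handler for one event type; returns this for chaining.
    EventTypeRouter<E> on(String eventType, Consumer<E> handler) {
        handlers.put(eventType, handler);
        return this;
    }

    // Invokes the matching handler. Returns false when no handler is registered,
    // which corresponds to the "default" branch of the switch.
    boolean dispatch(String eventType, E event) {
        if (eventType == null) {
            return false;
        }
        Consumer<E> handler = handlers.get(eventType);
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }
}
```

In the consumer, a router like this could be filled once in the constructor, with each handler extracting the typed payload and calling the matching UserService method; a `false` return would be logged as an unhandled type.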
5. Service Layer for Event Processing
package com.example.cloudevents.service;
import com.example.cloudevents.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class UserService {
private static final Logger LOG = LoggerFactory.getLogger(UserService.class);
public void handleUserCreated(UserCreatedData userData) {
LOG.info("Processing user creation: {} - {}", userData.getUserId(), userData.getEmail());
// Business logic for user creation
// This could involve:
// - Creating user in database
// - Sending welcome email
// - Setting up user preferences
// - Publishing integration events
try {
// Simulate processing
Thread.sleep(100);
LOG.info("Successfully processed user creation for: {}", userData.getUserId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("User creation processing interrupted for: {}", userData.getUserId());
}
}
public void handleUserUpdated(UserUpdatedData userData) {
LOG.info("Processing user update: {}", userData.getUserId());
// Business logic for user updates
// - Update user in database
// - Sync with external systems
// - Update search indexes
LOG.info("Successfully processed user update for: {}", userData.getUserId());
}
public void handleOrderPlaced(OrderPlacedData orderData) {
LOG.info("Processing order placement: {} for user: {}",
orderData.getOrderId(), orderData.getCustomerId());
// Business logic for order placement
// - Update user order history
// - Calculate loyalty points
// - Send order confirmation
LOG.info("Successfully processed order placement: {}", orderData.getOrderId());
}
public void handlePaymentProcessed(PaymentProcessedData paymentData) {
LOG.info("Processing payment: {} for order: {}",
paymentData.getPaymentId(), paymentData.getOrderId());
// Business logic for payment processing
// - Update order status
// - Trigger shipping
// - Update user account balance
if (paymentData.getStatus() == PaymentStatus.SUCCEEDED) {
LOG.info("Payment succeeded: {}", paymentData.getPaymentId());
} else {
LOG.warn("Payment failed: {}", paymentData.getPaymentId());
}
}
}
Advanced Features
1. Event Correlation and Tracing
package com.example.cloudevents.correlation;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class EventCorrelationService {
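// CloudEvents extension names may contain only lowercase letters and digits.
// The "ce-" prefix belongs to HTTP binary-mode headers and is added by the transport binding.
private static final String CORRELATION_ID_HEADER = "correlationid";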
private static final String TRACE_ID_HEADER = "traceparent";
public CloudEvent addCorrelationContext(CloudEvent cloudEvent) {
CloudEventBuilder builder = CloudEventBuilder.from(cloudEvent);
// Add correlation ID if not present
if (cloudEvent.getExtension(CORRELATION_ID_HEADER) == null) {
String correlationId = getOrGenerateCorrelationId();
builder.withExtension(CORRELATION_ID_HEADER, correlationId);
}
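// Add trace context if available. The traceparent extension is meant to carry a full
// W3C Trace Context value, so a bare trace ID from the MDC may need converting first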
String traceId = MDC.get("traceId");
if (traceId != null && cloudEvent.getExtension(TRACE_ID_HEADER) == null) {
builder.withExtension(TRACE_ID_HEADER, traceId);
}
return builder.build();
}
public void setupCorrelationContext(CloudEvent cloudEvent) {
// Extract correlation ID from event
String correlationId = (String) cloudEvent.getExtension(CORRELATION_ID_HEADER);
if (correlationId != null) {
MDC.put("correlationId", correlationId);
}
// Extract trace ID from event
String traceId = (String) cloudEvent.getExtension(TRACE_ID_HEADER);
if (traceId != null) {
MDC.put("traceId", traceId);
}
}
public void clearCorrelationContext() {
MDC.remove("correlationId");
MDC.remove("traceId");
}
private String getOrGenerateCorrelationId() {
String correlationId = MDC.get("correlationId");
if (correlationId == null) {
correlationId = UUID.randomUUID().toString();
MDC.put("correlationId", correlationId);
}
return correlationId;
}
}
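CloudEvents 1.0 restricts attribute names, including extension names, to lowercase ASCII letters and digits, so a hyphenated name such as `my-ext` is not a valid extension name. The sketch below shows one way to check or normalize a candidate name before passing it to the builder. `ExtensionNames` is a hypothetical helper written for this example, not part of the CloudEvents SDK.

```java
import java.util.Locale;

// Hypothetical helper for CloudEvents extension attribute names.
final class ExtensionNames {
    private ExtensionNames() {}

    // True when the name is non-empty and uses only 'a'-'z' and '0'-'9'.
    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean lowerLetter = c >= 'a' && c <= 'z';
            boolean digit = c >= '0' && c <= '9';
            if (!lowerLetter && !digit) {
                return false;
            }
        }
        return true;
    }

    // Lowercases the input and drops every character that is not allowed.
    // The result can still be empty, so callers should re-check with isValid.
    static String normalize(String raw) {
        StringBuilder out = new StringBuilder();
        for (char c : raw.toLowerCase(Locale.ROOT).toCharArray()) {
            if ((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9')) {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```

Validating names once, where the constants are defined, keeps a naming mistake from surfacing only when an event is built at runtime.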
2. Event Schema Validation
package com.example.cloudevents.validation;
import com.example.cloudevents.model.ValidationResult;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class EventSchemaValidator {
private static final Logger LOG = LoggerFactory.getLogger(EventSchemaValidator.class);
private final ObjectMapper objectMapper;
private final ConcurrentHashMap<String, JsonSchema> schemaCache;
private final JsonSchemaFactory schemaFactory;
public EventSchemaValidator(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
this.schemaCache = new ConcurrentHashMap<>();
this.schemaFactory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7);
}
public ValidationResult validateAgainstSchema(String eventType, Object eventData) {
try {
JsonSchema schema = getSchemaForEventType(eventType);
if (schema == null) {
LOG.warn("No schema found for event type: {}", eventType);
return ValidationResult.valid(); // No schema means no validation
}
JsonNode dataNode = objectMapper.valueToTree(eventData);
Set<ValidationMessage> validationMessages = schema.validate(dataNode);
if (validationMessages.isEmpty()) {
return ValidationResult.valid();
} else {
return ValidationResult.invalid(
validationMessages.stream()
.map(ValidationMessage::getMessage)
.toList()
);
}
} catch (Exception e) {
LOG.error("Schema validation failed for event type {}: {}", eventType, e.getMessage());
return ValidationResult.invalid("Schema validation error: " + e.getMessage());
}
}
private JsonSchema getSchemaForEventType(String eventType) {
return schemaCache.computeIfAbsent(eventType, this::loadSchema);
}
private JsonSchema loadSchema(String eventType) {
try {
String schemaPath = String.format("schemas/%s.schema.json",
eventType.replace('.', '/'));
ClassPathResource resource = new ClassPathResource(schemaPath);
if (!resource.exists()) {
LOG.debug("No schema file found at: {}", schemaPath);
return null;
}
try (InputStream inputStream = resource.getInputStream()) {
return schemaFactory.getSchema(inputStream);
}
} catch (Exception e) {
LOG.error("Failed to load schema for event type {}: {}", eventType, e.getMessage());
return null;
}
}
}
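ConcurrentHashMap.computeIfAbsent records no mapping when the mapping function returns null, so a cache built this way looks up a missing schema again on every call. One possible way to remember misses is to cache an Optional instead of the raw value. This is a sketch under that assumption: `NegativeCache` is a hypothetical class, and the `loader` function stands in for a method like loadSchema.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache that remembers both hits and misses.
final class NegativeCache<K, V> {
    private final ConcurrentHashMap<K, Optional<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    // The loader may return null to signal "nothing found for this key".
    NegativeCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Loads the value at most once per key; a null result is stored as Optional.empty().
    Optional<V> get(K key) {
        return cache.computeIfAbsent(key, k -> Optional.ofNullable(loader.apply(k)));
    }

    // Forgets one key, for example after a schema file has been added or fixed.
    void invalidate(K key) {
        cache.remove(key);
    }
}
```

The trade-off is that a transient loading failure reported as null is cached as well, so a loader that swallows exceptions would need either an explicit `invalidate` call or a separate error path.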
3. Event Retry and Dead Letter Queue
package com.example.cloudevents.retry;
import io.cloudevents.CloudEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.time.OffsetDateTime;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Component
public class EventRetryHandler {
private static final Logger LOG = LoggerFactory.getLogger(EventRetryHandler.class);
private final KafkaTemplate<String, CloudEvent> kafkaTemplate;
private final ConcurrentHashMap<String, AtomicInteger> retryCounts;
@Value("${app.kafka.dlq-topic:cloudevents-dlq}")
private String dlqTopic;
@Value("${app.kafka.retry-topic:cloudevents-retry}")
private String retryTopic;
@Value("${app.events.max-retries:3}")
private int maxRetries;
public EventRetryHandler(KafkaTemplate<String, CloudEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.retryCounts = new ConcurrentHashMap<>();
}
public boolean shouldRetry(CloudEvent cloudEvent, Exception error) {
String eventId = cloudEvent.getId();
int retryCount = retryCounts.computeIfAbsent(eventId, k -> new AtomicInteger(0)).get();
if (retryCount >= maxRetries) {
LOG.warn("Event {} reached maximum retry count ({}), sending to DLQ", eventId, maxRetries);
sendToDlq(cloudEvent, error);
retryCounts.remove(eventId);
return false;
}
return true;
}
public void scheduleRetry(CloudEvent cloudEvent) {
String eventId = cloudEvent.getId();
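// computeIfAbsent avoids a NullPointerException when shouldRetry was not called first
int retryCount = retryCounts.computeIfAbsent(eventId, k -> new AtomicInteger(0)).incrementAndGet();
// Compute the retry delay based on the retry count (exponential backoff)
long delayMs = calculateRetryDelay(retryCount);
LOG.info("Scheduling retry {} for event: {} (backoff: {} ms)", retryCount, eventId, delayMs);
// Note: the delay is only logged here, not enforced. In production, use a delayed
// message queue or scheduler; for simplicity, the event goes straight to a retry topic
kafkaTemplate.send(retryTopic, eventId, cloudEvent);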
}
public void sendToDlq(CloudEvent cloudEvent, Exception error) {
// Add error information to the event.
// Extension names may contain only lowercase letters and digits, so no hyphens
CloudEvent dlqEvent = io.cloudevents.core.builder.CloudEventBuilder.from(cloudEvent)
.withExtension("dlqreason", error.getMessage())
.withExtension("dlqtimestamp", OffsetDateTime.now().toString())
.build();
kafkaTemplate.send(dlqTopic, cloudEvent.getId(), dlqEvent);
LOG.info("Event {} sent to DLQ: {}", cloudEvent.getId(), error.getMessage());
}
private long calculateRetryDelay(int retryCount) {
// Exponential backoff: 1s, 2s, 4s, 8s, etc.
return (long) Math.pow(2, retryCount - 1) * 1000;
}
public void clearRetryCount(String eventId) {
retryCounts.remove(eventId);
}
}
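The exponential backoff in calculateRetryDelay doubles the delay on every attempt and has no upper bound. A capped variant is a common refinement. The sketch below is one way to write it with integer arithmetic only; `RetryBackoff`, `baseMillis`, and `maxMillis` are hypothetical names introduced for this example.

```java
// Hypothetical helper: exponential backoff with an upper bound.
final class RetryBackoff {
    private RetryBackoff() {}

    // Returns baseMillis * 2^(retryCount - 1), capped at maxMillis.
    // retryCount is 1-based: the first retry waits baseMillis.
    static long delayMillis(int retryCount, long baseMillis, long maxMillis) {
        if (retryCount < 1) {
            throw new IllegalArgumentException("retryCount must be >= 1");
        }
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("require 0 < baseMillis <= maxMillis");
        }
        // Limit the shift so the comparison below stays well-defined for large counts.
        int shift = Math.min(retryCount - 1, 62);
        // Compare against the shifted cap first so the left shift can never overflow.
        if (baseMillis > (maxMillis >> shift)) {
            return maxMillis;
        }
        return baseMillis << shift;
    }
}
```

With a base of 1000 ms and a cap of 30000 ms, the first five retries wait 1, 2, 4, 8, and 16 seconds, and every later retry waits 30 seconds.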
Testing
1. Unit Tests
package com.example.cloudevents.core;
import com.example.cloudevents.model.UserCreatedData;
import com.example.cloudevents.model.ValidationResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class CloudEventFactoryTest {
private CloudEventFactory eventFactory;
private ObjectMapper objectMapper;
@BeforeEach
void setUp() {
objectMapper = new ObjectMapper();
eventFactory = new CloudEventFactory("https://test.example.com", objectMapper);
}
@Test
void testCreateEventWithData() {
// Given
UserCreatedData userData = new UserCreatedData("user123", "john.doe@example.com", "John", "Doe");
// When
CloudEvent event = eventFactory.createUserCreatedEvent(userData);
// Then
assertNotNull(event);
assertEquals("com.example.user.created", event.getType());
assertEquals("https://test.example.com", event.getSource().toString());
assertNotNull(event.getId());
assertNotNull(event.getTime());
assertEquals("user123", event.getSubject());
}
@Test
void testExtractDataFromEvent() {
// Given
UserCreatedData originalData = new UserCreatedData("user123", "john.doe@example.com", "John", "Doe");
CloudEvent event = eventFactory.createUserCreatedEvent(originalData);
// When
UserCreatedData extractedData = eventFactory.extractData(event, UserCreatedData.class);
// Then
assertNotNull(extractedData);
assertEquals(originalData.getUserId(), extractedData.getUserId());
assertEquals(originalData.getEmail(), extractedData.getEmail());
assertEquals(originalData.getFirstName(), extractedData.getFirstName());
assertEquals(originalData.getLastName(), extractedData.getLastName());
}
@Test
void testEventValidation() {
// Given
CloudEventValidator validator = new CloudEventValidator();
UserCreatedData userData = new UserCreatedData("user123", "john.doe@example.com", "John", "Doe");
CloudEvent event = eventFactory.createUserCreatedEvent(userData);
// When
ValidationResult result = validator.validate(event);
// Then
assertTrue(result.isValid());
assertTrue(result.getErrors().isEmpty());
}
}
2. Integration Tests
package com.example.cloudevents.controller;
import com.example.cloudevents.core.CloudEventFactory;
import com.example.cloudevents.model.UserCreatedData;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;
import java.net.URI;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;
@SpringBootTest
@AutoConfigureMockMvc
class CloudEventControllerIntegrationTest {
@Autowired
private MockMvc mockMvc;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private CloudEventFactory eventFactory;
@Test
void testReceiveValidCloudEvent() throws Exception {
// Given
UserCreatedData userData = new UserCreatedData("test-user", "test.user@example.com", "Test", "User");
CloudEvent cloudEvent = eventFactory.createUserCreatedEvent(userData);
// When & Then
mockMvc.perform(post("/api/events/receive")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(cloudEvent)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.success").value(true))
.andExpect(jsonPath("$.eventId").exists());
}
@Test
void testReceiveInvalidCloudEvent() throws Exception {
// Given - a structured-mode JSON payload without the required "type" attribute.
// CloudEventBuilder.build() rejects events with missing required attributes,
// so the invalid payload is written as raw JSON instead of being built with the SDK
String invalidEventJson = "{\"specversion\":\"1.0\",\"id\":\"test-id\",\"source\":\"https://test.example.com\"}";
// When & Then
mockMvc.perform(post("/api/events/receive")
.contentType(MediaType.APPLICATION_JSON)
.content(invalidEventJson))
.andExpect(status().isBadRequest())
.andExpect(jsonPath("$.success").value(false));
}
}
Best Practices
1. Configuration for Production
# application-production.yml
app:
  events:
    source: "https://user-service.production.example.com"
    max-retries: 5
    retry-backoff-multiplier: 2
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    topic: "production-cloudevents"
    consumer-group: "user-service-production"
    dlq-topic: "production-cloudevents-dlq"
    retry-topic: "production-cloudevents-retry"
  http:
    event-endpoint: "https://event-broker.production.example.com/events"
spring:
  kafka:
    bootstrap-servers: ${app.kafka.bootstrap-servers}
    properties:
      security.protocol: SSL
      ssl.truststore.location: ${KAFKA_TRUSTSTORE_LOCATION}
      ssl.truststore.password: ${KAFKA_TRUSTSTORE_PASSWORD}
      ssl.keystore.location: ${KAFKA_KEYSTORE_LOCATION}
      ssl.keystore.password: ${KAFKA_KEYSTORE_PASSWORD}
    consumer:
      group-id: ${app.kafka.consumer-group}
      auto-offset-reset: latest
      enable-auto-commit: false
    producer:
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      properties:
        # linger.ms has no dedicated Spring Boot property, so it is passed as a raw Kafka property
        linger.ms: 1
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info
  endpoint:
    health:
      show-details: always
2. Monitoring and Metrics
package com.example.cloudevents.metrics;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class EventMetrics {
private final MeterRegistry meterRegistry;
private final Counter eventsProcessedCounter;
private final Counter eventsFailedCounter;
private final Timer eventProcessingTimer;
public EventMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.eventsProcessedCounter = Counter.builder("cloudevents.processed")
.description("Number of CloudEvents successfully processed")
.register(meterRegistry);
this.eventsFailedCounter = Counter.builder("cloudevents.failed")
.description("Number of CloudEvents that failed processing")
.register(meterRegistry);
this.eventProcessingTimer = Timer.builder("cloudevents.processing.time")
.description("Time taken to process CloudEvents")
.register(meterRegistry);
}
public void recordEventReceived(String transport) {
// Tag the counter with the actual transport ("http", "kafka", ...) instead of a fixed value.
// register() returns the existing counter when one with the same name and tags already exists
Counter.builder("cloudevents.received")
.description("Number of CloudEvents received")
.tag("transport", transport)
.register(meterRegistry)
.increment();
}
public void recordEventProcessed() {
eventsProcessedCounter.increment();
}
public void recordEventFailed() {
eventsFailedCounter.increment();
}
public void recordProcessingTime(long duration, TimeUnit unit) {
eventProcessingTimer.record(duration, unit);
}
}
Conclusion
Using the CloudEvents SDK for Java provides:
- Standardized event format across different systems and platforms
- Protocol interoperability (HTTP, Kafka, AMQP, etc.)
- Type-safe event handling with Java models
- Validation of incoming events and explicit error handling
- Retry mechanisms and dead-letter queues for events that fail processing
Key benefits demonstrated:
- Event-driven architecture with CloudEvents specification
- Multiple transport support (HTTP, Kafka)
- Spring Boot integration for easy development
- Schema validation for data consistency
- Correlation and tracing for distributed systems
- Error handling with retry and DLQ patterns
- Monitoring and metrics for observability
Together, these pieces form a basis for event-driven services that exchange events across platforms in a common format while keeping payloads strongly typed and failures handled explicitly.