CloudEvents SDK in Java

Introduction

CloudEvents is a specification for describing event data in a common way, providing interoperability across services, platforms, and systems. The CloudEvents SDK for Java helps developers create, send, receive, and process CloudEvents in Java applications.

Dependencies and Setup

1. Maven Dependencies

<properties>
    <!-- 3.2.0 is not a published SDK version; 2.5.0 is a real release on Maven Central -->
    <cloudevents.version>2.5.0</cloudevents.version>
    <jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
    <!-- CloudEvents Core SDK -->
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-core</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <!-- CloudEvents HTTP Binding -->
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-http-basic</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <!-- Jackson JSON Binding -->
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-json-jackson</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <!-- Spring Boot Starter Web (for REST endpoints) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Boot Starter WebFlux (for reactive support) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Kafka Integration -->
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-kafka</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Micrometer Tracing -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing-bridge-brave</artifactId>
    </dependency>
</dependencies>

2. Gradle Configuration

dependencies {
    implementation("io.cloudevents:cloudevents-core:2.5.0")
    implementation("io.cloudevents:cloudevents-http-basic:2.5.0")
    implementation("io.cloudevents:cloudevents-json-jackson:2.5.0")
    implementation("io.cloudevents:cloudevents-kafka:2.5.0")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("io.micrometer:micrometer-tracing-bridge-brave")
}

Core CloudEvents Concepts

1. Event Models

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserCreatedEvent {
    private String userId;
    private String email;
    private String firstName;
    private String lastName;
    private Instant createdAt;
    private Map<String, String> metadata;
}

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderPlacedEvent {
    private String orderId;
    private String customerId;
    private BigDecimal amount;
    private String currency;
    private List<OrderItem> items;
    private Instant orderDate;

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class OrderItem {
        private String productId;
        private String productName;
        private int quantity;
        private BigDecimal unitPrice;
    }
}

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PaymentProcessedEvent {
    private String paymentId;
    private String orderId;
    private String status;
    private BigDecimal amount;
    private String transactionId;
    private Instant processedAt;
}

CloudEvents Service

1. Core CloudEvents Service

@Service
@Slf4j
public class CloudEventService {
    private final ObjectMapper objectMapper;
    private final CloudEventBuilderFactory builderFactory;

    public CloudEventService(ObjectMapper objectMapper,
                             CloudEventBuilderFactory builderFactory) {
        this.objectMapper = objectMapper;
        // The factory is a Spring component (defined below), so it is injected
        // rather than obtained through a static accessor
        this.builderFactory = builderFactory;
    }

    public CloudEvent createCloudEvent(String eventType, Object data) {
        return createCloudEvent(eventType, data, null, null);
    }

    public CloudEvent createCloudEvent(String eventType, Object data,
                                       String source, String subject) {
        try {
            CloudEventBuilder builder = builderFactory.newBuilder()
                .withId(UUID.randomUUID().toString())
                .withType(eventType)
                .withSource(URI.create(source != null ? source : "urn:example-app:service"))
                .withTime(OffsetDateTime.now())
                .withDataContentType("application/json");
            if (subject != null) {
                builder.withSubject(subject);
            }
            // Add extension attributes
            builder.withExtension("tenantid", "acme-corp")
                   .withExtension("environment", "production");
            // Serialize data to JSON bytes
            byte[] dataBytes = objectMapper.writeValueAsBytes(data);
            builder.withData(dataBytes);
            return builder.build();
        } catch (Exception e) {
            throw new CloudEventException("Failed to create CloudEvent", e);
        }
    }

    public <T> T extractData(CloudEvent cloudEvent, Class<T> dataType) {
        try {
            if (cloudEvent.getData() == null) {
                return null;
            }
            byte[] dataBytes = cloudEvent.getData().toBytes();
            return objectMapper.readValue(dataBytes, dataType);
        } catch (Exception e) {
            throw new CloudEventException("Failed to extract data from CloudEvent", e);
        }
    }

    public Map<String, Object> extractExtensions(CloudEvent cloudEvent) {
        Map<String, Object> extensions = new HashMap<>();
        cloudEvent.getExtensionNames().forEach(extensionName -> {
            // getExtension(), not getAttribute(), is the accessor for extension attributes
            Object value = cloudEvent.getExtension(extensionName);
            if (value != null) {
                extensions.put(extensionName, value);
            }
        });
        return extensions;
    }

    public void validateCloudEvent(CloudEvent cloudEvent) {
        List<String> errors = new ArrayList<>();
        if (cloudEvent.getId() == null) {
            errors.add("CloudEvent ID is required");
        }
        if (cloudEvent.getSource() == null) {
            errors.add("CloudEvent source is required");
        }
        if (cloudEvent.getType() == null) {
            errors.add("CloudEvent type is required");
        }
        if (cloudEvent.getSpecVersion() == null) {
            errors.add("CloudEvent spec version is required");
        }
        if (!errors.isEmpty()) {
            throw new InvalidCloudEventException("CloudEvent validation failed: " + errors);
        }
    }

    public CloudEvent enrichWithTraceContext(CloudEvent cloudEvent,
                                             String traceparent, String tracestate) {
        // The CloudEvents distributed tracing extension defines the
        // "traceparent" and "tracestate" attributes, carrying W3C Trace
        // Context values (traceparent holds trace-id and span-id together)
        CloudEventBuilder builder = builderFactory.newBuilder(cloudEvent);
        if (traceparent != null) {
            builder.withExtension("traceparent", traceparent);
        }
        if (tracestate != null) {
            builder.withExtension("tracestate", tracestate);
        }
        return builder.build();
    }
}
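The required-attribute rules enforced by validateCloudEvent come directly from the CloudEvents v1.0 specification, which marks id, source, type, and specversion as REQUIRED context attributes. As a minimal sketch (a hypothetical helper with no SDK dependency, attribute names taken from the spec), the same check over a plain attribute map looks like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal sketch of the validation rules used by validateCloudEvent,
// expressed over a plain attribute map (hypothetical helper, no SDK types).
class RequiredAttributeCheck {
    // The four REQUIRED context attributes from the CloudEvents v1.0 spec
    private static final List<String> REQUIRED =
        List.of("id", "source", "type", "specversion");

    // Returns one error message per missing or blank required attribute
    static List<String> missingAttributes(Map<String, String> attributes) {
        List<String> errors = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = attributes.get(name);
            if (value == null || value.isBlank()) {
                errors.add("CloudEvent " + name + " is required");
            }
        }
        return errors;
    }
}
```

An event missing two required attributes yields two errors, mirroring the list the service accumulates before throwing InvalidCloudEventException.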

2. CloudEvent Builder Factory

@Component
public class CloudEventBuilderFactory {
    private static final String DEFAULT_SOURCE = "urn:example-app:unknown";

    public CloudEventBuilder newBuilder() {
        return CloudEventBuilder.v1()
            .withSource(URI.create(DEFAULT_SOURCE))
            .withTime(OffsetDateTime.now());
    }

    public CloudEventBuilder newBuilder(CloudEvent existingEvent) {
        return CloudEventBuilder.v1(existingEvent);
    }

    public CloudEventBuilder forUserEvent(String eventType, UserCreatedEvent userEvent) {
        return newBuilder()
            .withType(eventType)
            .withSubject(userEvent.getUserId())
            .withExtension("eventcategory", "user")
            .withExtension("usertype", "customer");
    }

    public CloudEventBuilder forOrderEvent(String eventType, OrderPlacedEvent orderEvent) {
        return newBuilder()
            .withType(eventType)
            .withSubject(orderEvent.getOrderId())
            .withExtension("eventcategory", "order")
            .withExtension("customertier", "premium"); // Example extension
    }

    public CloudEventBuilder forSystemEvent(String eventType, String component) {
        return newBuilder()
            .withType(eventType)
            .withSource(URI.create("urn:example-app:" + component))
            .withExtension("eventcategory", "system")
            .withExtension("severity", "info");
    }
}

HTTP Integration

1. HTTP CloudEvent Sender

@Service
@Slf4j
public class HttpCloudEventSender {
    private final WebClient webClient;
    private final CloudEventService cloudEventService;
    private final ObjectMapper objectMapper;

    public HttpCloudEventSender(WebClient.Builder webClientBuilder,
                                CloudEventService cloudEventService,
                                ObjectMapper objectMapper) {
        this.webClient = webClientBuilder.build();
        this.cloudEventService = cloudEventService;
        this.objectMapper = objectMapper;
    }

    public Mono<Void> sendCloudEvent(String targetUrl, CloudEvent cloudEvent) {
        return sendCloudEvent(targetUrl, cloudEvent, Map.of());
    }

    public Mono<Void> sendCloudEvent(String targetUrl, CloudEvent cloudEvent,
                                     Map<String, String> headers) {
        try {
            // Binary content mode: context attributes travel as ce-* headers
            // and the body carries only the event data
            byte[] body = cloudEvent.getData() != null
                ? cloudEvent.getData().toBytes()
                : new byte[0];
            return webClient.post()
                .uri(targetUrl)
                .headers(httpHeaders -> {
                    headers.forEach(httpHeaders::add);
                    httpHeaders.add("ce-id", cloudEvent.getId());
                    httpHeaders.add("ce-specversion", cloudEvent.getSpecVersion().toString());
                    httpHeaders.add("ce-type", cloudEvent.getType());
                    httpHeaders.add("ce-source", cloudEvent.getSource().toString());
                    if (cloudEvent.getTime() != null) {
                        httpHeaders.add("ce-time", cloudEvent.getTime().toString());
                    }
                    // datacontenttype maps to the plain Content-Type header
                    if (cloudEvent.getDataContentType() != null) {
                        httpHeaders.add(HttpHeaders.CONTENT_TYPE, cloudEvent.getDataContentType());
                    }
                    // Extensions also map to ce-* headers
                    cloudEvent.getExtensionNames().forEach(extensionName -> {
                        Object value = cloudEvent.getExtension(extensionName);
                        if (value != null) {
                            httpHeaders.add("ce-" + extensionName, value.toString());
                        }
                    });
                })
                .bodyValue(body)
                .retrieve()
                .bodyToMono(Void.class)
                .doOnSuccess(unused -> log.info("CloudEvent sent successfully: {}", cloudEvent.getId()))
                .doOnError(error -> log.error("Failed to send CloudEvent: {}", cloudEvent.getId(), error));
        } catch (Exception e) {
            log.error("Error sending CloudEvent to: {}", targetUrl, e);
            return Mono.error(e);
        }
    }

    public Mono<String> sendCloudEventAndGetResponse(String targetUrl,
                                                     CloudEvent cloudEvent) {
        // Structured content mode: the whole event is serialized into the body
        // using the JSON event format from cloudevents-json-jackson
        byte[] body = EventFormatProvider.getInstance()
            .resolveFormat(JsonFormat.CONTENT_TYPE)
            .serialize(cloudEvent);
        return webClient.post()
            .uri(targetUrl)
            .contentType(MediaType.valueOf(JsonFormat.CONTENT_TYPE))
            .bodyValue(body)
            .retrieve()
            .bodyToMono(String.class)
            .timeout(Duration.ofSeconds(30))
            .onErrorResume(throwable -> {
                log.error("Failed to send CloudEvent and get response", throwable);
                return Mono.just("Error: " + throwable.getMessage());
            });
    }

    public Flux<CloudEvent> sendBatchCloudEvents(String targetUrl,
                                                 List<CloudEvent> cloudEvents) {
        return Flux.fromIterable(cloudEvents)
            .flatMap(event -> sendCloudEvent(targetUrl, event).thenReturn(event))
            .doOnNext(event -> log.debug("Sent batch event: {}", event.getId()))
            .doOnComplete(() -> log.info("Completed sending batch of {} events", cloudEvents.size()));
    }
}
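Binary content mode, used by the sender above, maps each context attribute to a "ce-" prefixed HTTP header, while datacontenttype maps to the plain Content-Type header. A stdlib-only sketch of that mapping (hypothetical helper; the attribute names are from the CloudEvents spec, the class is illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the CloudEvents HTTP binary-mode header mapping: every context
// attribute except datacontenttype becomes a "ce-" prefixed header, while
// datacontenttype maps to the plain Content-Type header.
class BinaryModeHeaders {
    static Map<String, String> toHttpHeaders(Map<String, String> attributes) {
        Map<String, String> headers = new LinkedHashMap<>();
        attributes.forEach((name, value) -> {
            if (name.equals("datacontenttype")) {
                // Special case: becomes the regular HTTP Content-Type header
                headers.put("Content-Type", value);
            } else {
                headers.put("ce-" + name, value);
            }
        });
        return headers;
    }
}
```

This is why a binary-mode request for a JSON payload has Content-Type: application/json, not application/cloudevents+json; the latter would signal structured mode.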

2. HTTP CloudEvent Receiver

@RestController
@RequestMapping("/api/events")
@Slf4j
public class CloudEventReceiverController {
    private final CloudEventService cloudEventService;
    private final EventProcessingService eventProcessingService;

    public CloudEventReceiverController(CloudEventService cloudEventService,
                                        EventProcessingService eventProcessingService) {
        this.cloudEventService = cloudEventService;
        this.eventProcessingService = eventProcessingService;
    }

    @PostMapping("/receive")
    public ResponseEntity<String> receiveCloudEvent(
            @RequestBody byte[] body,
            @RequestHeader Map<String, String> headers) {
        try {
            // Parse CloudEvent from HTTP request
            CloudEvent cloudEvent = parseCloudEventFromHttp(body, headers);
            // Validate the CloudEvent
            cloudEventService.validateCloudEvent(cloudEvent);
            // Process the event
            eventProcessingService.processEvent(cloudEvent);
            log.info("Successfully processed CloudEvent: {}", cloudEvent.getId());
            return ResponseEntity.accepted().body("Event processed successfully");
        } catch (InvalidCloudEventException e) {
            log.warn("Invalid CloudEvent received", e);
            return ResponseEntity.badRequest().body("Invalid CloudEvent: " + e.getMessage());
        } catch (Exception e) {
            log.error("Error processing CloudEvent", e);
            return ResponseEntity.status(500).body("Error processing event");
        }
    }

    @PostMapping("/receive-structured")
    public ResponseEntity<String> receiveStructuredCloudEvent(
            @RequestBody byte[] body,
            @RequestHeader("Content-Type") String contentType) {
        try {
            // Resolve the event format (e.g. application/cloudevents+json)
            // and deserialize the whole event from the body
            EventFormat format = EventFormatProvider.getInstance().resolveFormat(contentType);
            if (format == null) {
                return ResponseEntity.status(415).body("Unsupported event format: " + contentType);
            }
            CloudEvent cloudEvent = format.deserialize(body);
            cloudEventService.validateCloudEvent(cloudEvent);
            eventProcessingService.processEvent(cloudEvent);
            return ResponseEntity.accepted().body("Structured event processed");
        } catch (Exception e) {
            log.error("Error processing structured CloudEvent", e);
            return ResponseEntity.badRequest().body("Error: " + e.getMessage());
        }
    }

    private CloudEvent parseCloudEventFromHttp(byte[] body, Map<String, String> headers) {
        try {
            // cloudevents-http-basic reads both binary and structured mode
            // from raw headers plus body
            return HttpMessageFactory.createReader(headers, body).toEvent();
        } catch (Exception e) {
            throw new InvalidCloudEventException("Failed to parse CloudEvent from HTTP", e);
        }
    }

    @PostMapping("/batch")
    public ResponseEntity<String> receiveBatchCloudEvents(
            @RequestBody List<CloudEvent> cloudEvents) {
        // Note: binding CloudEvent with Spring's ObjectMapper requires the
        // Jackson module from cloudevents-json-jackson to be registered
        // (JsonFormat.getCloudEventJacksonModule())
        try {
            int processedCount = 0;
            List<String> errors = new ArrayList<>();
            for (CloudEvent cloudEvent : cloudEvents) {
                try {
                    cloudEventService.validateCloudEvent(cloudEvent);
                    eventProcessingService.processEvent(cloudEvent);
                    processedCount++;
                } catch (Exception e) {
                    errors.add("Event " + cloudEvent.getId() + ": " + e.getMessage());
                    log.warn("Failed to process event in batch: {}", cloudEvent.getId(), e);
                }
            }
            String response = String.format(
                "Processed %d/%d events. Errors: %s",
                processedCount, cloudEvents.size(), errors);
            return ResponseEntity.accepted().body(response);
        } catch (Exception e) {
            log.error("Error processing batch CloudEvents", e);
            return ResponseEntity.status(500).body("Batch processing failed");
        }
    }
}
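A receiver decides how to parse an incoming request from its Content-Type: media types of the form application/cloudevents+&lt;format&gt; signal structured mode (the whole event is in the body), anything else is treated as binary mode (attributes arrive in ce-* headers). A minimal sketch of that decision (hypothetical helper, no SDK dependency):

```java
// Sketch of structured-vs-binary content mode detection for an HTTP receiver,
// based only on the request's Content-Type media type.
class ContentModeDetector {
    enum Mode { STRUCTURED, BINARY }

    static Mode detect(String contentType) {
        // application/cloudevents+json, application/cloudevents+xml, ...
        if (contentType != null && contentType.startsWith("application/cloudevents+")) {
            return Mode.STRUCTURED;
        }
        // No Content-Type, or a plain data media type: binary mode
        return Mode.BINARY;
    }
}
```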

Kafka Integration

1. Kafka CloudEvent Producer

@Service
@Slf4j
public class KafkaCloudEventProducer {
    private final KafkaTemplate<String, byte[]> kafkaTemplate;
    private final CloudEventService cloudEventService;
    private final ObjectMapper objectMapper;

    public KafkaCloudEventProducer(KafkaTemplate<String, byte[]> kafkaTemplate,
                                   CloudEventService cloudEventService,
                                   ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.cloudEventService = cloudEventService;
        this.objectMapper = objectMapper;
    }

    public void sendCloudEvent(String topic, CloudEvent cloudEvent) {
        sendCloudEvent(topic, null, cloudEvent);
    }

    public void sendCloudEvent(String topic, String key, CloudEvent cloudEvent) {
        try {
            // Convert CloudEvent to a Kafka message value
            byte[] value = CloudEventKafkaUtils.toKafkaMessage(cloudEvent);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, value);
            // Mirror key CloudEvent attributes as ce_* headers (the Kafka
            // protocol binding uses an underscore prefix) so consumers can
            // filter without deserializing the value
            record.headers()
                .add("ce_id", cloudEvent.getId().getBytes(StandardCharsets.UTF_8))
                .add("ce_specversion", cloudEvent.getSpecVersion().toString().getBytes(StandardCharsets.UTF_8))
                .add("ce_type", cloudEvent.getType().getBytes(StandardCharsets.UTF_8))
                .add("ce_source", cloudEvent.getSource().toString().getBytes(StandardCharsets.UTF_8));
            if (cloudEvent.getSubject() != null) {
                record.headers().add("ce_subject", cloudEvent.getSubject().getBytes(StandardCharsets.UTF_8));
            }
            if (cloudEvent.getTime() != null) {
                record.headers().add("ce_time", cloudEvent.getTime().toString().getBytes(StandardCharsets.UTF_8));
            }
            // Send the message (Spring Kafka 3.x returns a CompletableFuture)
            kafkaTemplate.send(record)
                .whenComplete((result, failure) -> {
                    if (failure == null) {
                        log.info("CloudEvent sent to Kafka: {} to partition {}",
                            cloudEvent.getId(), result.getRecordMetadata().partition());
                    } else {
                        log.error("Failed to send CloudEvent to Kafka: {}", cloudEvent.getId(), failure);
                    }
                });
        } catch (Exception e) {
            log.error("Error sending CloudEvent to Kafka topic: {}", topic, e);
            throw new CloudEventException("Failed to send CloudEvent to Kafka", e);
        }
    }

    public void sendUserEvent(String topic, UserCreatedEvent userEvent) {
        CloudEvent cloudEvent = cloudEventService.createCloudEvent(
            "com.example.user.created",
            userEvent,
            "urn:example-app:user-service",
            userEvent.getUserId()
        );
        sendCloudEvent(topic, userEvent.getUserId(), cloudEvent);
    }

    public void sendOrderEvent(String topic, OrderPlacedEvent orderEvent) {
        CloudEvent cloudEvent = cloudEventService.createCloudEvent(
            "com.example.order.placed",
            orderEvent,
            "urn:example-app:order-service",
            orderEvent.getOrderId()
        );
        sendCloudEvent(topic, orderEvent.getOrderId(), cloudEvent);
    }

    @Async
    public CompletableFuture<Void> sendCloudEventAsync(String topic, CloudEvent cloudEvent) {
        return CompletableFuture.runAsync(() -> sendCloudEvent(topic, cloudEvent));
    }
}
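Note the header-prefix difference between protocol bindings: HTTP uses "ce-" while the Kafka binding uses "ce_", because Kafka header names conventionally avoid dashes in this binding. A small sketch of the naming convention used by the producer above (hypothetical helper for illustration):

```java
// Sketch of the attribute-name <-> header-name convention: the CloudEvents
// Kafka protocol binding prefixes attributes with "ce_", while the HTTP
// binding uses "ce-".
class CloudEventHeaderNames {
    static String kafkaHeader(String attributeName) {
        return "ce_" + attributeName;
    }

    static String httpHeader(String attributeName) {
        return "ce-" + attributeName;
    }

    // Returns null when the header is not a CloudEvents attribute header
    static String attributeFromKafkaHeader(String headerName) {
        return headerName.startsWith("ce_") ? headerName.substring(3) : null;
    }
}
```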

2. Kafka CloudEvent Consumer

@Service
@Slf4j
public class KafkaCloudEventConsumer {
    private final CloudEventService cloudEventService;
    private final EventProcessingService eventProcessingService;

    public KafkaCloudEventConsumer(CloudEventService cloudEventService,
                                   EventProcessingService eventProcessingService) {
        this.cloudEventService = cloudEventService;
        this.eventProcessingService = eventProcessingService;
    }

    @KafkaListener(topics = "${app.kafka.topics.user-events}")
    public void consumeUserEvents(ConsumerRecord<String, byte[]> record) {
        try {
            CloudEvent cloudEvent = CloudEventKafkaUtils.fromKafkaMessage(record);
            cloudEventService.validateCloudEvent(cloudEvent);
            UserCreatedEvent userEvent = cloudEventService.extractData(cloudEvent, UserCreatedEvent.class);
            eventProcessingService.processUserEvent(userEvent, cloudEvent);
            log.info("Processed user event: {} for user: {}",
                cloudEvent.getType(), userEvent.getUserId());
        } catch (Exception e) {
            log.error("Failed to process user event from Kafka", e);
            // Implement dead letter queue logic here
        }
    }

    @KafkaListener(topics = "${app.kafka.topics.order-events}")
    public void consumeOrderEvents(ConsumerRecord<String, byte[]> record) {
        try {
            CloudEvent cloudEvent = CloudEventKafkaUtils.fromKafkaMessage(record);
            cloudEventService.validateCloudEvent(cloudEvent);
            OrderPlacedEvent orderEvent = cloudEventService.extractData(cloudEvent, OrderPlacedEvent.class);
            eventProcessingService.processOrderEvent(orderEvent, cloudEvent);
            log.info("Processed order event: {} for order: {}",
                cloudEvent.getType(), orderEvent.getOrderId());
        } catch (Exception e) {
            log.error("Failed to process order event from Kafka", e);
            // Implement dead letter queue logic here
        }
    }

    @KafkaListener(topics = "${app.kafka.topics.payment-events}")
    public void consumePaymentEvents(ConsumerRecord<String, byte[]> record) {
        try {
            CloudEvent cloudEvent = CloudEventKafkaUtils.fromKafkaMessage(record);
            cloudEventService.validateCloudEvent(cloudEvent);
            PaymentProcessedEvent paymentEvent = cloudEventService.extractData(cloudEvent, PaymentProcessedEvent.class);
            eventProcessingService.processPaymentEvent(paymentEvent, cloudEvent);
            log.info("Processed payment event: {} for payment: {}",
                cloudEvent.getType(), paymentEvent.getPaymentId());
        } catch (Exception e) {
            log.error("Failed to process payment event from Kafka", e);
        }
    }

    // Batch consumer for high-throughput scenarios
    @KafkaListener(topics = "${app.kafka.topics.system-events}",
                   containerFactory = "batchFactory")
    public void consumeSystemEventsBatch(List<ConsumerRecord<String, byte[]>> records) {
        log.info("Processing batch of {} system events", records.size());
        for (ConsumerRecord<String, byte[]> record : records) {
            try {
                CloudEvent cloudEvent = CloudEventKafkaUtils.fromKafkaMessage(record);
                cloudEventService.validateCloudEvent(cloudEvent);
                eventProcessingService.processSystemEvent(cloudEvent);
            } catch (Exception e) {
                log.error("Failed to process system event in batch", e);
            }
        }
    }
}
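The "implement dead letter queue logic here" placeholders above are commonly filled by republishing records that repeatedly fail to a parallel topic. The ".DLT" suffix below follows Spring Kafka's DeadLetterPublishingRecoverer default; the attempt threshold and the helper itself are assumptions for illustration:

```java
// Sketch of dead-letter routing for records that repeatedly fail processing.
// The ".DLT" suffix matches Spring Kafka's DeadLetterPublishingRecoverer
// default naming; the attempt threshold is an illustrative assumption.
class DeadLetterRouting {
    private static final int MAX_ATTEMPTS = 3;

    static String deadLetterTopic(String sourceTopic) {
        return sourceTopic + ".DLT";
    }

    // Retry on the source topic until the attempt budget is spent,
    // then route to the dead-letter topic
    static String routeAfterFailure(String sourceTopic, int attemptsSoFar) {
        return attemptsSoFar >= MAX_ATTEMPTS
            ? deadLetterTopic(sourceTopic)
            : sourceTopic;
    }
}
```

In a real deployment the republished record should also carry the original topic, partition, offset, and exception message as headers, so the failure can be diagnosed later.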

Event Processing Service

1. Core Event Processing

@Service
@Slf4j
public class EventProcessingService {
    private final CloudEventService cloudEventService;
    private final UserService userService;
    private final OrderService orderService;
    private final NotificationService notificationService;
    private final MetricsService metricsService;

    public EventProcessingService(CloudEventService cloudEventService,
                                  UserService userService,
                                  OrderService orderService,
                                  NotificationService notificationService,
                                  MetricsService metricsService) {
        this.cloudEventService = cloudEventService;
        this.userService = userService;
        this.orderService = orderService;
        this.notificationService = notificationService;
        this.metricsService = metricsService;
    }

    @Async("eventProcessorExecutor")
    public void processEvent(CloudEvent cloudEvent) {
        metricsService.incrementEventCounter(cloudEvent.getType());
        try {
            switch (cloudEvent.getType()) {
                case "com.example.user.created":
                    processUserCreatedEvent(cloudEvent);
                    break;
                case "com.example.order.placed":
                    processOrderPlacedEvent(cloudEvent);
                    break;
                case "com.example.payment.processed":
                    processPaymentProcessedEvent(cloudEvent);
                    break;
                default:
                    log.warn("Unknown event type: {}", cloudEvent.getType());
                    metricsService.incrementUnknownEventCounter();
            }
        } catch (Exception e) {
            log.error("Error processing event: {}", cloudEvent.getId(), e);
            metricsService.incrementEventErrorCounter(cloudEvent.getType());
        }
    }

    public void processUserEvent(UserCreatedEvent userEvent, CloudEvent cloudEvent) {
        long startTime = System.currentTimeMillis();
        log.info("Processing user creation for: {}", userEvent.getEmail());
        // Business logic for user creation
        userService.createUser(userEvent);
        notificationService.sendWelcomeEmail(userEvent);
        // Pass the captured start time, not the current timestamp,
        // so the recorded duration is meaningful
        metricsService.recordEventProcessingTime(cloudEvent.getType(), startTime);
    }

    public void processOrderEvent(OrderPlacedEvent orderEvent, CloudEvent cloudEvent) {
        log.info("Processing order: {} for customer: {}",
            orderEvent.getOrderId(), orderEvent.getCustomerId());
        // Business logic for order processing
        orderService.processOrder(orderEvent);
        notificationService.sendOrderConfirmation(orderEvent);
        Map<String, Object> extensions = cloudEventService.extractExtensions(cloudEvent);
        if ("premium".equals(extensions.get("customertier"))) {
            notificationService.sendPremiumCustomerNotification(orderEvent);
        }
    }

    public void processPaymentEvent(PaymentProcessedEvent paymentEvent, CloudEvent cloudEvent) {
        log.info("Processing payment: {} for order: {}",
            paymentEvent.getPaymentId(), paymentEvent.getOrderId());
        if ("COMPLETED".equals(paymentEvent.getStatus())) {
            orderService.completeOrder(paymentEvent.getOrderId());
            notificationService.sendPaymentConfirmation(paymentEvent);
        } else if ("FAILED".equals(paymentEvent.getStatus())) {
            orderService.failOrder(paymentEvent.getOrderId());
            notificationService.sendPaymentFailureNotification(paymentEvent);
        }
    }

    public void processSystemEvent(CloudEvent cloudEvent) {
        log.info("Processing system event: {} from source: {}",
            cloudEvent.getType(), cloudEvent.getSource());
        // Handle system-level events
        Map<String, Object> extensions = cloudEventService.extractExtensions(cloudEvent);
        String severity = (String) extensions.getOrDefault("severity", "info");
        switch (severity) {
            case "error":
                handleSystemError(cloudEvent);
                break;
            case "warning":
                handleSystemWarning(cloudEvent);
                break;
            default:
                handleSystemInfo(cloudEvent);
        }
    }

    private void processUserCreatedEvent(CloudEvent cloudEvent) {
        UserCreatedEvent userEvent = cloudEventService.extractData(cloudEvent, UserCreatedEvent.class);
        processUserEvent(userEvent, cloudEvent);
    }

    private void processOrderPlacedEvent(CloudEvent cloudEvent) {
        OrderPlacedEvent orderEvent = cloudEventService.extractData(cloudEvent, OrderPlacedEvent.class);
        processOrderEvent(orderEvent, cloudEvent);
    }

    private void processPaymentProcessedEvent(CloudEvent cloudEvent) {
        PaymentProcessedEvent paymentEvent = cloudEventService.extractData(cloudEvent, PaymentProcessedEvent.class);
        processPaymentEvent(paymentEvent, cloudEvent);
    }

    private void handleSystemError(CloudEvent cloudEvent) {
        // Implement system error handling logic
        log.error("System error event: {}", cloudEvent.getType());
    }

    private void handleSystemWarning(CloudEvent cloudEvent) {
        // Implement system warning handling logic
        log.warn("System warning event: {}", cloudEvent.getType());
    }

    private void handleSystemInfo(CloudEvent cloudEvent) {
        // Implement system info handling logic
        log.info("System info event: {}", cloudEvent.getType());
    }
}

Configuration and Utilities

1. Application Configuration

# application.yml
app:
  events:
    source: "urn:example-app:service"
    default-content-type: "application/json"
    enable-validation: true
    batch-size: 100
    retry-attempts: 3
  kafka:
    topics:
      user-events: "user-events"
      order-events: "order-events"
      payment-events: "payment-events"
      system-events: "system-events"
    bootstrap-servers: "localhost:9092"
    group-id: "event-processor"
  http:
    event-endpoint: "http://localhost:8080/api/events/receive"
    timeout-ms: 5000
    max-connections: 100
spring:
  kafka:
    bootstrap-servers: ${app.kafka.bootstrap-servers}
    consumer:
      group-id: ${app.kafka.group-id}
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
    properties:
      spring.json.trusted.packages: "*"
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,events
  endpoint:
    events:
      enabled: true

2. CloudEvents Utility Classes

@Component
public class CloudEventUtils {
    private final ObjectMapper objectMapper;

    public CloudEventUtils(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        // Register the CloudEvents Jackson module shipped with
        // cloudevents-json-jackson so CloudEvent (de)serializes correctly
        this.objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule());
    }

    public String cloudEventToJson(CloudEvent cloudEvent) {
        try {
            return objectMapper.writeValueAsString(cloudEvent);
        } catch (Exception e) {
            throw new CloudEventException("Failed to serialize CloudEvent to JSON", e);
        }
    }

    public CloudEvent jsonToCloudEvent(String json) {
        try {
            return objectMapper.readValue(json, CloudEvent.class);
        } catch (Exception e) {
            throw new CloudEventException("Failed to deserialize CloudEvent from JSON", e);
        }
    }

    public boolean isCloudEvent(Map<String, String> headers) {
        return headers.containsKey("ce-id") &&
               headers.containsKey("ce-specversion") &&
               headers.containsKey("ce-type") &&
               headers.containsKey("ce-source");
    }

    public String generateCloudEventId() {
        return UUID.randomUUID().toString();
    }

    public URI generateSourceUri(String serviceName) {
        return URI.create("urn:example-app:" + serviceName);
    }
}
public class CloudEventKafkaUtils {
    // JSON event format registered by cloudevents-json-jackson via ServiceLoader
    private static final EventFormat JSON_FORMAT =
        EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);

    public static CloudEvent fromKafkaMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // The producer writes the whole event in structured JSON mode,
            // so the record value can be deserialized directly; the ce_*
            // headers are informational duplicates
            return JSON_FORMAT.deserialize(record.value());
        } catch (Exception e) {
            throw new CloudEventException("Failed to create CloudEvent from Kafka message", e);
        }
    }

    public static byte[] toKafkaMessage(CloudEvent cloudEvent) {
        try {
            return JSON_FORMAT.serialize(cloudEvent);
        } catch (Exception e) {
            throw new CloudEventException("Failed to convert CloudEvent to Kafka message", e);
        }
    }

    // Note: the cloudevents-kafka module also ships CloudEventSerializer and
    // CloudEventDeserializer, which can be configured directly on the Kafka
    // client instead of hand-rolling the conversion
}

Testing

1. Unit Tests

@ExtendWith(MockitoExtension.class)
class CloudEventServiceTest {
    @InjectMocks
    private CloudEventService cloudEventService;
    @Mock
    private ObjectMapper objectMapper;
    @Spy
    private CloudEventBuilderFactory builderFactory = new CloudEventBuilderFactory();

    @Test
    void shouldCreateCloudEvent() throws Exception {
        // Given
        UserCreatedEvent userEvent = UserCreatedEvent.builder()
            .userId("123")
            .email("test@example.com")
            .build();
        when(objectMapper.writeValueAsBytes(userEvent)).thenReturn("{}".getBytes());
        // When
        CloudEvent cloudEvent = cloudEventService.createCloudEvent(
            "com.example.user.created", userEvent);
        // Then
        assertNotNull(cloudEvent);
        assertNotNull(cloudEvent.getId());
        assertEquals("com.example.user.created", cloudEvent.getType());
        assertEquals("application/json", cloudEvent.getDataContentType());
    }

    @Test
    void shouldExtractDataFromCloudEvent() throws Exception {
        // Given
        UserCreatedEvent expectedEvent = UserCreatedEvent.builder()
            .userId("123")
            .email("test@example.com")
            .build();
        // Use literal bytes here; calling the mocked ObjectMapper before
        // stubbing it would return null
        byte[] payload = "{\"userId\":\"123\"}".getBytes();
        CloudEvent cloudEvent = CloudEventBuilder.v1()
            .withId("test-id")
            .withSource(URI.create("urn:test:source"))
            .withType("test-type")
            .withData("application/json", payload)
            .build();
        when(objectMapper.readValue(any(byte[].class), eq(UserCreatedEvent.class)))
            .thenReturn(expectedEvent);
        // When
        UserCreatedEvent extractedEvent = cloudEventService.extractData(
            cloudEvent, UserCreatedEvent.class);
        // Then
        assertNotNull(extractedEvent);
        assertEquals(expectedEvent.getUserId(), extractedEvent.getUserId());
    }
}

@SpringBootTest
class CloudEventIntegrationTest {
    @Autowired
    private CloudEventService cloudEventService;
    @Autowired
    private KafkaCloudEventProducer kafkaProducer;

    @Test
    void shouldSendAndReceiveCloudEvent() {
        // Integration test for end-to-end CloudEvent flow
        UserCreatedEvent userEvent = UserCreatedEvent.builder()
            .userId("test-123")
            .email("test@example.com")
            .build();
        CloudEvent cloudEvent = cloudEventService.createCloudEvent(
            "com.example.user.created", userEvent);
        assertDoesNotThrow(() ->
            kafkaProducer.sendCloudEvent("test-topic", cloudEvent));
    }
}

Best Practices

1. Error Handling and Retry

@Component
@Slf4j
public class CloudEventRetryHandler {
    private final RetryTemplate retryTemplate;

    public CloudEventRetryHandler() {
        this.retryTemplate = new RetryTemplate();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2000L); // 2 seconds
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
    }

    public void executeWithRetry(Runnable operation, String operationName) {
        try {
            retryTemplate.execute(context -> {
                try {
                    operation.run();
                    return null;
                } catch (Exception e) {
                    // getRetryCount() is zero-based, so add 1 for a
                    // human-readable attempt number
                    log.warn("Attempt {} failed for {}: {}",
                        context.getRetryCount() + 1, operationName, e.getMessage());
                    throw e;
                }
            });
        } catch (Exception e) {
            log.error("All retry attempts failed for: {}", operationName, e);
            throw new CloudEventException("Operation failed after retries: " + operationName, e);
        }
    }
}
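For readers not using spring-retry, the same policy (a fixed number of attempts with a fixed pause between them) can be expressed in plain Java. This hypothetical helper mirrors the configuration above; it is a sketch, not a replacement for RetryTemplate's richer policies:

```java
import java.util.concurrent.Callable;

// Plain-Java sketch of fixed-attempt, fixed-backoff retry, mirroring the
// RetryTemplate configuration above (3 attempts, fixed pause between them).
class SimpleRetry {
    static <T> T executeWithRetry(Callable<T> operation, int maxAttempts,
                                  long backoffMillis) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                // Sleep only if another attempt remains
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        // Every attempt failed: surface the last failure
        throw last;
    }
}
```

A transient failure that clears on the third attempt succeeds; a persistent one propagates after the attempt budget is exhausted.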

2. Monitoring and Metrics

@Component
@Slf4j
public class CloudEventMetrics {
    private final MeterRegistry meterRegistry;
    private final Counter eventCounter;
    private final Timer eventProcessingTimer;

    public CloudEventMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.eventCounter = Counter.builder("cloudevents.processed")
            .description("Number of CloudEvents processed")
            .tag("version", "1.0")
            .register(meterRegistry);
        this.eventProcessingTimer = Timer.builder("cloudevents.processing.time")
            .description("Time taken to process CloudEvents")
            .register(meterRegistry);
    }

    public void incrementEventCounter(String eventType) {
        eventCounter.increment();
        meterRegistry.counter("cloudevents.by.type", "type", eventType).increment();
    }

    public void recordEventProcessingTime(String eventType, long startTime) {
        long duration = System.currentTimeMillis() - startTime;
        eventProcessingTimer.record(duration, TimeUnit.MILLISECONDS);
        meterRegistry.timer("cloudevents.processing.time.by.type", "type", eventType)
            .record(duration, TimeUnit.MILLISECONDS);
    }

    public void incrementErrorCounter(String eventType, String errorType) {
        meterRegistry.counter("cloudevents.errors",
            "type", eventType, "error", errorType).increment();
    }
}
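The per-type counters registered with Micrometer above boil down to one counter per event type, created on first use. A stdlib-only equivalent of that pattern (illustrative, not a Micrometer replacement):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Stdlib sketch of the per-type counter pattern: one LongAdder per event
// type, created lazily on first use, safe under concurrent increments.
class PerTypeCounters {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    void increment(String eventType) {
        counters.computeIfAbsent(eventType, t -> new LongAdder()).increment();
    }

    long count(String eventType) {
        LongAdder adder = counters.get(eventType);
        return adder == null ? 0 : adder.sum();
    }
}
```

LongAdder is preferred over AtomicLong here because event counters are write-heavy and only read when scraped.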

Conclusion

The CloudEvents SDK for Java provides a robust foundation for building event-driven architectures with standardized event formats. Key benefits include:

  1. Interoperability - Standardized event format across systems
  2. Extensibility - Custom attributes and extensions
  3. Multiple Bindings - HTTP, Kafka, and other protocol support
  4. Type Safety - Strong typing with Java objects
  5. Integration - Seamless Spring and Kafka integration

By following these patterns, you can build scalable, maintainable event-driven systems that communicate effectively across different platforms and programming languages.

