Event-Driven Java: Building Knative Eventing Sources and Sinks

Article

Knative Eventing is a Kubernetes-based platform that provides tools for building event-driven applications. Eventing Sources are the entry points that generate or capture events from external systems and deliver them to the Knative eventing mesh. For Java developers, creating custom event sources enables integration with various systems while leveraging Knative's powerful event routing capabilities.

In this guide, we'll explore how to build, deploy, and manage Knative Eventing Sources using Java, creating robust event-driven architectures on Kubernetes.

Why Knative Eventing Sources for Java?

  • Cloud Events Standard: Native support for CNCF CloudEvents specification
  • Multi-Protocol Support: HTTP, gRPC, WebSocket, and custom protocols
  • Scalability: Automatic scaling based on event load
  • Portability: Run on any Kubernetes cluster with Knative
  • Rich Ecosystem: Integrate with existing event sources and brokers
  • Developer Productivity: Focus on business logic rather than infrastructure

Part 1: Knative Eventing Fundamentals

1.1 Core Concepts

  • Event Sources: Producers that generate events (Kafka, GitHub, Cron, Custom)
  • Brokers: Event mesh backbone for routing and delivery
  • Triggers: Subscription rules that route events from brokers to sinks
  • Sinks: Event consumers (Services, Channels, Brokers)
  • CloudEvents: Standard event format for interoperability

1.2 Project Structure

knative-java-eventing/
├── src/
│   ├── main/
│   │   ├── java/com/example/events/
│   │   │   ├── source/
│   │   │   ├── sink/
│   │   │   └── model/
│   │   └── resources/
│   │       └── k8s/
│   └── test/
│       └── java/com/example/events/
├── Dockerfile
├── knative/
│   ├── service.yaml
│   ├── trigger.yaml
│   └── source.yaml
└── pom.xml

Part 2: Dependencies and Setup

2.1 Maven Dependencies

<!-- pom.xml -->
<properties>
<knative.version>1.11.0</knative.version>
<cloudevents.version>3.0.0</cloudevents.version>
<spring-boot.version>3.1.0</spring-boot.version>
<kubernetes-client.version>6.7.2</kubernetes-client.version>
</properties>
<dependencies>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- CloudEvents SDK -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-basic</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<!-- Knative -->
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>dev.knative</groupId>
<artifactId>eventing</artifactId>
<version>${knative.version}</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-test</artifactId>
<version>${cloudevents.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

2.2 CloudEvents Model

// File: src/main/java/com/example/events/model/CloudEventData.java
package com.example.events.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.time.Instant;
import java.util.Map;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class CloudEventData {
private String eventId;
private String eventType;
private String eventSource;
private Instant eventTime;
private String subject;
private Map<String, Object> data;
private Map<String, String> extensions;
// Constructors
public CloudEventData() {}
public CloudEventData(String eventId, String eventType, String eventSource, 
Instant eventTime, String subject, Map<String, Object> data) {
this.eventId = eventId;
this.eventType = eventType;
this.eventSource = eventSource;
this.eventTime = eventTime;
this.subject = subject;
this.data = data;
}
// Getters and Setters
public String getEventId() { return eventId; }
public void setEventId(String eventId) { this.eventId = eventId; }
public String getEventType() { return eventType; }
public void setEventType(String eventType) { this.eventType = eventType; }
public String getEventSource() { return eventSource; }
public void setEventSource(String eventSource) { this.eventSource = eventSource; }
public Instant getEventTime() { return eventTime; }
public void setEventTime(Instant eventTime) { this.eventTime = eventTime; }
public String getSubject() { return subject; }
public void setSubject(String subject) { this.subject = subject; }
public Map<String, Object> getData() { return data; }
public void setData(Map<String, Object> data) { this.data = data; }
public Map<String, String> getExtensions() { return extensions; }
public void setExtensions(Map<String, String> extensions) { this.extensions = extensions; }
}

Part 3: Custom Event Source Implementation

3.1 HTTP-Based Event Source

// File: src/main/java/com/example/events/source/HttpEventSourceController.java
package com.example.events.source;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.http.HttpMessageFactory;
import io.cloudevents.jackson.JsonFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/api/v1/events")
public class HttpEventSourceController {
private static final Logger logger = LoggerFactory.getLogger(HttpEventSourceController.class);
private final EventSenderService eventSenderService;
private final CopyOnWriteArrayList<SseEmitter> sseEmitters = new CopyOnWriteArrayList<>();
private final ConcurrentHashMap<String, CloudEvent> pendingEvents = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
public HttpEventSourceController(EventSenderService eventSenderService) {
this.eventSenderService = eventSenderService;
startEventGenerator();
}
@PostMapping("/ingest")
public ResponseEntity<String> ingestEvent(
@RequestBody String eventData,
@RequestHeader HttpHeaders headers,
HttpServletRequest request,
HttpServletResponse response) {
try {
// Parse CloudEvent from HTTP request
CloudEvent cloudEvent = HttpMessageFactory
.createReaderFromRequest(request, eventData.getBytes())
.toEvent();
logger.info("Received CloudEvent: id={}, type={}, source={}", 
cloudEvent.getId(), cloudEvent.getType(), cloudEvent.getSource());
// Process and forward the event
processIncomingEvent(cloudEvent);
return ResponseEntity.accepted().body("Event processed successfully");
} catch (Exception e) {
logger.error("Failed to process incoming event", e);
return ResponseEntity.badRequest().body("Invalid CloudEvent format");
}
}
@GetMapping("/stream")
public SseEmitter streamEvents() {
SseEmitter emitter = new SseEmitter(30_000L); // 30 seconds timeout
sseEmitters.add(emitter);
emitter.onCompletion(() -> {
sseEmitters.remove(emitter);
logger.info("SSE connection completed");
});
emitter.onTimeout(() -> {
sseEmitters.remove(emitter);
logger.info("SSE connection timed out");
});
emitter.onError((ex) -> {
sseEmitters.remove(emitter);
logger.error("SSE connection error", ex);
});
// Send initial event
sendSseEvent(emitter, createHeartbeatEvent());
return emitter;
}
@PostMapping("/custom")
public ResponseEntity<CloudEvent> createCustomEvent(@RequestBody CustomEventRequest request) {
try {
CloudEvent cloudEvent = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withType(request.getEventType())
.withSource(URI.create(request.getEventSource()))
.withTime(OffsetDateTime.now())
.withDataContentType("application/json")
.withData(request.getData().getBytes())
.withExtension("customtype", request.getCustomType())
.build();
// Send to Knative broker
eventSenderService.sendToBroker(cloudEvent);
logger.info("Created and sent custom event: {}", cloudEvent.getId());
return ResponseEntity.accepted().body(cloudEvent);
} catch (Exception e) {
logger.error("Failed to create custom event", e);
return ResponseEntity.badRequest().build();
}
}
private void processIncomingEvent(CloudEvent cloudEvent) {
// Store event temporarily
pendingEvents.put(cloudEvent.getId(), cloudEvent);
// Send to Knative broker
eventSenderService.sendToBroker(cloudEvent);
// Broadcast to SSE clients
broadcastToSseClients(cloudEvent);
// Remove from pending after processing
scheduler.schedule(() -> {
pendingEvents.remove(cloudEvent.getId());
}, 30, TimeUnit.SECONDS);
}
private void broadcastToSseClients(CloudEvent cloudEvent) {
for (SseEmitter emitter : sseEmitters) {
try {
sendSseEvent(emitter, cloudEvent);
} catch (Exception e) {
logger.warn("Failed to send event to SSE client", e);
sseEmitters.remove(emitter);
}
}
}
private void sendSseEvent(SseEmitter emitter, CloudEvent event) {
try {
String eventJson = new String(EventFormatProvider.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.serialize(event));
emitter.send(SseEmitter.event()
.id(event.getId())
.name(event.getType())
.data(eventJson));
} catch (Exception e) {
logger.error("Failed to send SSE event", e);
}
}
private CloudEvent createHeartbeatEvent() {
return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withType("com.example.heartbeat")
.withSource(URI.create("https://events.example.com/source"))
.withTime(OffsetDateTime.now())
.withDataContentType("application/json")
.withData("{\"status\": \"alive\", \"timestamp\": \"" + OffsetDateTime.now() + "\"}")
.build();
}
private void startEventGenerator() {
// Generate periodic events for demonstration
scheduler.scheduleAtFixedRate(() -> {
try {
CloudEvent heartbeat = createHeartbeatEvent();
eventSenderService.sendToBroker(heartbeat);
broadcastToSseClients(heartbeat);
} catch (Exception e) {
logger.error("Error generating heartbeat event", e);
}
}, 60, 60, TimeUnit.SECONDS);
}
}
// File: src/main/java/com/example/events/source/CustomEventRequest.java
package com.example.events.source;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class CustomEventRequest {
private String eventType;
private String eventSource;
private String customType;
private String data;
// Getters and Setters
public String getEventType() { return eventType; }
public void setEventType(String eventType) { this.eventType = eventType; }
public String getEventSource() { return eventSource; }
public void setEventSource(String eventSource) { this.eventSource = eventSource; }
public String getCustomType() { return customType; }
public void setCustomType(String customType) { this.customType = customType; }
public String getData() { return data; }
public void setData(String data) { this.data = data; }
}

3.2 Event Sender Service

// File: src/main/java/com/example/events/source/EventSenderService.java
package com.example.events.source;
import io.cloudevents.CloudEvent;
import io.cloudevents.http.HttpMessageFactory;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.Collections;
@Service
public class EventSenderService {
private static final Logger logger = LoggerFactory.getLogger(EventSenderService.class);
private final RestTemplate restTemplate;
@Value("${knative.broker.url:http://broker-knative-broker.default.svc.cluster.local}")
private String brokerUrl;
@Value("${knative.broker.enabled:true}")
private boolean brokerEnabled;
public EventSenderService(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
public boolean sendToBroker(CloudEvent cloudEvent) {
if (!brokerEnabled) {
logger.debug("Broker disabled, skipping event send: {}", cloudEvent.getId());
return true;
}
try {
// Convert CloudEvent to HTTP request
HttpHeaders headers = new HttpHeaders();
headers.setContentType(org.springframework.http.MediaType.APPLICATION_JSON);
headers.setAccept(Collections.singletonList(org.springframework.http.MediaType.APPLICATION_JSON));
byte[] eventBytes = EventFormatProvider.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.serialize(cloudEvent);
HttpEntity<byte[]> requestEntity = new HttpEntity<>(eventBytes, headers);
ResponseEntity<String> response = restTemplate.exchange(
brokerUrl,
HttpMethod.POST,
requestEntity,
String.class
);
if (response.getStatusCode().is2xxSuccessful()) {
logger.debug("Successfully sent event to broker: {}", cloudEvent.getId());
return true;
} else {
logger.error("Failed to send event to broker. Status: {}, Response: {}", 
response.getStatusCode(), response.getBody());
return false;
}
} catch (Exception e) {
logger.error("Error sending event to broker: {}", cloudEvent.getId(), e);
return false;
}
}
public boolean sendToSink(CloudEvent cloudEvent, String sinkUrl) {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(org.springframework.http.MediaType.APPLICATION_JSON);
byte[] eventBytes = EventFormatProvider.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.serialize(cloudEvent);
HttpEntity<byte[]> requestEntity = new HttpEntity<>(eventBytes, headers);
ResponseEntity<String> response = restTemplate.exchange(
sinkUrl,
HttpMethod.POST,
requestEntity,
String.class
);
if (response.getStatusCode().is2xxSuccessful()) {
logger.debug("Successfully sent event to sink: {}", sinkUrl);
return true;
} else {
logger.warn("Failed to send event to sink. Status: {}, Response: {}", 
response.getStatusCode(), response.getBody());
return false;
}
} catch (Exception e) {
logger.error("Error sending event to sink: {}", sinkUrl, e);
return false;
}
}
}

Part 4: Kafka Event Source

4.1 Kafka to Knative Bridge

// File: src/main/java/com/example/events/source/KafkaEventSource.java
package com.example.events.source;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.URI;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class KafkaEventSource {
private static final Logger logger = LoggerFactory.getLogger(KafkaEventSource.class);
private final EventSenderService eventSenderService;
private final AtomicBoolean running = new AtomicBoolean(false);
private ExecutorService executorService;
private KafkaConsumer<String, String> kafkaConsumer;
@Value("${kafka.bootstrap.servers:localhost:9092}")
private String bootstrapServers;
@Value("${kafka.topic:knative-events}")
private String topic;
@Value("${kafka.group.id:knative-event-source}")
private String groupId;
@Value("${kafka.auto.offset.reset:earliest}")
private String autoOffsetReset;
public KafkaEventSource(EventSenderService eventSenderService) {
this.eventSenderService = eventSenderService;
}
@PostConstruct
public void initialize() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList(topic));
startConsumer();
}
@PreDestroy
public void shutdown() {
running.set(false);
if (executorService != null) {
executorService.shutdown();
}
if (kafkaConsumer != null) {
kafkaConsumer.close();
}
}
private void startConsumer() {
running.set(true);
executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
logger.info("Starting Kafka consumer for topic: {}", topic);
while (running.get()) {
try {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processKafkaRecord(record);
}
} catch (Exception e) {
logger.error("Error in Kafka consumer", e);
// Add backoff and retry logic
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
logger.info("Kafka consumer stopped");
});
}
private void processKafkaRecord(ConsumerRecord<String, String> record) {
try {
CloudEvent cloudEvent = convertToCloudEvent(record);
boolean sent = eventSenderService.sendToBroker(cloudEvent);
if (sent) {
logger.debug("Successfully processed Kafka record: topic={}, partition={}, offset={}", 
record.topic(), record.partition(), record.offset());
} else {
logger.warn("Failed to send event from Kafka record: topic={}, offset={}", 
record.topic(), record.offset());
}
} catch (Exception e) {
logger.error("Failed to process Kafka record: topic={}, offset={}", 
record.topic(), record.offset(), e);
}
}
private CloudEvent convertToCloudEvent(ConsumerRecord<String, String> record) {
return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withType("com.example.kafka." + record.topic())
.withSource(URI.create("https://kafka.example.com/" + record.topic()))
.withTime(OffsetDateTime.now())
.withDataContentType("application/json")
.withData(record.value().getBytes())
.withExtension("kafkatopic", record.topic())
.withExtension("kafkapartition", record.partition())
.withExtension("kafkaoffset", record.offset())
.withExtension("kafkakey", record.key() != null ? record.key() : "")
.build();
}
}

Part 5: Event Sink Implementation

5.1 CloudEvent Receiver

// File: src/main/java/com/example/events/sink/EventSinkController.java
package com.example.events.sink;
import io.cloudevents.CloudEvent;
import io.cloudevents.http.HttpMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.atomic.AtomicLong;
@RestController
@RequestMapping("/api/v1/sink")
public class EventSinkController {
private static final Logger logger = LoggerFactory.getLogger(EventSinkController.class);
private final AtomicLong eventCounter = new AtomicLong(0);
private final EventProcessorService eventProcessorService;
public EventSinkController(EventProcessorService eventProcessorService) {
this.eventProcessorService = eventProcessorService;
}
@PostMapping("/events")
public ResponseEntity<String> receiveEvent(
HttpServletRequest request,
@RequestBody(required = false) String body) {
long eventNumber = eventCounter.incrementAndGet();
try {
// Parse CloudEvent from HTTP request
CloudEvent cloudEvent = HttpMessageFactory
.createReaderFromRequest(request, body != null ? body.getBytes() : new byte[0])
.toEvent();
logger.info("Received CloudEvent #{}: id={}, type={}, source={}", 
eventNumber, cloudEvent.getId(), cloudEvent.getType(), cloudEvent.getSource());
// Process the event
eventProcessorService.processEvent(cloudEvent);
return ResponseEntity.accepted().body("Event processed successfully");
} catch (Exception e) {
logger.error("Failed to process CloudEvent #{}", eventNumber, e);
return ResponseEntity.badRequest().body("Invalid CloudEvent format");
}
}
@GetMapping("/metrics")
public EventMetrics getMetrics() {
return new EventMetrics(eventCounter.get());
}
}
// File: src/main/java/com/example/events/sink/EventProcessorService.java
package com.example.events.sink;
import io.cloudevents.CloudEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class EventProcessorService {
private static final Logger logger = LoggerFactory.getLogger(EventProcessorService.class);
private final ConcurrentHashMap<String, EventHandler> eventHandlers = new ConcurrentHashMap<>();
private final AtomicLong processedCount = new AtomicLong(0);
public EventProcessorService() {
// Register default event handlers
registerHandler("com.example.user.created", new UserEventHandler());
registerHandler("com.example.order.placed", new OrderEventHandler());
registerHandler("com.example.system.alert", new SystemEventHandler());
}
public void processEvent(CloudEvent cloudEvent) {
try {
String eventType = cloudEvent.getType();
EventHandler handler = eventHandlers.get(eventType);
if (handler != null) {
handler.handle(cloudEvent);
} else {
handleUnknownEvent(cloudEvent);
}
processedCount.incrementAndGet();
logger.debug("Processed event: {}", cloudEvent.getId());
} catch (Exception e) {
logger.error("Error processing event: {}", cloudEvent.getId(), e);
// Implement dead letter queue or retry logic
}
}
public void registerHandler(String eventType, EventHandler handler) {
eventHandlers.put(eventType, handler);
logger.info("Registered event handler for type: {}", eventType);
}
private void handleUnknownEvent(CloudEvent cloudEvent) {
logger.warn("No handler registered for event type: {}", cloudEvent.getType());
// Could store in dead letter queue or send to monitoring system
}
public long getProcessedCount() {
return processedCount.get();
}
}
// File: src/main/java/com/example/events/sink/EventHandler.java
package com.example.events.sink;
import io.cloudevents.CloudEvent;
public interface EventHandler {
void handle(CloudEvent cloudEvent);
}
// File: src/main/java/com/example/events/sink/UserEventHandler.java
package com.example.events.sink;
import io.cloudevents.CloudEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UserEventHandler implements EventHandler {
private static final Logger logger = LoggerFactory.getLogger(UserEventHandler.class);
@Override
public void handle(CloudEvent cloudEvent) {
logger.info("Processing user event: {}", cloudEvent.getId());
// Extract and process user data
if (cloudEvent.getData() != null) {
String data = new String(cloudEvent.getData().toBytes());
logger.info("User event data: {}", data);
// Business logic for user events
// e.g., update user database, send welcome email, etc.
}
// Process extensions
cloudEvent.getExtensionNames().forEach(ext -> {
logger.debug("Extension {}: {}", ext, cloudEvent.getExtension(ext));
});
}
}
// File: src/main/java/com/example/events/sink/EventMetrics.java
package com.example.events.sink;
public class EventMetrics {
private final long totalEventsProcessed;
private final long timestamp;
public EventMetrics(long totalEventsProcessed) {
this.totalEventsProcessed = totalEventsProcessed;
this.timestamp = System.currentTimeMillis();
}
// Getters
public long getTotalEventsProcessed() { return totalEventsProcessed; }
public long getTimestamp() { return timestamp; }
}

Part 6: Knative Configuration

6.1 Knative Service Definition

# knative/service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: java-event-source
namespace: default
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/minScale: "1"
autoscaling.knative.dev/maxScale: "10"
autoscaling.knative.dev/target: "100"
spec:
containers:
- image: my-registry/java-event-source:latest
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka-broker:9092"
- name: KAFKA_TOPIC
value: "knative-events"
- name: KNAITIVE_BROKER_URL
value: "http://broker-knative-broker.default.svc.cluster.local"
- name: JAVA_OPTS
value: "-Xmx512m -Xms256m"
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 20
periodSeconds: 10
readinessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 5
periodSeconds: 5

6.2 Knative Trigger Configuration

# knative/trigger.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: java-user-events-trigger
namespace: default
spec:
broker: default
filter:
attributes:
type: "com.example.user.created"
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: user-service
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: java-order-events-trigger
namespace: default
spec:
broker: default
filter:
attributes:
type: "com.example.order.placed"
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: order-service
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: java-all-events-trigger
namespace: default
spec:
broker: default
filter:
attributes:
type: "com.example.system.alert"
subscriber:
uri: http://monitoring-service.default.svc.cluster.local/api/alerts

6.3 Kafka Source Configuration

# knative/kafka-source.yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-java-events-source
namespace: default
spec:
consumerGroup: knative-java-consumer
bootstrapServers:
- kafka-broker:9092
topics:
- knative-events
- user-events
- order-events
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: java-event-processor

Part 7: Spring Boot Application

7.1 Main Application Class

// File: src/main/java/com/example/EventSourceApplication.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
public class EventSourceApplication {
public static void main(String[] args) {
SpringApplication.run(EventSourceApplication.class, args);
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public io.cloudevents.CloudEventFormat cloudEventFormat() {
return io.cloudevents.core.provider.EventFormatProvider.getInstance()
.resolveFormat(io.cloudevents.jackson.JsonFormat.CONTENT_TYPE);
}
}
// File: src/main/resources/application.yaml
server:
port: 8080
spring:
application:
name: java-event-source
jackson:
date-format: com.fasterxml.jackson.databind.util.StdDateFormat
default-property-inclusion: NON_NULL
management:
endpoints:
web:
exposure:
include: health,info,metrics,configreload
endpoint:
health:
show-details: always
knative:
broker:
url: http://broker-knative-broker.default.svc.cluster.local
enabled: true
kafka:
bootstrap:
servers: localhost:9092
topic: knative-events
group:
id: knative-event-source-java
logging:
level:
com.example.events: DEBUG
org.apache.kafka: WARN

Part 8: Testing

8.1 Unit Tests

// File: src/test/java/com/example/events/source/EventSenderServiceTest.java
package com.example.events.source;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.time.OffsetDateTime;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class EventSenderServiceTest {
@Mock
private RestTemplate restTemplate;
private EventSenderService eventSenderService;
private CloudEvent testEvent;
@BeforeEach
void setUp() {
eventSenderService = new EventSenderService(restTemplate);
testEvent = CloudEventBuilder.v1()
.withId("test-id")
.withType("com.example.test")
.withSource(URI.create("https://test.example.com"))
.withTime(OffsetDateTime.now())
.withDataContentType("application/json")
.withData("{\"test\": \"data\"}".getBytes())
.build();
}
@Test
void shouldSendEventToBrokerSuccessfully() {
// Given
ResponseEntity<String> response = ResponseEntity.accepted().body("OK");
when(restTemplate.exchange(
anyString(),
eq(HttpMethod.POST),
any(HttpEntity.class),
eq(String.class)
)).thenReturn(response);
// When
boolean result = eventSenderService.sendToBroker(testEvent);
// Then
assertTrue(result);
verify(restTemplate, times(1)).exchange(
anyString(),
eq(HttpMethod.POST),
any(HttpEntity.class),
eq(String.class)
);
}
@Test
void shouldHandleBrokerError() {
// Given
ResponseEntity<String> response = ResponseEntity.status(500).body("Error");
when(restTemplate.exchange(
anyString(),
eq(HttpMethod.POST),
any(HttpEntity.class),
eq(String.class)
)).thenReturn(response);
// When
boolean result = eventSenderService.sendToBroker(testEvent);
// Then
assertFalse(result);
}
}

8.2 Integration Tests

// File: src/test/java/com/example/events/sink/EventSinkControllerIntegrationTest.java
package com.example.events.sink;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonFormat;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;
import java.net.URI;
import java.time.OffsetDateTime;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;
@SpringBootTest
@AutoConfigureMockMvc
class EventSinkControllerIntegrationTest {
@Autowired
private MockMvc mockMvc;
@Test
void shouldAcceptValidCloudEvent() throws Exception {
CloudEvent cloudEvent = CloudEventBuilder.v1()
.withId("test-integration-id")
.withType("com.example.user.created")
.withSource(URI.create("https://test.example.com"))
.withTime(OffsetDateTime.now())
.withDataContentType("application/json")
.withData("{\"userId\": \"123\", \"name\": \"Test User\"}".getBytes())
.build();
byte[] eventBytes = new JsonFormat().serialize(cloudEvent);
mockMvc.perform(post("/api/v1/sink/events")
.contentType(MediaType.APPLICATION_JSON)
.header("ce-id", cloudEvent.getId())
.header("ce-type", cloudEvent.getType())
.header("ce-source", cloudEvent.getSource().toString())
.header("ce-specversion", "1.0")
.content(eventBytes))
.andExpect(status().isAccepted())
.andExpect(content().string("Event processed successfully"));
}
}

Best Practices for Knative Eventing in Java

  1. Idempotency: Design event handlers to be idempotent to handle duplicate events
  2. Error Handling: Implement proper error handling and dead letter queues
  3. Observability: Add comprehensive logging, metrics, and tracing
  4. Resource Management: Configure proper resource limits and scaling parameters
  5. Security: Implement authentication and authorization for event endpoints
  6. Testing: Test event flows end-to-end including failure scenarios
  7. Monitoring: Set up alerts for event processing failures and performance issues
  8. Documentation: Document event schemas and processing expectations

Conclusion

Building Knative Eventing Sources in Java enables you to create robust, scalable event-driven architectures on Kubernetes. By leveraging the CloudEvents specification and Knative's powerful event routing capabilities, Java applications can:

  • Process events from multiple sources (HTTP, Kafka, custom)
  • Route events intelligently using Knative brokers and triggers
  • Scale automatically based on event load
  • Maintain interoperability with other CloudEvents-compliant systems
  • Implement complex event processing with type-safe Java code

The patterns and examples in this guide provide a solid foundation for building production-ready Knative event sources and sinks in Java, enabling you to create sophisticated event-driven microservices architectures.

Pyroscope Profiling in Java
Explains how to use Pyroscope for continuous profiling in Java applications, helping developers analyze CPU and memory usage patterns to improve performance and identify bottlenecks.
https://macronepal.com/blog/pyroscope-profiling-in-java/

OpenTelemetry Metrics in Java: Comprehensive Guide
Provides a complete guide to collecting and exporting metrics in Java using OpenTelemetry, including counters, histograms, gauges, and integration with monitoring tools. (MACRO NEPAL)
https://macronepal.com/blog/opentelemetry-metrics-in-java-comprehensive-guide/

OTLP Exporter in Java: Complete Guide for OpenTelemetry
Explains how to configure OTLP exporters in Java to send telemetry data such as traces, metrics, and logs to monitoring systems using HTTP or gRPC protocols. (MACRO NEPAL)
https://macronepal.com/blog/otlp-exporter-in-java-complete-guide-for-opentelemetry/

Thanos Integration in Java: Global View of Metrics
Explains how to integrate Thanos with Java monitoring systems to create a scalable global metrics view across multiple Prometheus instances.

https://macronepal.com/blog/thanos-integration-in-java-global-view-of-metrics

Time Series with InfluxDB in Java: Complete Guide (Version 2)
Explains how to manage time-series data using InfluxDB in Java applications, including storing, querying, and analyzing metrics data.

https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide-2

Time Series with InfluxDB in Java: Complete Guide
Provides an overview of integrating InfluxDB with Java for time-series data handling, including monitoring applications and managing performance metrics.

https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide

Implementing Prometheus Remote Write in Java (Version 2)
Explains how to configure Java applications to send metrics data to Prometheus-compatible systems using the remote write feature for scalable monitoring.

https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide-2

Implementing Prometheus Remote Write in Java: Complete Guide
Provides instructions for sending metrics from Java services to Prometheus servers, enabling centralized monitoring and real-time analytics.

https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide

Building a TileServer GL in Java: Vector and Raster Tile Server
Explains how to build a TileServer GL in Java for serving vector and raster map tiles, useful for geographic visualization and mapping applications.

https://macronepal.com/blog/building-a-tileserver-gl-in-java-vector-and-raster-tile-server

Indoor Mapping in Java
Explains how to create indoor mapping systems in Java, including navigation inside buildings, spatial data handling, and visualization techniques.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper