Article
Knative Eventing provides a powerful platform for building event-driven applications on Kubernetes. While Knative offers many built-in event sources, there are scenarios where you need custom event sources to integrate with proprietary systems, legacy applications, or specialized cloud services. This article explores how to build custom Knative Eventing Sources in Java, enabling you to extend Knative's eventing capabilities to any system that can be integrated with Java.
Understanding Knative Eventing Sources
What are Knative Eventing Sources?
Knative Eventing Sources are Kubernetes custom resources that produce events from external systems and deliver them, as CloudEvents, to a sink such as a Broker, a Channel, or a Knative Service. They act as bridges between external systems and Knative's event mesh.
Key Concepts:
- Source: Produces events from external systems
- Broker: Event mesh for routing events
- Trigger: Subscribes to events from brokers
- Channel: Transport mechanism for events
- Subscription: Links channels to services
Architecture Overview
External System (Database, API, Message Queue) ↓ Custom Source (Java Application) ↓ Knative Broker/Channel ↓ Knative Service (Event Consumer)
Building Custom Knative Eventing Sources in Java
1. Project Dependencies
<!--
  Dependency set for the custom source. Artifacts declared without a <version> are
  assumed to be managed by the Spring Boot parent/BOM of the enclosing POM (not
  shown in this snippet) - confirm before use.
-->
<properties>
    <!-- fabric8's knative-client is released in lockstep with kubernetes-client, so it
         follows that version rather than the Knative release number. -->
    <knative.version>${kubernetes-client.version}</knative.version>
    <kubernetes-client.version>6.7.2</kubernetes-client.version>
    <spring-boot.version>3.1.0</spring-boot.version>
    <cloudevents.version>2.4.0</cloudevents.version>
</properties>
<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <!-- Kubernetes Client -->
    <dependency>
        <groupId>io.fabric8</groupId>
        <artifactId>kubernetes-client</artifactId>
        <version>${kubernetes-client.version}</version>
    </dependency>
    <!-- Knative Eventing -->
    <dependency>
        <groupId>io.fabric8</groupId>
        <artifactId>knative-client</artifactId>
        <version>${knative.version}</version>
    </dependency>
    <!-- CloudEvents SDK -->
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-core</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-http-vertx</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- HTTP Client -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <!-- Metrics -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-core</artifactId>
    </dependency>
    <!-- Connection pool used by the database source (HikariDataSource).
         A JDBC driver for the target database must be added separately. -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
    </dependency>
    <!-- Lombok: @Data, @Builder and @Slf4j are used throughout the source classes -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
2. Core Configuration Classes
@Configuration
@ConfigurationProperties(prefix = "knative.source")
@Data
public class KnativeSourceConfig {
private String name;
private String namespace;
private String brokerUrl;
private int retryAttempts = 3;
private long retryDelayMs = 1000;
private int batchSize = 100;
private Duration pollInterval = Duration.ofSeconds(30);
// Source-specific configuration
private DatabaseSourceConfig database;
private ApiSourceConfig api;
private FileSourceConfig file;
@Data
public static class DatabaseSourceConfig {
private String jdbcUrl;
private String username;
private String password;
private String tableName;
private String pollQuery;
}
@Data
public static class ApiSourceConfig {
private String baseUrl;
private String authToken;
private Map<String, String> headers = new HashMap<>();
}
@Data
public static class FileSourceConfig {
private String directory;
private String pattern;
private boolean recursive = false;
}
}
/**
 * Descriptive metadata about an event source.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and a builder are
 * generated by Lombok ({@code @Data}, {@code @Builder}).
 * NOTE(review): the field names mirror CloudEvent attribute names, but no visible
 * code reads this class - confirm where it is used.
 */
@Data
@Builder
public class EventSourceMetadata {
// presumably the CloudEvent "source" attribute - verify against callers
private String source;
// presumably the CloudEvent "type" attribute - verify against callers
private String type;
// presumably the CloudEvent "subject" attribute - verify against callers
private String subject;
// extension attributes keyed by name
private Map<String, String> extensions;
// creation timestamp of this metadata entry
private Instant createdAt;
}
3. CloudEvents Factory
/**
 * Builds CloudEvents (spec version 1.0) stamped with this source's identity.
 *
 * <p>Every event gets a random UUID id, the source URI
 * {@code /sources/<namespace>/<name>}, the current time, a JSON payload and the
 * {@code knativesource} and {@code namespace} extension attributes.
 */
@Component
@Slf4j
public class CloudEventFactory {
    private final ObjectMapper objectMapper;
    private final KnativeSourceConfig config;

    public CloudEventFactory(KnativeSourceConfig config) {
        this.config = config;
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Creates an event without a subject.
     *
     * @param eventType CloudEvent {@code type} attribute
     * @param data      payload, serialized to JSON
     * @return the built event
     */
    public CloudEvent createEvent(String eventType, Object data) {
        return createEvent(eventType, null, data);
    }

    /**
     * Creates an event with an optional subject.
     *
     * @param eventType CloudEvent {@code type} attribute
     * @param subject   CloudEvent {@code subject} attribute; omitted when {@code null}
     * @param data      payload, serialized to JSON
     * @return the built event
     * @throws RuntimeException if {@code data} cannot be serialized to JSON
     */
    public CloudEvent createEvent(String eventType, String subject, Object data) {
        try {
            String eventId = UUID.randomUUID().toString();
            // The builder's source attribute is a URI, not a String.
            java.net.URI source = java.net.URI.create(String.format("/sources/%s/%s",
                config.getNamespace(), config.getName()));

            CloudEventBuilder eventBuilder = CloudEventBuilder.v1()
                .withId(eventId)
                .withSource(source)
                .withType(eventType)
                .withTime(OffsetDateTime.now())
                .withDataContentType("application/json")
                .withData(objectMapper.writeValueAsBytes(data));

            if (subject != null) {
                eventBuilder.withSubject(subject);
            }

            // Add extensions
            eventBuilder.withExtension("knativesource", config.getName());
            eventBuilder.withExtension("namespace", config.getNamespace());

            return eventBuilder.build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to create CloudEvent", e);
        }
    }

    /**
     * Creates a {@code com.example.database.<operation>} event with subject {@code database}.
     *
     * @param operation operation name, e.g. {@code insert}; must not be {@code null}
     * @param record    changed record; must not be {@code null} ({@code Map.of} rejects nulls)
     * @return the built event
     */
    public CloudEvent createDatabaseEvent(String operation, Object record) {
        String eventType = String.format("com.example.database.%s", operation);
        return createEvent(eventType, "database", Map.of(
            "operation", operation,
            "record", record,
            "timestamp", Instant.now().toString()
        ));
    }

    /**
     * Creates a {@code com.example.api.<endpoint>} event with subject {@code api}.
     *
     * <p>Slashes in the endpoint become dots in the event type. Leading, trailing and
     * repeated slashes are collapsed first so the type never contains empty segments
     * (for example {@code /users/42} yields {@code com.example.api.users.42}).
     *
     * @param endpoint polled endpoint; must not be {@code null}
     * @param payload  response payload; must not be {@code null} ({@code Map.of} rejects nulls)
     * @return the built event
     */
    public CloudEvent createApiEvent(String endpoint, Object payload) {
        String typeSuffix = endpoint.replaceAll("^/+|/+$", "").replaceAll("/+", ".");
        String eventType = String.format("com.example.api.%s", typeSuffix);
        return createEvent(eventType, "api", Map.of(
            "endpoint", endpoint,
            "payload", payload,
            "timestamp", Instant.now().toString()
        ));
    }

    /**
     * Creates a {@code com.example.file.<action>} event with subject {@code file}.
     *
     * @param action   file action, e.g. {@code create}; must not be {@code null}
     * @param filePath path of the affected file; must not be {@code null}
     * @return the built event
     */
    public CloudEvent createFileEvent(String action, String filePath) {
        String eventType = String.format("com.example.file.%s", action);
        return createEvent(eventType, "file", Map.of(
            "action", action,
            "filePath", filePath,
            "timestamp", Instant.now().toString()
        ));
    }
}
4. Event Sender Service
@Component
@Slf4j
public class EventSenderService {
private final KnativeSourceConfig config;
private final WebClient webClient;
private final MeterRegistry meterRegistry;
private final Counter successCounter;
private final Counter errorCounter;
private final Timer sendTimer;
public EventSenderService(KnativeSourceConfig config, WebClient.Builder webClientBuilder,
MeterRegistry meterRegistry) {
this.config = config;
this.meterRegistry = meterRegistry;
this.webClient = webClientBuilder.build();
// Initialize metrics
this.successCounter = Counter.builder("knative.source.events.sent")
.description("Number of events successfully sent")
.tag("source", config.getName())
.register(meterRegistry);
this.errorCounter = Counter.builder("knative.source.events.failed")
.description("Number of events that failed to send")
.tag("source", config.getName())
.register(meterRegistry);
this.sendTimer = Timer.builder("knative.source.send.duration")
.description("Time taken to send events")
.tag("source", config.getName())
.register(meterRegistry);
}
public Mono<Void> sendEvent(CloudEvent event) {
return sendEventWithRetry(event, config.getRetryAttempts());
}
private Mono<Void> sendEventWithRetry(CloudEvent event, int remainingAttempts) {
return Mono.fromCallable(() -> createHttpRequest(event))
.flatMap(this::executeSend)
.doOnSuccess(response -> {
successCounter.increment();
log.debug("Event sent successfully: {}", event.getId());
})
.onErrorResume(error -> {
log.warn("Failed to send event: {}", event.getId(), error);
errorCounter.increment();
if (remainingAttempts > 0) {
log.info("Retrying event {} (attempts left: {})",
event.getId(), remainingAttempts);
return Mono.delay(Duration.ofMillis(config.getRetryDelayMs()))
.then(Mono.defer(() -> sendEventWithRetry(event, remainingAttempts - 1)));
} else {
log.error("All retry attempts exhausted for event: {}", event.getId());
return Mono.error(new EventSendException(
"Failed to send event after " + config.getRetryAttempts() + " attempts", error));
}
})
.then();
}
private HttpRequest<Buffer> createHttpRequest(CloudEvent event) {
try {
return VertxCloudEvents.create().httpRequestBuilder(event)
.build(Buffer.buffer(event.getData().toBytes()));
} catch (Exception e) {
throw new RuntimeException("Failed to create HTTP request from CloudEvent", e);
}
}
private Mono<Void> executeSend(HttpRequest<Buffer> request) {
return webClient.post()
.uri(config.getBrokerUrl())
.headers(headers -> {
request.getHeaders().forEach(header ->
headers.add(header.getKey(), header.getValue()));
})
.bodyValue(request.getBody().getBytes())
.retrieve()
.toBodilessEntity()
.timeout(Duration.ofSeconds(30))
.doOnSubscribe(subscription -> sendTimer.record(() -> {}))
.then();
}
public Flux<Void> sendEvents(List<CloudEvent> events) {
return Flux.fromIterable(events)
.flatMap(this::sendEvent, 10) // Concurrency of 10
.doOnComplete(() ->
log.info("Successfully sent batch of {} events", events.size()))
.doOnError(error ->
log.error("Failed to send event batch", error));
}
}
/**
 * Unchecked exception signalling that an event could not be delivered.
 * Carries the underlying failure as its cause.
 */
class EventSendException extends RuntimeException {

    /**
     * @param message description of the delivery failure
     * @param cause   the error that triggered it
     */
    public EventSendException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
5. Database Change Source
@Component
@Slf4j
public class DatabaseChangeSource {
private final KnativeSourceConfig config;
private final EventSenderService eventSender;
private final CloudEventFactory eventFactory;
private final DataSource dataSource;
private volatile boolean running = false;
private Instant lastPollTime;
public DatabaseChangeSource(KnativeSourceConfig config, EventSenderService eventSender,
CloudEventFactory eventFactory, DataSource dataSource) {
this.config = config;
this.eventSender = eventSender;
this.eventFactory = eventFactory;
this.dataSource = dataSource;
this.lastPollTime = Instant.now().minus(Duration.ofHours(1));
}
@EventListener
public void onApplicationReady(ApplicationReadyEvent event) {
startPolling();
}
public void startPolling() {
if (running) {
log.warn("Database change source is already running");
return;
}
running = true;
log.info("Starting database change source for table: {}",
config.getDatabase().getTableName());
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::pollDatabaseChanges, 0,
config.getPollInterval().toSeconds(), TimeUnit.SECONDS);
}
private void pollDatabaseChanges() {
if (!running) return;
try {
List<Map<String, Object>> changes = fetchDatabaseChanges();
if (!changes.isEmpty()) {
log.info("Found {} database changes since {}",
changes.size(), lastPollTime);
sendDatabaseEvents(changes);
}
lastPollTime = Instant.now();
} catch (Exception e) {
log.error("Error polling database changes", e);
}
}
private List<Map<String, Object>> fetchDatabaseChanges() {
String query = config.getDatabase().getPollQuery();
if (query == null) {
query = "SELECT * FROM " + config.getDatabase().getTableName() +
" WHERE last_modified > ? ORDER BY last_modified ASC";
}
List<Map<String, Object>> results = new ArrayList<>();
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(query)) {
stmt.setTimestamp(1, Timestamp.from(lastPollTime));
try (ResultSet rs = stmt.executeQuery()) {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
while (rs.next()) {
Map<String, Object> row = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
Object value = rs.getObject(i);
row.put(columnName, value);
}
results.add(row);
}
}
} catch (SQLException e) {
throw new RuntimeException("Failed to fetch database changes", e);
}
return results;
}
private void sendDatabaseEvents(List<Map<String, Object>> changes) {
List<CloudEvent> events = changes.stream()
.map(change -> {
String operation = determineOperation(change);
return eventFactory.createDatabaseEvent(operation, change);
})
.collect(Collectors.toList());
eventSender.sendEvents(events)
.doOnError(error -> log.error("Failed to send database events", error))
.subscribe();
}
private String determineOperation(Map<String, Object> change) {
// Determine if this is an INSERT, UPDATE, or DELETE based on change data
if (change.containsKey("deleted") && Boolean.TRUE.equals(change.get("deleted"))) {
return "delete";
} else if (change.containsKey("created_at") &&
Instant.parse(change.get("created_at").toString())
.isAfter(lastPollTime.minus(Duration.ofSeconds(1)))) {
return "insert";
} else {
return "update";
}
}
public void stop() {
running = false;
log.info("Stopped database change source");
}
public boolean isRunning() {
return running;
}
}
6. REST API Polling Source
@Component
@Slf4j
public class RestApiPollingSource {
private final KnativeSourceConfig config;
private final EventSenderService eventSender;
private final CloudEventFactory eventFactory;
private final WebClient webClient;
private volatile boolean running = false;
private String lastEtag = "";
private Instant lastPollTime = Instant.now();
public RestApiPollingSource(KnativeSourceConfig config, EventSenderService eventSender,
CloudEventFactory eventFactory, WebClient.Builder webClientBuilder) {
this.config = config;
this.eventSender = eventSender;
this.eventFactory = eventFactory;
this.webClient = webClientBuilder
.baseUrl(config.getApi().getBaseUrl())
.defaultHeaders(headers -> {
headers.setBearerAuth(config.getApi().getAuthToken());
config.getApi().getHeaders().forEach(headers::set);
})
.build();
}
public void startPolling() {
if (running) {
log.warn("REST API polling source is already running");
return;
}
running = true;
log.info("Starting REST API polling source for: {}", config.getApi().getBaseUrl());
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::pollApi, 0,
config.getPollInterval().toSeconds(), TimeUnit.SECONDS);
}
private void pollApi() {
if (!running) return;
webClient.get()
.header("If-None-Match", lastEtag)
.retrieve()
.toEntity(String.class)
.doOnSuccess(response -> {
if (response.getStatusCode().is2xx()) {
handleApiResponse(response);
} else if (response.getStatusCode().value() == 304) {
log.debug("No changes in API response (304 Not Modified)");
}
})
.doOnError(error -> log.error("Error polling API", error))
.subscribe();
}
private void handleApiResponse(org.springframework.http.ResponseEntity<String> response) {
String currentEtag = response.getHeaders().getETag();
String body = response.getBody();
if (currentEtag != null && !currentEtag.equals(lastEtag)) {
lastEtag = currentEtag;
CloudEvent event = eventFactory.createApiEvent("poll", Map.of(
"data", parseResponseBody(body),
"etag", currentEtag,
"pollTime", Instant.now().toString()
));
eventSender.sendEvent(event)
.doOnError(error -> log.error("Failed to send API event", error))
.subscribe();
log.info("Sent API change event with ETag: {}", currentEtag);
}
lastPollTime = Instant.now();
}
private Object parseResponseBody(String body) {
try {
return new ObjectMapper().readValue(body, Object.class);
} catch (Exception e) {
return body; // Return as string if not JSON
}
}
public void stop() {
running = false;
log.info("Stopped REST API polling source");
}
}
7. File System Watcher Source
@Component
@Slf4j
public class FileSystemWatcherSource {
private final KnativeSourceConfig config;
private final EventSenderService eventSender;
private final CloudEventFactory eventFactory;
private WatchService watchService;
private volatile boolean running = false;
public FileSystemWatcherSource(KnativeSourceConfig config, EventSenderService eventSender,
CloudEventFactory eventFactory) {
this.config = config;
this.eventSender = eventSender;
this.eventFactory = eventFactory;
}
public void startWatching() {
if (running) {
log.warn("File system watcher is already running");
return;
}
try {
Path watchPath = Paths.get(config.getFile().getDirectory());
if (!Files.exists(watchPath)) {
Files.createDirectories(watchPath);
}
watchService = FileSystems.getDefault().newWatchService();
watchPath.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_DELETE);
running = true;
log.info("Started file system watcher for: {}", watchPath);
startWatchThread();
} catch (IOException e) {
throw new RuntimeException("Failed to start file system watcher", e);
}
}
private void startWatchThread() {
Thread watchThread = new Thread(() -> {
while (running) {
try {
WatchKey key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {
handleFileEvent(event);
}
boolean valid = key.reset();
if (!valid) {
log.warn("Watch key no longer valid");
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Error in file watcher", e);
}
}
});
watchThread.setDaemon(true);
watchThread.start();
}
@SuppressWarnings("unchecked")
private void handleFileEvent(WatchEvent<?> event) {
WatchEvent.Kind<?> kind = event.kind();
Path filename = ((WatchEvent<Path>) event).context();
Path fullPath = Paths.get(config.getFile().getDirectory()).resolve(filename);
if (!matchesFilePattern(filename.toString())) {
return;
}
String action = getActionFromKind(kind);
CloudEvent fileEvent = eventFactory.createFileEvent(action, fullPath.toString());
eventSender.sendEvent(fileEvent)
.doOnSuccess(v -> log.debug("Sent file event: {} for {}", action, filename))
.doOnError(error -> log.error("Failed to send file event", error))
.subscribe();
}
private boolean matchesFilePattern(String filename) {
if (config.getFile().getPattern() == null) {
return true;
}
return filename.matches(config.getFile().getPattern());
}
private String getActionFromKind(WatchEvent.Kind<?> kind) {
if (kind == StandardWatchEventKinds.ENTRY_CREATE) return "create";
if (kind == StandardWatchEventKinds.ENTRY_MODIFY) return "modify";
if (kind == StandardWatchEventKinds.ENTRY_DELETE) return "delete";
return "unknown";
}
public void stop() {
running = false;
if (watchService != null) {
try {
watchService.close();
} catch (IOException e) {
log.error("Error closing watch service", e);
}
}
log.info("Stopped file system watcher");
}
}
8. Main Application Class
@SpringBootApplication
@EnableConfigurationProperties(KnativeSourceConfig.class)
@Slf4j
public class KnativeSourceApplication implements CommandLineRunner {
@Autowired
private KnativeSourceConfig config;
@Autowired(required = false)
private DatabaseChangeSource databaseSource;
@Autowired(required = false)
private RestApiPollingSource apiSource;
@Autowired(required = false)
private FileSystemWatcherSource fileSource;
@Autowired
private KubernetesClient kubernetesClient;
public static void main(String[] args) {
SpringApplication.run(KnativeSourceApplication.class, args);
}
@Override
public void run(String... args) {
log.info("Starting Knative Eventing Source: {}", config.getName());
// Register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
// Start configured sources
startConfiguredSources();
// Create Kubernetes Service and Knative Source if needed
createKubernetesResources();
log.info("Knative Eventing Source started successfully");
}
private void startConfiguredSources() {
if (config.getDatabase() != null && databaseSource != null) {
databaseSource.startPolling();
}
if (config.getApi() != null && apiSource != null) {
apiSource.startPolling();
}
if (config.getFile() != null && fileSource != null) {
fileSource.startWatching();
}
}
private void shutdown() {
log.info("Shutting down Knative Eventing Source");
if (databaseSource != null) {
databaseSource.stop();
}
if (apiSource != null) {
apiSource.stop();
}
if (fileSource != null) {
fileSource.stop();
}
}
private void createKubernetesResources() {
try {
// Create Kubernetes Service for the source
Service service = new ServiceBuilder()
.withNewMetadata()
.withName(config.getName())
.withNamespace(config.getNamespace())
.withLabels(Map.of(
"app", config.getName(),
"knative-eventing-source", "true"
))
.endMetadata()
.withNewSpec()
.withSelector(Map.of("app", config.getName()))
.addNewPort()
.withName("http")
.withPort(8080)
.withProtocol("TCP")
.endPort()
.endSpec()
.build();
kubernetesClient.services().inNamespace(config.getNamespace())
.resource(service)
.createOrReplace();
log.info("Created Kubernetes Service for source: {}", config.getName());
} catch (Exception e) {
log.warn("Failed to create Kubernetes resources", e);
}
}
@Bean
@ConditionalOnProperty(prefix = "knative.source.database", name = "jdbc-url")
public DataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(config.getDatabase().getJdbcUrl());
dataSource.setUsername(config.getDatabase().getUsername());
dataSource.setPassword(config.getDatabase().getPassword());
dataSource.setMaximumPoolSize(5);
return dataSource;
}
@Bean
public KubernetesClient kubernetesClient() {
return new DefaultKubernetesClient();
}
}
9. Kubernetes Deployment Manifest
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: custom-database-source
namespace: event-sources
labels:
app: custom-database-source
source-type: database
spec:
replicas: 1
selector:
matchLabels:
app: custom-database-source
template:
metadata:
labels:
app: custom-database-source
spec:
serviceAccountName: knative-source-sa
containers:
- name: source
image: my-registry/custom-database-source:1.0.0
ports:
- containerPort: 8080
env:
- name: KNA