Knative enables serverless workloads on Kubernetes, providing auto-scaling, scale-to-zero, and event-driven capabilities. This guide covers building FaaS applications with Knative in Java.
Setup and Dependencies
1. Maven Configuration
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.company.faas</groupId>
<artifactId>knative-java-function</artifactId>
<version>1.0.0</version>
<properties>
<quarkus.version>3.2.0.Final</quarkus.version>
<knative.version>1.11.0</knative.version>
<funqy.version>1.0.0.Final</funqy.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-bom</artifactId>
<version>${quarkus.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Knative Serving -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-jib</artifactId>
</dependency>
<!-- Knative Eventing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<!-- Funqy HTTP (Function as a Service) -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-funqy-http</artifactId>
</dependency>
<!-- Funqy Knative Events -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-funqy-knative-events</artifactId>
</dependency>
<!-- RESTEasy Reactive for HTTP functions -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<!-- Health and Metrics -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Config -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-config-yaml</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-knative-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2. Application Configuration
# src/main/resources/application.yml
quarkus:
application:
name: user-registration-function
version: 1.0.0
# Knative Configuration
kubernetes:
deploy: true
generate: true
namespace: knative-serving
knative:
# Knative Service configuration
service:
auto-scaling:
class: kpa.autoscaling.knative.dev
metric: concurrency
target: 100
revision-auto-scaling:
container-concurrency: 10
max-scale: 20
min-scale: 0 # Scale to zero
window: 60s
labels:
app.kubernetes.io/part-of: user-management
annotations:
autoscaling.knative.dev/minScale: "0"
autoscaling.knative.dev/maxScale: "20"
autoscaling.knative.dev/target: "100"
# Container Configuration
container-image:
builder: jib
group: quay.io/company
name: ${quarkus.application.name}
tag: ${quarkus.application.version}
# Funqy Configuration
funqy:
http:
export: true
path: /functions
knative-events:
mapping:
# CloudEvent mappings
"user.registered": "handleUserRegistered"
"user.updated": "handleUserUpdated"
"user.deleted": "handleUserDeleted"
# HTTP Server
http:
port: 8080
host: 0.0.0.0
cors: true
# Health Checks
smallrye-health:
root-path: /health
# Logging
log:
level: INFO
category:
"com.company.faas": DEBUG
# Knative Client
kubernetes-client:
namespace: knative-serving
trust-certs: true
Core Function Implementations
1. HTTP Functions (Funqy)
package com.company.faas.functions;
import io.quarkus.funqy.Funq;
import io.quarkus.funqy.knative.events.CloudEvent;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* User Registration Function - HTTP triggered
*/
@ApplicationScoped
public class UserRegistrationFunction {
private static final Logger LOG = Logger.getLogger(UserRegistrationFunction.class);
private final Map<String, User> userStore = new ConcurrentHashMap<>();
private final AtomicLong userCounter = new AtomicLong(1000);
@Funq
public UserResponse registerUser(UserRegistration request) {
LOG.infof("Registering user: %s", request.getEmail());
// Validate input
if (request.getEmail() == null || request.getEmail().trim().isEmpty()) {
throw new FunctionException("Email is required");
}
// Check if user already exists
if (userStore.values().stream()
.anyMatch(user -> user.getEmail().equalsIgnoreCase(request.getEmail()))) {
throw new FunctionException("User with this email already exists");
}
// Create user
String userId = "user-" + userCounter.incrementAndGet();
User user = new User(
userId,
request.getEmail(),
request.getFirstName(),
request.getLastName(),
System.currentTimeMillis()
);
userStore.put(userId, user);
LOG.infof("User registered successfully: %s", userId);
return new UserResponse(
userId,
"User registered successfully",
user
);
}
@Funq
public UserResponse getUser(String userId) {
LOG.infof("Fetching user: %s", userId);
User user = userStore.get(userId);
if (user == null) {
throw new FunctionException("User not found: " + userId);
}
return new UserResponse(
userId,
"User retrieved successfully",
user
);
}
@Funq
public UserResponse updateUser(UserUpdate request) {
LOG.infof("Updating user: %s", request.getUserId());
User existingUser = userStore.get(request.getUserId());
if (existingUser == null) {
throw new FunctionException("User not found: " + request.getUserId());
}
// Update user fields
User updatedUser = new User(
existingUser.getId(),
existingUser.getEmail(), // Email cannot be changed
request.getFirstName() != null ? request.getFirstName() : existingUser.getFirstName(),
request.getLastName() != null ? request.getLastName() : existingUser.getLastName(),
existingUser.getCreatedAt()
);
userStore.put(request.getUserId(), updatedUser);
LOG.infof("User updated successfully: %s", request.getUserId());
return new UserResponse(
request.getUserId(),
"User updated successfully",
updatedUser
);
}
@Funq
public DeleteResponse deleteUser(String userId) {
LOG.infof("Deleting user: %s", userId);
User removedUser = userStore.remove(userId);
if (removedUser == null) {
throw new FunctionException("User not found: " + userId);
}
LOG.infof("User deleted successfully: %s", userId);
return new DeleteResponse(
userId,
"User deleted successfully",
System.currentTimeMillis()
);
}
@Funq
public UserListResponse listUsers() {
LOG.info("Listing all users");
return new UserListResponse(
"Users retrieved successfully",
userStore.size(),
userStore.values().stream().toList()
);
}
// Data Models
public static class UserRegistration {
private String email;
private String firstName;
private String lastName;
// Getters and setters
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
}
public static class UserUpdate {
private String userId;
private String firstName;
private String lastName;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
}
public static class User {
private final String id;
private final String email;
private final String firstName;
private final String lastName;
private final long createdAt;
public User(String id, String email, String firstName, String lastName, long createdAt) {
this.id = id;
this.email = email;
this.firstName = firstName;
this.lastName = lastName;
this.createdAt = createdAt;
}
// Getters
public String getId() { return id; }
public String getEmail() { return email; }
public String getFirstName() { return firstName; }
public String getLastName() { return lastName; }
public long getCreatedAt() { return createdAt; }
}
public static class UserResponse {
private final String userId;
private final String message;
private final User user;
public UserResponse(String userId, String message, User user) {
this.userId = userId;
this.message = message;
this.user = user;
}
// Getters
public String getUserId() { return userId; }
public String getMessage() { return message; }
public User getUser() { return user; }
}
public static class UserListResponse {
private final String message;
private final int total;
private final java.util.List<User> users;
public UserListResponse(String message, int total, java.util.List<User> users) {
this.message = message;
this.total = total;
this.users = users;
}
// Getters
public String getMessage() { return message; }
public int getTotal() { return total; }
public java.util.List<User> getUsers() { return users; }
}
public static class DeleteResponse {
private final String userId;
private final String message;
private final long deletedAt;
public DeleteResponse(String userId, String message, long deletedAt) {
this.userId = userId;
this.message = message;
this.deletedAt = deletedAt;
}
// Getters
public String getUserId() { return userId; }
public String getMessage() { return message; }
public long getDeletedAt() { return deletedAt; }
}
public static class FunctionException extends RuntimeException {
public FunctionException(String message) {
super(message);
}
}
}
2. Event-Driven Functions (CloudEvents)
package com.company.faas.functions;
import io.quarkus.funqy.Funq;
import io.quarkus.funqy.knative.events.CloudEvent;
import io.quarkus.funqy.knative.events.CloudEventMapping;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Event-driven functions for user management
*/
@ApplicationScoped
public class UserEventFunctions {
private static final Logger LOG = Logger.getLogger(UserEventFunctions.class);
private final Map<String, UserEvent> eventStore = new ConcurrentHashMap<>();
@Funq
@CloudEventMapping(trigger = "user.registered")
public void handleUserRegistered(CloudEvent<UserRegisteredEvent> event) {
UserRegisteredEvent data = event.data();
LOG.infof("Processing user registration event: %s", data.getUserId());
// Store event
eventStore.put(event.id(), new UserEvent(
event.id(),
"user.registered",
data.getUserId(),
System.currentTimeMillis(),
data
));
// Process the registration event
processUserRegistration(data);
LOG.infof("User registration event processed: %s", event.id());
}
@Funq
@CloudEventMapping(trigger = "user.updated")
public void handleUserUpdated(CloudEvent<UserUpdatedEvent> event) {
UserUpdatedEvent data = event.data();
LOG.infof("Processing user update event: %s", data.getUserId());
// Store event
eventStore.put(event.id(), new UserEvent(
event.id(),
"user.updated",
data.getUserId(),
System.currentTimeMillis(),
data
));
// Process the update event
processUserUpdate(data);
LOG.infof("User update event processed: %s", event.id());
}
@Funq
@CloudEventMapping(trigger = "user.deleted")
public void handleUserDeleted(CloudEvent<UserDeletedEvent> event) {
UserDeletedEvent data = event.data();
LOG.infof("Processing user deletion event: %s", data.getUserId());
// Store event
eventStore.put(event.id(), new UserEvent(
event.id(),
"user.deleted",
data.getUserId(),
System.currentTimeMillis(),
data
));
// Process the deletion event
processUserDeletion(data);
LOG.infof("User deletion event processed: %s", event.id());
}
@Funq
public EventHistoryResponse getEventHistory(String userId) {
LOG.infof("Fetching event history for user: %s", userId);
var userEvents = eventStore.values().stream()
.filter(event -> userId.equals(event.getUserId()))
.sorted((e1, e2) -> Long.compare(e2.getTimestamp(), e1.getTimestamp()))
.toList();
return new EventHistoryResponse(
userId,
"Event history retrieved successfully",
userEvents.size(),
userEvents
);
}
private void processUserRegistration(UserRegisteredEvent event) {
// Send welcome email
sendWelcomeEmail(event.getEmail(), event.getFirstName());
// Initialize user profile
initializeUserProfile(event.getUserId());
// Add to analytics
trackUserRegistration(event);
}
private void processUserUpdate(UserUpdatedEvent event) {
// Update user profile in external systems
updateExternalProfile(event.getUserId());
// Log profile change
logProfileChange(event);
}
private void processUserDeletion(UserDeletedEvent event) {
// Remove from external systems
removeFromExternalSystems(event.getUserId());
// Archive user data
archiveUserData(event.getUserId());
// Log deletion
logUserDeletion(event);
}
private void sendWelcomeEmail(String email, String firstName) {
LOG.infof("Sending welcome email to: %s (%s)", email, firstName);
// Implement email sending logic
}
private void initializeUserProfile(String userId) {
LOG.infof("Initializing profile for user: %s", userId);
// Implement profile initialization
}
private void trackUserRegistration(UserRegisteredEvent event) {
LOG.infof("Tracking registration for user: %s", event.getUserId());
// Implement analytics tracking
}
private void updateExternalProfile(String userId) {
LOG.infof("Updating external profile for user: %s", userId);
// Implement external system update
}
private void logProfileChange(UserUpdatedEvent event) {
LOG.infof("Logging profile change for user: %s", event.getUserId());
// Implement audit logging
}
private void removeFromExternalSystems(String userId) {
LOG.infof("Removing user from external systems: %s", userId);
// Implement external system cleanup
}
private void archiveUserData(String userId) {
LOG.infof("Archiving data for user: %s", userId);
// Implement data archiving
}
private void logUserDeletion(UserDeletedEvent event) {
LOG.infof("Logging user deletion: %s", event.getUserId());
// Implement audit logging
}
// Event Data Models
public static class UserRegisteredEvent {
private String userId;
private String email;
private String firstName;
private String lastName;
private long registeredAt;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public long getRegisteredAt() { return registeredAt; }
public void setRegisteredAt(long registeredAt) { this.registeredAt = registeredAt; }
}
public static class UserUpdatedEvent {
private String userId;
private String firstName;
private String lastName;
private long updatedAt;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public long getUpdatedAt() { return updatedAt; }
public void setUpdatedAt(long updatedAt) { this.updatedAt = updatedAt; }
}
public static class UserDeletedEvent {
private String userId;
private String reason;
private long deletedAt;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getReason() { return reason; }
public void setReason(String reason) { this.reason = reason; }
public long getDeletedAt() { return deletedAt; }
public void setDeletedAt(long deletedAt) { this.deletedAt = deletedAt; }
}
public static class UserEvent {
private final String eventId;
private final String type;
private final String userId;
private final long timestamp;
private final Object data;
public UserEvent(String eventId, String type, String userId, long timestamp, Object data) {
this.eventId = eventId;
this.type = type;
this.userId = userId;
this.timestamp = timestamp;
this.data = data;
}
// Getters
public String getEventId() { return eventId; }
public String getType() { return type; }
public String getUserId() { return userId; }
public long getTimestamp() { return timestamp; }
public Object getData() { return data; }
}
public static class EventHistoryResponse {
private final String userId;
private final String message;
private final int totalEvents;
private final java.util.List<UserEvent> events;
public EventHistoryResponse(String userId, String message, int totalEvents, java.util.List<UserEvent> events) {
this.userId = userId;
this.message = message;
this.totalEvents = totalEvents;
this.events = events;
}
// Getters
public String getUserId() { return userId; }
public String getMessage() { return message; }
public int getTotalEvents() { return totalEvents; }
public java.util.List<UserEvent> getEvents() { return events; }
}
}
3. REST API Functions
package com.company.faas.functions;
import io.quarkus.funqy.Funq;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* REST-style functions for API endpoints
*/
@ApplicationScoped
@Path("/api")
public class ApiFunctions {
private static final Logger LOG = Logger.getLogger(ApiFunctions.class);
private final Map<String, ApiKey> apiKeys = new ConcurrentHashMap<>();
private final AtomicInteger requestCounter = new AtomicInteger(0);
@Funq
@POST
@Path("/apikeys")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public ApiKeyResponse createApiKey(CreateApiKeyRequest request) {
LOG.infof("Creating API key for: %s", request.getName());
String apiKey = generateApiKey();
ApiKey key = new ApiKey(
apiKey,
request.getName(),
request.getDescription(),
System.currentTimeMillis(),
System.currentTimeMillis() + (request.getTtlDays() * 24 * 60 * 60 * 1000)
);
apiKeys.put(apiKey, key);
LOG.infof("API key created: %s", apiKey);
return new ApiKeyResponse(
"API key created successfully",
key
);
}
@Funq
@GET
@Path("/apikeys")
@Produces(MediaType.APPLICATION_JSON)
public ApiKeyListResponse listApiKeys() {
LOG.info("Listing API keys");
return new ApiKeyListResponse(
"API keys retrieved successfully",
apiKeys.size(),
apiKeys.values().stream().toList()
);
}
@Funq
@DELETE
@Path("/apikeys/{key}")
@Produces(MediaType.APPLICATION_JSON)
public ApiKeyResponse revokeApiKey(@PathParam("key") String apiKey) {
LOG.infof("Revoking API key: %s", apiKey);
ApiKey removed = apiKeys.remove(apiKey);
if (removed == null) {
throw new FunctionException("API key not found: " + apiKey);
}
LOG.infof("API key revoked: %s", apiKey);
return new ApiKeyResponse(
"API key revoked successfully",
removed
);
}
@Funq
@GET
@Path("/stats")
@Produces(MediaType.APPLICATION_JSON)
public StatsResponse getStats() {
int totalRequests = requestCounter.incrementAndGet();
long activeApiKeys = apiKeys.values().stream()
.filter(key -> key.getExpiresAt() > System.currentTimeMillis())
.count();
return new StatsResponse(
"Statistics retrieved successfully",
totalRequests,
activeApiKeys,
System.currentTimeMillis()
);
}
@Funq
@POST
@Path("/validate")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public ValidationResponse validateApiKey(ValidateRequest request) {
LOG.infof("Validating API key");
ApiKey apiKey = apiKeys.get(request.getApiKey());
boolean isValid = apiKey != null && apiKey.getExpiresAt() > System.currentTimeMillis();
return new ValidationResponse(
isValid ? "API key is valid" : "API key is invalid",
isValid,
apiKey
);
}
private String generateApiKey() {
return "key_" + System.currentTimeMillis() + "_" +
java.util.UUID.randomUUID().toString().substring(0, 8);
}
// Data Models
public static class CreateApiKeyRequest {
private String name;
private String description;
private int ttlDays = 30;
// Getters and setters
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
public int getTtlDays() { return ttlDays; }
public void setTtlDays(int ttlDays) { this.ttlDays = ttlDays; }
}
public static class ApiKey {
private final String key;
private final String name;
private final String description;
private final long createdAt;
private final long expiresAt;
public ApiKey(String key, String name, String description, long createdAt, long expiresAt) {
this.key = key;
this.name = name;
this.description = description;
this.createdAt = createdAt;
this.expiresAt = expiresAt;
}
// Getters
public String getKey() { return key; }
public String getName() { return name; }
public String getDescription() { return description; }
public long getCreatedAt() { return createdAt; }
public long getExpiresAt() { return expiresAt; }
}
public static class ApiKeyResponse {
private final String message;
private final ApiKey apiKey;
public ApiKeyResponse(String message, ApiKey apiKey) {
this.message = message;
this.apiKey = apiKey;
}
// Getters
public String getMessage() { return message; }
public ApiKey getApiKey() { return apiKey; }
}
public static class ApiKeyListResponse {
private final String message;
private final int total;
private final java.util.List<ApiKey> apiKeys;
public ApiKeyListResponse(String message, int total, java.util.List<ApiKey> apiKeys) {
this.message = message;
this.total = total;
this.apiKeys = apiKeys;
}
// Getters
public String getMessage() { return message; }
public int getTotal() { return total; }
public java.util.List<ApiKey> getApiKeys() { return apiKeys; }
}
public static class StatsResponse {
private final String message;
private final int totalRequests;
private final long activeApiKeys;
private final long timestamp;
public StatsResponse(String message, int totalRequests, long activeApiKeys, long timestamp) {
this.message = message;
this.totalRequests = totalRequests;
this.activeApiKeys = activeApiKeys;
this.timestamp = timestamp;
}
// Getters
public String getMessage() { return message; }
public int getTotalRequests() { return totalRequests; }
public long getActiveApiKeys() { return activeApiKeys; }
public long getTimestamp() { return timestamp; }
}
public static class ValidateRequest {
private String apiKey;
// Getters and setters
public String getApiKey() { return apiKey; }
public void setApiKey(String apiKey) { this.apiKey = apiKey; }
}
public static class ValidationResponse {
private final String message;
private final boolean valid;
private final ApiKey apiKey;
public ValidationResponse(String message, boolean valid, ApiKey apiKey) {
this.message = message;
this.valid = valid;
this.apiKey = apiKey;
}
// Getters
public String getMessage() { return message; }
public boolean isValid() { return valid; }
public ApiKey getApiKey() { return apiKey; }
}
public static class FunctionException extends RuntimeException {
public FunctionException(String message) {
super(message);
}
}
}
Knative Configuration and Deployment
1. Knative Service Configuration
# src/main/kubernetes/knative.yml apiVersion: serving.knative.dev/v1 kind: Service metadata: name: user-registration-function namespace: knative-serving labels: app.kubernetes.io/name: user-registration-function app.kubernetes.io/version: "1.0.0" app.kubernetes.io/part-of: user-management annotations: autoscaling.knative.dev/minScale: "0" autoscaling.knative.dev/maxScale: "20" autoscaling.knative.dev/target: "100" autoscaling.knative.dev/window: "60s" spec: template: metadata: labels: app.kubernetes.io/name: user-registration-function app.kubernetes.io/version: "1.0.0" annotations: autoscaling.knative.dev/minScale: "0" autoscaling.knative.dev/maxScale: "20" autoscaling.knative.dev/target: "100" spec: containerConcurrency: 10 containers: - image: quay.io/company/user-registration-function:1.0.0 imagePullPolicy: IfNotPresent name: user-registration-function ports: - containerPort: 8080 name: http protocol: TCP env: - name: JAVA_OPTS value: "-Xmx256m -Xms128m -XX:MaxRAM=256m" - name: QUARKUS_HTTP_PORT value: "8080" - name: QUARKUS_LOG_LEVEL value: "INFO" resources: requests: memory: "256Mi" cpu: "100m" limits: memory: "512Mi" cpu: "500m" livenessProbe: httpGet: path: /health/live port: 8080 initialDelaySeconds: 10 periodSeconds: 5 timeoutSeconds: 1 successThreshold: 1 failureThreshold: 3 readinessProbe: httpGet: path: /health/ready port: 8080 initialDelaySeconds: 5 periodSeconds: 5 timeoutSeconds: 1 successThreshold: 1 failureThreshold: 3 startupProbe: httpGet: path: /health/started port: 8080 initialDelaySeconds: 0 periodSeconds: 5 timeoutSeconds: 1 successThreshold: 1 failureThreshold: 30 --- # Knative Trigger for CloudEvents apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: user-registered-trigger namespace: knative-serving spec: broker: default filter: attributes: type: user.registered subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: user-registration-function --- apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: user-updated-trigger namespace: knative-serving spec: broker: default filter: attributes: type: user.updated subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: user-registration-function --- apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: user-deleted-trigger namespace: knative-serving spec: broker: default filter: attributes: type: user.deleted subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: user-registration-function
2. Knative Eventing Configuration
package com.company.faas.events;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
* Event producer and consumer for Knative Eventing
*/
@ApplicationScoped
public class UserEventProducer {
private static final Logger LOG = Logger.getLogger(UserEventProducer.class);
@ConsumeEvent("user-registered")
public CompletionStage<String> handleUserRegistered(UserRegisteredEvent event) {
LOG.infof("Producing user registered event: %s", event.getUserId());
// Process the event asynchronously
return CompletableFuture.supplyAsync(() -> {
// Additional processing logic
LOG.infof("User registration event processed: %s", event.getUserId());
return "Event processed: " + event.getUserId();
});
}
@Incoming("user-events")
public void processUserEvent(Record<String, String> record) {
String key = record.key();
String value = record.value();
LOG.infof("Processing user event - Key: %s, Value: %s", key, value);
// Process the event from Kafka (or other message broker)
processEventFromBroker(key, value);
}
@Outgoing("user-events-out")
public String produceUserEvent(String eventData) {
LOG.infof("Producing user event: %s", eventData);
return eventData;
}
private void processEventFromBroker(String key, String value) {
// Implement event processing logic
LOG.infof("Processing event from broker - Key: %s", key);
}
// Event publishing methods
public void publishUserRegistered(String userId, String email, String firstName, String lastName) {
UserRegisteredEvent event = new UserRegisteredEvent();
event.setUserId(userId);
event.setEmail(email);
event.setFirstName(firstName);
event.setLastName(lastName);
event.setRegisteredAt(System.currentTimeMillis());
LOG.infof("Publishing user registered event: %s", userId);
// In practice, you'd use a CloudEvent client to publish to the broker
}
public void publishUserUpdated(String userId, String firstName, String lastName) {
UserUpdatedEvent event = new UserUpdatedEvent();
event.setUserId(userId);
event.setFirstName(firstName);
event.setLastName(lastName);
event.setUpdatedAt(System.currentTimeMillis());
LOG.infof("Publishing user updated event: %s", userId);
}
public void publishUserDeleted(String userId, String reason) {
UserDeletedEvent event = new UserDeletedEvent();
event.setUserId(userId);
event.setReason(reason);
event.setDeletedAt(System.currentTimeMillis());
LOG.infof("Publishing user deleted event: %s", userId);
}
// Event data classes
public static class UserRegisteredEvent {
private String userId;
private String email;
private String firstName;
private String lastName;
private long registeredAt;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public long getRegisteredAt() { return registeredAt; }
public void setRegisteredAt(long registeredAt) { this.registeredAt = registeredAt; }
}
public static class UserUpdatedEvent {
private String userId;
private String firstName;
private String lastName;
private long updatedAt;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public long getUpdatedAt() { return updatedAt; }
public void setUpdatedAt(long updatedAt) { this.updatedAt = updatedAt; }
}
public static class UserDeletedEvent {
private String userId;
private String reason;
private long deletedAt;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getReason() { return reason; }
public void setReason(String reason) { this.reason = reason; }
public long getDeletedAt() { return deletedAt; }
public void setDeletedAt(long deletedAt) { this.deletedAt = deletedAt; }
}
}
Function Monitoring and Observability
1. Health Checks
package com.company.faas.health;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Liveness;
import org.eclipse.microprofile.health.Readiness;
import org.eclipse.microprofile.health.Startup;
import javax.enterprise.context.ApplicationScoped;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
/**
* Health checks for Knative function
*/
@ApplicationScoped
public class FunctionHealthChecks {
private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
@Liveness
HealthCheck liveness() {
return () -> HealthCheckResponse.named("user-function-liveness")
.status(isFunctionAlive())
.withData("timestamp", System.currentTimeMillis())
.withData("memory_used", memoryMXBean.getHeapMemoryUsage().getUsed())
.withData("memory_max", memoryMXBean.getHeapMemoryUsage().getMax())
.build();
}
@Readiness
HealthCheck readiness() {
return () -> HealthCheckResponse.named("user-function-readiness")
.status(isFunctionReady())
.withData("timestamp", System.currentTimeMillis())
.withData("uptime", ManagementFactory.getRuntimeMXBean().getUptime())
.build();
}
@Startup
HealthCheck startup() {
return () -> HealthCheckResponse.named("user-function-startup")
.status(isStartupComplete())
.withData("timestamp", System.currentTimeMillis())
.build();
}
private boolean isFunctionAlive() {
// Basic liveness check
return memoryMXBean.getHeapMemoryUsage().getUsed() <
memoryMXBean.getHeapMemoryUsage().getMax() * 0.9;
}
private boolean isFunctionReady() {
// Check if function can process requests
return true;
}
private boolean isStartupComplete() {
// Check if function initialization is complete
return ManagementFactory.getRuntimeMXBean().getUptime() > 5000; // 5 seconds
}
}
2. Metrics and Monitoring
package com.company.faas.metrics;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.eclipse.microprofile.metrics.MetricUnits;
import org.eclipse.microprofile.metrics.annotation.Gauge;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Metrics collection for function monitoring
*/
@ApplicationScoped
public class FunctionMetrics {
@Inject
MeterRegistry meterRegistry;
private final Counter totalRequests;
private final Counter successfulRequests;
private final Counter failedRequests;
private final Timer requestDuration;
private final AtomicLong activeRequests;
private final ConcurrentMap<String, Counter> functionCounters;
public FunctionMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.totalRequests = Counter.builder("function_requests_total")
.description("Total number of function requests")
.register(meterRegistry);
this.successfulRequests = Counter.builder("function_requests_successful")
.description("Number of successful function requests")
.register(meterRegistry);
this.failedRequests = Counter.builder("function_requests_failed")
.description("Number of failed function requests")
.register(meterRegistry);
this.requestDuration = Timer.builder("function_request_duration_seconds")
.description("Function request duration in seconds")
.register(meterRegistry);
this.activeRequests = new AtomicLong(0);
this.functionCounters = new ConcurrentHashMap<>();
}
public void recordFunctionCall(String functionName, long durationMs, boolean success) {
totalRequests.increment();
if (success) {
successfulRequests.increment();
} else {
failedRequests.increment();
}
requestDuration.record(durationMs, TimeUnit.MILLISECONDS);
// Record per-function metrics
String counterName = "function_calls_total";
functionCounters.computeIfAbsent(functionName,
name -> Counter.builder(counterName)
.tag("function", name)
.register(meterRegistry)
).increment();
}
public void recordEventProcessed(String eventType, long durationMs) {
String timerName = "event_processing_duration_seconds";
Timer.builder(timerName)
.tag("event_type", eventType)
.register(meterRegistry)
.record(durationMs, TimeUnit.MILLISECONDS);
Counter.builder("events_processed_total")
.tag("event_type", eventType)
.register(meterRegistry)
.increment();
}
public void incrementActiveRequests() {
activeRequests.incrementAndGet();
}
public void decrementActiveRequests() {
activeRequests.decrementAndGet();
}
@Gauge(name = "function_active_requests", unit = MetricUnits.NONE)
public long getActiveRequests() {
return activeRequests.get();
}
@Gauge(name = "function_memory_usage", unit = MetricUnits.BYTES)
public long getMemoryUsage() {
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
}
@Gauge(name = "function_heap_usage", unit = MetricUnits.BYTES)
public long getHeapUsage() {
return Runtime.getRuntime().totalMemory();
}
}
Testing
1. Function Unit Tests
package com.company.faas.test;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.junit.jupiter.api.Test;
import static org.hamcrest.Matchers.*;
@QuarkusTest
class UserRegistrationFunctionTest {
@Test
void testRegisterUser() {
String requestBody = """
{
"email": "[email protected]",
"firstName": "John",
"lastName": "Doe"
}
""";
RestAssured.given()
.contentType(ContentType.JSON)
.body(requestBody)
.when()
.post("/functions/registerUser")
.then()
.statusCode(200)
.body("message", equalTo("User registered successfully"))
.body("user.email", equalTo("[email protected]"))
.body("user.firstName", equalTo("John"))
.body("user.lastName", equalTo("Doe"));
}
@Test
void testRegisterUserWithInvalidEmail() {
String requestBody = """
{
"email": "",
"firstName": "John",
"lastName": "Doe"
}
""";
RestAssured.given()
.contentType(ContentType.JSON)
.body(requestBody)
.when()
.post("/functions/registerUser")
.then()
.statusCode(500); // Function throws exception
}
@Test
void testListUsers() {
RestAssured.given()
.when()
.get("/functions/listUsers")
.then()
.statusCode(200)
.body("message", equalTo("Users retrieved successfully"))
.body("total", greaterThanOrEqualTo(0));
}
}
@QuarkusTest
class ApiFunctionsTest {
@Test
void testCreateApiKey() {
String requestBody = """
{
"name": "Test API Key",
"description": "For testing purposes",
"ttlDays": 7
}
""";
RestAssured.given()
.contentType(ContentType.JSON)
.body(requestBody)
.when()
.post("/api/apikeys")
.then()
.statusCode(200)
.body("message", equalTo("API key created successfully"))
.body("apiKey.name", equalTo("Test API Key"));
}
@Test
void testGetStats() {
RestAssured.given()
.when()
.get("/api/stats")
.then()
.statusCode(200)
.body("message", equalTo("Statistics retrieved successfully"))
.body("totalRequests", greaterThanOrEqualTo(0));
}
}
2. Knative Integration Tests
package com.company.faas.test;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.knative.client.KnativeClient;
import io.fabric8.knative.client.DefaultKnativeClient;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import static org.junit.jupiter.api.Assertions.*;
@QuarkusTest
class KnativeIntegrationTest {
@Inject
@KnativeClient
DefaultKnativeClient knativeClient;
@Test
void testKnativeServiceDeployment() {
// Verify Knative service is deployed
var service = knativeClient.services()
.inNamespace("knative-serving")
.withName("user-registration-function")
.get();
assertNotNull(service, "Knative service should be deployed");
assertEquals("user-registration-function", service.getMetadata().getName());
}
@Test
void testKnativeTriggers() {
// Verify triggers are configured
var triggers = knativeClient.triggers()
.inNamespace("knative-serving")
.list()
.getItems();
assertFalse(triggers.isEmpty(), "Should have Knative triggers configured");
boolean hasUserRegisteredTrigger = triggers.stream()
.anyMatch(trigger -> "user-registered-trigger".equals(trigger.getMetadata().getName()));
assertTrue(hasUserRegisteredTrigger, "Should have user registered trigger");
}
}
Deployment and Management
1. Build and Deploy Script
#!/bin/bash
# deploy-function.sh
set -e
# Configuration
APP_NAME="user-registration-function"
VERSION="1.0.0"
REGISTRY="quay.io/company"
NAMESPACE="knative-serving"
echo "Building Quarkus function..."
./mvnw clean package -Dquarkus.container-image.build=true -Dquarkus.kubernetes.deploy=true
echo "Pushing container image..."
docker push ${REGISTRY}/${APP_NAME}:${VERSION}
echo "Deploying to Knative..."
kubectl apply -f target/kubernetes/knative.yml -n ${NAMESPACE}
echo "Waiting for deployment to be ready..."
kubectl wait --for=condition=ready ksvc/${APP_NAME} --timeout=300s -n ${NAMESPACE}
echo "Function deployed successfully!"
echo "URL: $(kubectl get ksvc ${APP_NAME} -n ${NAMESPACE} -o jsonpath='{.status.url}')"
2. Function Invocation Examples
package com.company.faas.client;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
/**
* REST client for invoking the Knative function
*/
@RegisterRestClient(baseUri = "http://user-registration-function.knative-serving.svc.cluster.local")
@Path("/functions")
public interface FunctionClient {
@POST
@Path("/registerUser")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
UserResponse registerUser(UserRegistration request);
@GET
@Path("/getUser")
@Produces(MediaType.APPLICATION_JSON)
UserResponse getUser(@QueryParam("userId") String userId);
@POST
@Path("/updateUser")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
UserResponse updateUser(UserUpdate request);
@DELETE
@Path("/deleteUser")
@Produces(MediaType.APPLICATION_JSON)
DeleteResponse deleteUser(@QueryParam("userId") String userId);
@GET
@Path("/listUsers")
@Produces(MediaType.APPLICATION_JSON)
UserListResponse listUsers();
// Data classes
class UserRegistration {
public String email;
public String firstName;
public String lastName;
}
class UserUpdate {
public String userId;
public String firstName;
public String lastName;
}
class UserResponse {
public String userId;
public String message;
public User user;
}
class UserListResponse {
public String message;
public int total;
public java.util.List<User> users;
}
class DeleteResponse {
public String userId;
public String message;
public long deletedAt;
}
class User {
public String id;
public String email;
public String firstName;
public String lastName;
public long createdAt;
}
}
Summary
This Knative FaaS implementation provides:
- Serverless Functions: HTTP-triggered and event-driven functions
- Auto-scaling: Scale from zero to multiple instances based on demand
- Event-Driven Architecture: CloudEvents integration for loose coupling
- Kubernetes Native: Built on Kubernetes standards
- Fast Startup: Quarkus native compilation for sub-second cold starts
- Observability: Built-in health checks, metrics, and logging
Key Benefits:
- Cost Efficiency: Scale to zero when not in use
- Developer Productivity: Focus on business logic, not infrastructure
- Resilience: Built-in retries and error handling
- Portability: Run on any Kubernetes cluster with Knative
- Performance: Optimized for serverless workloads
This architecture enables building modern, cloud-native applications that are both cost-effective and highly scalable.