Overview
KEDA (Kubernetes-based Event Driven Autoscaling) allows you to automatically scale your Java applications based on events from various sources like Kafka, RabbitMQ, Redis, HTTP, and more. This enables true event-driven autoscaling for your microservices.
Core KEDA Concepts
1. KEDA ScaledObject Configuration Generator
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.kubernetes.client.openapi.models.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
public class KedaConfigGenerator {
private static final Logger logger = LoggerFactory.getLogger(KedaConfigGenerator.class);
private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
public String generateScaledObject(ScaledObjectConfig config) {
try {
Map<String, Object> scaledObject = createScaledObject(config);
return yamlMapper.writeValueAsString(scaledObject);
} catch (Exception e) {
throw new RuntimeException("Failed to generate ScaledObject YAML", e);
}
}
private Map<String, Object> createScaledObject(ScaledObjectConfig config) {
Map<String, Object> scaledObject = new LinkedHashMap<>();
// API Version
scaledObject.put("apiVersion", "keda.sh/v1alpha1");
scaledObject.put("kind", "ScaledObject");
// Metadata
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("name", config.getName());
metadata.put("namespace", config.getNamespace());
if (config.getLabels() != null) {
metadata.put("labels", config.getLabels());
}
scaledObject.put("metadata", metadata);
// Spec
Map<String, Object> spec = new LinkedHashMap<>();
// Scale Target
Map<String, Object> scaleTargetRef = new LinkedHashMap<>();
scaleTargetRef.put("apiVersion", config.getTargetApiVersion());
scaleTargetRef.put("kind", config.getTargetKind());
scaleTargetRef.put("name", config.getTargetName());
spec.put("scaleTargetRef", scaleTargetRef);
// Polling Interval
if (config.getPollingInterval() != null) {
spec.put("pollingInterval", config.getPollingInterval());
}
// Cooldown Period
if (config.getCooldownPeriod() != null) {
spec.put("cooldownPeriod", config.getCooldownPeriod());
}
// Min/Max Replica Count
spec.put("minReplicaCount", config.getMinReplicas());
if (config.getMaxReplicas() != null) {
spec.put("maxReplicaCount", config.getMaxReplicas());
}
// Advanced Config
if (config.getAdvanced() != null) {
spec.put("advanced", config.getAdvanced());
}
// Triggers
spec.put("triggers", config.getTriggers().stream()
.map(this::createTrigger)
.collect(Collectors.toList()));
scaledObject.put("spec", spec);
return scaledObject;
}
private Map<String, Object> createTrigger(TriggerConfig trigger) {
Map<String, Object> triggerMap = new LinkedHashMap<>();
triggerMap.put("type", trigger.getType());
if (trigger.getMetadata() != null && !trigger.getMetadata().isEmpty()) {
triggerMap.put("metadata", trigger.getMetadata());
}
if (trigger.getAuthenticationRef() != null) {
Map<String, Object> authRef = new LinkedHashMap<>();
authRef.put("name", trigger.getAuthenticationRef());
triggerMap.put("authenticationRef", authRef);
}
return triggerMap;
}
public static class ScaledObjectConfig {
private String name;
private String namespace = "default";
private Map<String, String> labels;
private String targetApiVersion = "apps/v1";
private String targetKind = "Deployment";
private String targetName;
private Integer pollingInterval = 30;
private Integer cooldownPeriod = 300;
private Integer minReplicas = 0;
private Integer maxReplicas = 100;
private Map<String, Object> advanced;
private List<TriggerConfig> triggers = new ArrayList<>();
// Builder methods
public ScaledObjectConfig withName(String name) {
this.name = name;
return this;
}
public ScaledObjectConfig withNamespace(String namespace) {
this.namespace = namespace;
return this;
}
public ScaledObjectConfig withTarget(String targetName) {
this.targetName = targetName;
return this;
}
public ScaledObjectConfig withTarget(String targetKind, String targetName) {
this.targetKind = targetKind;
this.targetName = targetName;
return this;
}
public ScaledObjectConfig withReplicaBounds(int min, int max) {
this.minReplicas = min;
this.maxReplicas = max;
return this;
}
public ScaledObjectConfig withPollingInterval(int seconds) {
this.pollingInterval = seconds;
return this;
}
public ScaledObjectConfig withCooldownPeriod(int seconds) {
this.cooldownPeriod = seconds;
return this;
}
public ScaledObjectConfig withTrigger(TriggerConfig trigger) {
this.triggers.add(trigger);
return this;
}
public ScaledObjectConfig withAdvanced(Map<String, Object> advanced) {
this.advanced = advanced;
return this;
}
// Getters
public String getName() { return name; }
public String getNamespace() { return namespace; }
public Map<String, String> getLabels() { return labels; }
public String getTargetApiVersion() { return targetApiVersion; }
public String getTargetKind() { return targetKind; }
public String getTargetName() { return targetName; }
public Integer getPollingInterval() { return pollingInterval; }
public Integer getCooldownPeriod() { return cooldownPeriod; }
public Integer getMinReplicas() { return minReplicas; }
public Integer getMaxReplicas() { return maxReplicas; }
public Map<String, Object> getAdvanced() { return advanced; }
public List<TriggerConfig> getTriggers() { return triggers; }
}
public static class TriggerConfig {
private String type;
private Map<String, String> metadata;
private String authenticationRef;
public static TriggerConfig kafka(String bootstrapServers, String topic,
String consumerGroup, int lagThreshold) {
Map<String, String> metadata = new HashMap<>();
metadata.put("bootstrapServers", bootstrapServers);
metadata.put("topic", topic);
metadata.put("consumerGroup", consumerGroup);
metadata.put("lagThreshold", String.valueOf(lagThreshold));
metadata.put("offsetResetPolicy", "latest");
return new TriggerConfig()
.withType("kafka")
.withMetadata(metadata);
}
public static TriggerConfig rabbitMQ(String host, String queueName,
String vhost, int queueLength) {
Map<String, String> metadata = new HashMap<>();
metadata.put("host", host);
metadata.put("queueName", queueName);
metadata.put("vhost", vhost);
metadata.put("queueLength", String.valueOf(queueLength));
metadata.put("protocol", "amqp");
return new TriggerConfig()
.withType("rabbitmq")
.withMetadata(metadata);
}
public static TriggerConfig redis(String address, String listName,
int listLength, String listType) {
Map<String, String> metadata = new HashMap<>();
metadata.put("address", address);
metadata.put("listName", listName);
metadata.put("listLength", String.valueOf(listLength));
metadata.put("listType", listType); // "list" or "set"
return new TriggerConfig()
.withType("redis")
.withMetadata(metadata);
}
public static TriggerConfig http(String url, String valueLocation,
int threshold) {
Map<String, String> metadata = new HashMap<>();
metadata.put("url", url);
metadata.put("valueLocation", valueLocation); // e.g., "message.count"
metadata.put("threshold", String.valueOf(threshold));
return new TriggerConfig()
.withType("http")
.withMetadata(metadata);
}
public static TriggerConfig cron(String desiredReplicas, String timezone) {
Map<String, String> metadata = new HashMap<>();
metadata.put("desiredReplicas", desiredReplicas);
metadata.put("timezone", timezone);
metadata.put("start", "0 9 * * 1-5"); // 9 AM on weekdays
metadata.put("end", "0 17 * * 1-5"); // 5 PM on weekdays
return new TriggerConfig()
.withType("cron")
.withMetadata(metadata);
}
// Builder methods
public TriggerConfig withType(String type) {
this.type = type;
return this;
}
public TriggerConfig withMetadata(Map<String, String> metadata) {
this.metadata = metadata;
return this;
}
public TriggerConfig withAuthenticationRef(String authRef) {
this.authenticationRef = authRef;
return this;
}
// Getters
public String getType() { return type; }
public Map<String, String> getMetadata() { return metadata; }
public String getAuthenticationRef() { return authenticationRef; }
}
}
2. KEDA Deployment Service
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class KedaDeploymentService {
private static final Logger logger = LoggerFactory.getLogger(KedaDeploymentService.class);
private final CustomObjectsApi customObjectsApi;
private final KedaConfigGenerator configGenerator;
public KedaDeploymentService(ApiClient apiClient, KedaConfigGenerator configGenerator) {
this.customObjectsApi = new CustomObjectsApi(apiClient);
this.configGenerator = configGenerator;
}
public void deployScaledObject(KedaConfigGenerator.ScaledObjectConfig config)
throws ApiException {
String scaledObjectYaml = configGenerator.generateScaledObject(config);
logger.info("Deploying ScaledObject: {}", config.getName());
Map<String, Object> scaledObject = configGenerator.generateScaledObjectMap(config);
try {
// Create ScaledObject
Object result = customObjectsApi.createNamespacedCustomObject(
"keda.sh", // group
"v1alpha1", // version
config.getNamespace(), // namespace
"scaledobjects", // plural
scaledObject, // body
null, // pretty
null, // dryRun
null, // fieldManager
null // fieldValidation
);
logger.info("Successfully deployed ScaledObject: {}", config.getName());
} catch (ApiException e) {
logger.error("Failed to deploy ScaledObject: {}", e.getResponseBody(), e);
throw e;
}
}
public void updateScaledObject(KedaConfigGenerator.ScaledObjectConfig config)
throws ApiException {
Map<String, Object> scaledObject = configGenerator.generateScaledObjectMap(config);
try {
Object result = customObjectsApi.replaceNamespacedCustomObject(
"keda.sh",
"v1alpha1",
config.getNamespace(),
"scaledobjects",
config.getName(),
scaledObject,
null, null, null, null
);
logger.info("Successfully updated ScaledObject: {}", config.getName());
} catch (ApiException e) {
logger.error("Failed to update ScaledObject: {}", e.getResponseBody(), e);
throw e;
}
}
public void deleteScaledObject(String name, String namespace) throws ApiException {
try {
Object result = customObjectsApi.deleteNamespacedCustomObject(
"keda.sh",
"v1alpha1",
namespace,
"scaledobjects",
name,
null, // gracePeriodSeconds
null, // orphanDependents
null, // propagationPolicy
new V1DeleteOptions(), // body
null, null, null
);
logger.info("Successfully deleted ScaledObject: {}", name);
} catch (ApiException e) {
logger.error("Failed to delete ScaledObject: {}", e.getResponseBody(), e);
throw e;
}
}
public Object getScaledObject(String name, String namespace) throws ApiException {
return customObjectsApi.getNamespacedCustomObject(
"keda.sh",
"v1alpha1",
namespace,
"scaledobjects",
name
);
}
public Object listScaledObjects(String namespace) throws ApiException {
return customObjectsApi.listNamespacedCustomObject(
"keda.sh",
"v1alpha1",
namespace,
"scaledobjects",
null, // pretty
null, // allowWatchBookmarks
null, // continue
null, // fieldSelector
null, // labelSelector
null, // limit
null, // resourceVersion
null, // resourceVersionMatch
null, // timeoutSeconds
null // watch
);
}
}
3. Event-Driven Application with KEDA Scaling
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class EventDrivenProcessor {
private static final Logger logger = LoggerFactory.getLogger(EventDrivenProcessor.class);
private final MetricsService metricsService;
public EventDrivenProcessor(MetricsService metricsService) {
this.metricsService = metricsService;
}
@KafkaListener(topics = "${kafka.topic.orders}")
public void processOrderEvent(String orderEvent) {
logger.info("Processing order event: {}", orderEvent);
metricsService.incrementProcessedOrders();
try {
// Simulate processing time
Thread.sleep(100);
// Process the order
processOrder(Order.fromJson(orderEvent));
metricsService.incrementSuccessfulOrders();
} catch (Exception e) {
logger.error("Failed to process order event", e);
metricsService.incrementFailedOrders();
}
}
@KafkaListener(topics = "${kafka.topic.payments}")
public void processPaymentEvent(String paymentEvent) {
logger.info("Processing payment event: {}", paymentEvent);
metricsService.incrementProcessedPayments();
try {
// Process payment
processPayment(Payment.fromJson(paymentEvent));
metricsService.incrementSuccessfulPayments();
} catch (Exception e) {
logger.error("Failed to process payment event", e);
metricsService.incrementFailedPayments();
}
}
private void processOrder(Order order) {
// Order processing logic
logger.info("Processing order: {}", order.getId());
// Implement order processing
}
private void processPayment(Payment payment) {
// Payment processing logic
logger.info("Processing payment: {}", payment.getId());
// Implement payment processing
}
// Data classes
public static class Order {
private String id;
private String customerId;
private double amount;
public static Order fromJson(String json) {
// Simple JSON parsing - use Jackson in real implementation
return new Order();
}
// Getters and setters
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
}
public static class Payment {
private String id;
private String orderId;
private double amount;
private String status;
public static Payment fromJson(String json) {
// Simple JSON parsing - use Jackson in real implementation
return new Payment();
}
// Getters and setters
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
}
}
4. Metrics Service for Scaling Decisions
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Service;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class MetricsService {
private final MeterRegistry meterRegistry;
// Counters
private final Counter processedOrders;
private final Counter successfulOrders;
private final Counter failedOrders;
private final Counter processedPayments;
private final Counter successfulPayments;
private final Counter failedPayments;
// Gauges for current load
private final AtomicInteger currentQueueSize = new AtomicInteger(0);
private final AtomicLong processingTime = new AtomicLong(0);
private final AtomicInteger activeConnections = new AtomicInteger(0);
public MetricsService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// Initialize counters
this.processedOrders = Counter.builder("orders.processed.total")
.description("Total number of processed orders")
.register(meterRegistry);
this.successfulOrders = Counter.builder("orders.successful.total")
.description("Total number of successfully processed orders")
.register(meterRegistry);
this.failedOrders = Counter.builder("orders.failed.total")
.description("Total number of failed orders")
.register(meterRegistry);
this.processedPayments = Counter.builder("payments.processed.total")
.description("Total number of processed payments")
.register(meterRegistry);
this.successfulPayments = Counter.builder("payments.successful.total")
.description("Total number of successfully processed payments")
.register(meterRegistry);
this.failedPayments = Counter.builder("payments.failed.total")
.description("Total number of failed payments")
.register(meterRegistry);
// Initialize gauges
Gauge.builder("queue.current.size")
.description("Current queue size")
.register(meterRegistry, currentQueueSize, AtomicInteger::get);
Gauge.builder("processing.time.avg")
.description("Average processing time in milliseconds")
.register(meterRegistry, processingTime, AtomicLong::get);
Gauge.builder("connections.active")
.description("Number of active connections")
.register(meterRegistry, activeConnections, AtomicInteger::get);
}
// Order metrics
public void incrementProcessedOrders() {
processedOrders.increment();
}
public void incrementSuccessfulOrders() {
successfulOrders.increment();
}
public void incrementFailedOrders() {
failedOrders.increment();
}
// Payment metrics
public void incrementProcessedPayments() {
processedPayments.increment();
}
public void incrementSuccessfulPayments() {
successfulPayments.increment();
}
public void incrementFailedPayments() {
failedPayments.increment();
}
// Queue metrics
public void setCurrentQueueSize(int size) {
currentQueueSize.set(size);
}
public void incrementQueueSize() {
currentQueueSize.incrementAndGet();
}
public void decrementQueueSize() {
currentQueueSize.decrementAndGet();
}
// Processing time metrics
public void recordProcessingTime(long milliseconds) {
processingTime.set(milliseconds);
}
// Connection metrics
public void incrementActiveConnections() {
activeConnections.incrementAndGet();
}
public void decrementActiveConnections() {
activeConnections.decrementAndGet();
}
// Get metrics for scaling decisions
public int getCurrentQueueSize() {
return currentQueueSize.get();
}
public long getAverageProcessingTime() {
return processingTime.get();
}
public int getActiveConnections() {
return activeConnections.get();
}
}
5. KEDA Scaling Manager
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class KedaScalingManager {
private static final Logger logger = LoggerFactory.getLogger(KedaScalingManager.class);
private final KedaDeploymentService kedaDeploymentService;
private final MetricsService metricsService;
private final Map<String, KedaConfigGenerator.ScaledObjectConfig> scaledObjects;
public KedaScalingManager(KedaDeploymentService kedaDeploymentService,
MetricsService metricsService) {
this.kedaDeploymentService = kedaDeploymentService;
this.metricsService = metricsService;
this.scaledObjects = new ConcurrentHashMap<>();
}
public void registerScaledObject(String name, KedaConfigGenerator.ScaledObjectConfig config) {
scaledObjects.put(name, config);
logger.info("Registered ScaledObject: {}", name);
}
public void deployScaledObject(String name) {
KedaConfigGenerator.ScaledObjectConfig config = scaledObjects.get(name);
if (config != null) {
try {
kedaDeploymentService.deployScaledObject(config);
logger.info("Deployed ScaledObject: {}", name);
} catch (Exception e) {
logger.error("Failed to deploy ScaledObject: {}", name, e);
}
}
}
public void updateScaledObjectThreshold(String scaledObjectName, String triggerType,
String thresholdKey, int newThreshold) {
KedaConfigGenerator.ScaledObjectConfig config = scaledObjects.get(scaledObjectName);
if (config != null) {
config.getTriggers().stream()
.filter(trigger -> triggerType.equals(trigger.getType()))
.findFirst()
.ifPresent(trigger -> {
trigger.getMetadata().put(thresholdKey, String.valueOf(newThreshold));
try {
kedaDeploymentService.updateScaledObject(config);
logger.info("Updated threshold for {}: {} = {}",
scaledObjectName, thresholdKey, newThreshold);
} catch (Exception e) {
logger.error("Failed to update ScaledObject threshold", e);
}
});
}
}
// Dynamic scaling based on metrics
@Scheduled(fixedRate = 30000) // Every 30 seconds
public void adjustScalingDynamically() {
int currentQueueSize = metricsService.getCurrentQueueSize();
long avgProcessingTime = metricsService.getAverageProcessingTime();
// Adjust Kafka lag threshold based on processing performance
if (avgProcessingTime > 500) { // If processing is slow
int newThreshold = Math.max(10, currentQueueSize / 10);
updateScaledObjectThreshold("order-processor", "kafka", "lagThreshold", newThreshold);
logger.info("Adjusted Kafka lag threshold to {} due to slow processing", newThreshold);
} else if (avgProcessingTime < 100) { // If processing is fast
int newThreshold = Math.min(1000, currentQueueSize * 2);
updateScaledObjectThreshold("order-processor", "kafka", "lagThreshold", newThreshold);
logger.info("Adjusted Kafka lag threshold to {} due to fast processing", newThreshold);
}
}
// Scale to zero during off-hours
@Scheduled(cron = "0 0 22 * * ?") // 10 PM daily
public void scaleToZeroForNight() {
try {
updateScaledObjectThreshold("order-processor", "kafka", "lagThreshold", 0);
logger.info("Scaled to zero for night hours");
} catch (Exception e) {
logger.error("Failed to scale to zero", e);
}
}
// Scale up for morning peak
@Scheduled(cron = "0 0 6 * * ?") // 6 AM daily
public void scaleUpForMorning() {
try {
updateScaledObjectThreshold("order-processor", "kafka", "lagThreshold", 100);
logger.info("Scaled up for morning peak hours");
} catch (Exception e) {
logger.error("Failed to scale up for morning", e);
}
}
}
6. HTTP Scaler for Custom Metrics
import org.springframework.web.bind.annotation.*;
import org.springframework.http.ResponseEntity;
import org.springframework.boot.actuate.endpoint.web.annotation.RestControllerEndpoint;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@RestController
@RequestMapping("/api/scaling")
public class ScalingMetricsController {
private final MetricsService metricsService;
private final AtomicInteger customWorkload = new AtomicInteger(0);
public ScalingMetricsController(MetricsService metricsService) {
this.metricsService = metricsService;
}
@GetMapping("/metrics")
public ResponseEntity<Map<String, Object>> getScalingMetrics() {
Map<String, Object> metrics = Map.of(
"queueSize", metricsService.getCurrentQueueSize(),
"processingTime", metricsService.getAverageProcessingTime(),
"activeConnections", metricsService.getActiveConnections(),
"customWorkload", customWorkload.get(),
"timestamp", System.currentTimeMillis()
);
return ResponseEntity.ok(metrics);
}
@PostMapping("/workload")
public ResponseEntity<String> updateWorkload(@RequestBody WorkloadRequest request) {
customWorkload.set(request.getWorkload());
metricsService.setCurrentQueueSize(request.getWorkload());
return ResponseEntity.ok("Workload updated to: " + request.getWorkload());
}
@GetMapping("/health")
public ResponseEntity<Map<String, String>> health() {
return ResponseEntity.ok(Map.of("status", "healthy", "service", "scaling-metrics"));
}
public static class WorkloadRequest {
private int workload;
public int getWorkload() { return workload; }
public void setWorkload(int workload) { this.workload = workload; }
}
}
7. KEDA Configuration Factory
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class KedaConfigFactory {
@Value("${kafka.bootstrap.servers:localhost:9092}")
private String kafkaBootstrapServers;
@Value("${kafka.consumer.group:order-processor-group}")
private String kafkaConsumerGroup;
@Value("${rabbitmq.host:localhost}")
private String rabbitMQHost;
@Value("${redis.address:localhost:6379}")
private String redisAddress;
public KedaConfigGenerator.ScaledObjectConfig createOrderProcessorConfig() {
return new KedaConfigGenerator.ScaledObjectConfig()
.withName("order-processor-scaledobject")
.withNamespace("default")
.withTarget("Deployment", "order-processor")
.withReplicaBounds(0, 10)
.withPollingInterval(15)
.withCooldownPeriod(60)
.withTrigger(KedaConfigGenerator.TriggerConfig.kafka(
kafkaBootstrapServers,
"orders",
kafkaConsumerGroup,
50 // Scale when lag reaches 50 messages
))
.withAdvanced(createAdvancedConfig());
}
public KedaConfigGenerator.ScaledObjectConfig createPaymentProcessorConfig() {
return new KedaConfigGenerator.ScaledObjectConfig()
.withName("payment-processor-scaledobject")
.withNamespace("default")
.withTarget("Deployment", "payment-processor")
.withReplicaBounds(1, 5) // Always keep at least 1 replica
.withPollingInterval(30)
.withTrigger(KedaConfigGenerator.TriggerConfig.rabbitMQ(
rabbitMQHost,
"payment-queue",
"/",
20 // Scale when queue has 20 messages
));
}
public KedaConfigGenerator.ScaledObjectConfig createNotificationServiceConfig() {
return new KedaConfigGenerator.ScaledObjectConfig()
.withName("notification-service-scaledobject")
.withNamespace("default")
.withTarget("Deployment", "notification-service")
.withReplicaBounds(0, 3)
.withTrigger(KedaConfigGenerator.TriggerConfig.redis(
redisAddress,
"notification-queue",
100, // Scale when list has 100 items
"list"
));
}
public KedaConfigGenerator.ScaledObjectConfig createCronScaledConfig() {
return new KedaConfigGenerator.ScaledObjectConfig()
.withName("batch-job-scaledobject")
.withNamespace("default")
.withTarget("Job", "batch-processing-job")
.withReplicaBounds(0, 1)
.withTrigger(KedaConfigGenerator.TriggerConfig.cron("1", "UTC"));
}
public KedaConfigGenerator.ScaledObjectConfig createHttpScaledConfig(String url) {
return new KedaConfigGenerator.ScaledObjectConfig()
.withName("http-scaledobject")
.withNamespace("default")
.withTarget("Deployment", "http-service")
.withReplicaBounds(0, 5)
.withTrigger(KedaConfigGenerator.TriggerConfig.http(
url,
"queueSize",
10 // Scale when queueSize > 10
));
}
private Map<String, Object> createAdvancedConfig() {
Map<String, Object> advanced = new HashMap<>();
Map<String, Object> horizontalPodAutoscalerConfig = new HashMap<>();
Map<String, Object> behavior = new HashMap<>();
Map<String, Object> scaleDown = new HashMap<>();
scaleDown.put("stabilizationWindowSeconds", 300);
scaleDown.put("policies", Arrays.asList(
Map.of("type", "Pods", "value", 1, "periodSeconds", 60)
));
behavior.put("scaleDown", scaleDown);
horizontalPodAutoscalerConfig.put("behavior", behavior);
advanced.put("horizontalPodAutoscalerConfig", horizontalPodAutoscalerConfig);
return advanced;
}
}
8. Spring Boot Application Configuration
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class KedaJavaApplication {
public static void main(String[] args) {
SpringApplication.run(KedaJavaApplication.class, args);
}
@Bean
public CommandLineRunner setupKeda(KedaScalingManager scalingManager,
KedaConfigFactory configFactory) {
return args -> {
// Register scaled objects
scalingManager.registerScaledObject("order-processor",
configFactory.createOrderProcessorConfig());
scalingManager.registerScaledObject("payment-processor",
configFactory.createPaymentProcessorConfig());
scalingManager.registerScaledObject("notification-service",
configFactory.createNotificationServiceConfig());
// Deploy scaled objects
scalingManager.deployScaledObject("order-processor");
scalingManager.deployScaledObject("payment-processor");
scalingManager.deployScaledObject("notification-service");
};
}
@Bean
public KedaConfigGenerator kedaConfigGenerator() {
return new KedaConfigGenerator();
}
}
9. Application Properties
# application.properties # Kafka Configuration kafka.bootstrap.servers=localhost:9092 kafka.topic.orders=orders kafka.topic.payments=payments kafka.consumer.group=order-processor-group # RabbitMQ Configuration rabbitmq.host=localhost rabbitmq.queue.payments=payment-queue # Redis Configuration redis.address=localhost:6379 redis.queue.notifications=notification-queue # KEDA Configuration keda.polling.interval=30 keda.cooldown.period=300 # Application Configuration server.port=8080 management.endpoints.web.exposure.include=health,metrics,info # Logging logging.level.com.example.keda=DEBUG
10. Kubernetes Deployment with KEDA
# deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: order-processor namespace: default spec: replicas: 0 # Start with 0, let KEDA scale selector: matchLabels: app: order-processor template: metadata: labels: app: order-processor spec: containers: - name: order-processor image: my-registry/order-processor:latest ports: - containerPort: 8080 env: - name: KAFKA_BOOTSTRAP_SERVERS value: "kafka-broker:9092" - name: SPRING_PROFILES_ACTIVE value: "kubernetes" resources: requests: memory: "128Mi" cpu: "100m" limits: memory: "512Mi" cpu: "500m" --- # Service for metrics apiVersion: v1 kind: Service metadata: name: order-processor-service namespace: default spec: selector: app: order-processor ports: - port: 8080 targetPort: 8080
11. Usage Example
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
private final MetricsService metricsService;
private final EventDrivenProcessor eventProcessor;
public OrderController(MetricsService metricsService, EventDrivenProcessor eventProcessor) {
this.metricsService = metricsService;
this.eventProcessor = eventProcessor;
}
@PostMapping("/api/orders")
public String createOrder(@RequestBody OrderRequest request) {
metricsService.incrementQueueSize();
try {
// Process order (in real scenario, this would send to Kafka)
eventProcessor.processOrderEvent(request.toJson());
return "Order accepted";
} finally {
metricsService.decrementQueueSize();
}
}
@PostMapping("/api/orders/batch")
public String createOrdersBatch(@RequestBody List<OrderRequest> requests) {
metricsService.setCurrentQueueSize(requests.size());
// In real scenario, these would be sent to Kafka
// KEDA would automatically scale based on queue size
return "Batch processing started";
}
public static class OrderRequest {
private String customerId;
private List<OrderItem> items;
public String toJson() {
// Convert to JSON
return "{}";
}
// Getters and setters
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public List<OrderItem> getItems() { return items; }
public void setItems(List<OrderItem> items) { this.items = items; }
}
public static class OrderItem {
private String productId;
private int quantity;
private double price;
// Getters and setters
public String getProductId() { return productId; }
public void setProductId(String productId) { this.productId = productId; }
public int getQuantity() { return quantity; }
public void setQuantity(int quantity) { this.quantity = quantity; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
}
}
This comprehensive KEDA