KEDA (Kubernetes Event-Driven Autoscaling) is an event-driven autoscaler for Kubernetes that scales applications based on events from external sources such as message queues, caches, and monitoring systems. This guide provides a complete Java implementation for KEDA-integrated applications.
Project Setup and Dependencies
1. Maven Dependencies
<dependencies> <!-- Spring Boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>3.1.0</version> </dependency> <!-- Message Queues --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency> <!-- Kubernetes Client --> <dependency> <groupId>io.kubernetes</groupId> <artifactId>client-java</artifactId> <version>18.0.0</version> </dependency> <!-- Metrics --> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-core</artifactId> <version>1.11.5</version> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> <version>1.11.5</version> </dependency> <!-- JSON Processing --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.15.2</version> </dependency> <!-- Testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>3.1.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.18.3</version> <scope>test</scope> </dependency> </dependencies>
Core KEDA Configuration and Models
1. KEDA Configuration
// KedaConfig.java
package com.company.keda.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@ConfigurationProperties(prefix = "keda")
public class KedaConfig {

    /** Master switch: when false, KEDA integration is disabled entirely. */
    private boolean enabled = true;
    /** Lower replica bound applied to generated ScaledObjects. */
    private int minReplicas = 1;
    /** Upper replica bound applied to generated ScaledObjects. */
    private int maxReplicas = 10;
    /** Seconds to wait after the last trigger activity before scaling down. */
    private int cooldownPeriod = 300;
    /** Seconds between metric polls. */
    private int pollingInterval = 30;
    /** Per-scaler settings, keyed by the scaler-type string (e.g. "kafka"). */
    private Map<String, ScalerConfig> scalers = new HashMap<>();

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    public int getMinReplicas() {
        return minReplicas;
    }

    public void setMinReplicas(int minReplicas) {
        this.minReplicas = minReplicas;
    }

    public int getMaxReplicas() {
        return maxReplicas;
    }

    public void setMaxReplicas(int maxReplicas) {
        this.maxReplicas = maxReplicas;
    }

    public int getCooldownPeriod() {
        return cooldownPeriod;
    }

    public void setCooldownPeriod(int cooldownPeriod) {
        this.cooldownPeriod = cooldownPeriod;
    }

    public int getPollingInterval() {
        return pollingInterval;
    }

    public void setPollingInterval(int pollingInterval) {
        this.pollingInterval = pollingInterval;
    }

    public Map<String, ScalerConfig> getScalers() {
        return scalers;
    }

    public void setScalers(Map<String, ScalerConfig> scalers) {
        this.scalers = scalers;
    }

    /**
     * Settings for one event-source scaler. Which fields apply depends on the
     * scaler: queueName for RabbitMQ, topic/consumerGroup for Kafka, and the
     * free-form metadata map for the rest (e.g. "listName", "url").
     */
    public static class ScalerConfig {

        private boolean enabled = true;
        private int threshold = 10;
        private String connectionString;
        private String queueName;
        private String topic;
        private String consumerGroup;
        private Map<String, String> metadata = new HashMap<>();

        public boolean isEnabled() {
            return enabled;
        }

        public void setEnabled(boolean enabled) {
            this.enabled = enabled;
        }

        public int getThreshold() {
            return threshold;
        }

        public void setThreshold(int threshold) {
            this.threshold = threshold;
        }

        public String getConnectionString() {
            return connectionString;
        }

        public void setConnectionString(String connectionString) {
            this.connectionString = connectionString;
        }

        public String getQueueName() {
            return queueName;
        }

        public void setQueueName(String queueName) {
            this.queueName = queueName;
        }

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getConsumerGroup() {
            return consumerGroup;
        }

        public void setConsumerGroup(String consumerGroup) {
            this.consumerGroup = consumerGroup;
        }

        public Map<String, String> getMetadata() {
            return metadata;
        }

        public void setMetadata(Map<String, String> metadata) {
            this.metadata = metadata;
        }
    }
}
// ScalerType.java
package com.company.keda.model;
/**
 * Event-source scaler kinds supported by this application, each paired with
 * the string identifier KEDA uses in trigger specifications.
 */
public enum ScalerType {
    RABBITMQ("rabbitmq"),
    KAFKA("kafka"),
    REDIS("redis"),
    AWS_SQS("aws-sqs"),
    AZURE_QUEUE("azure-queue"),
    AZURE_SERVICE_BUS("azure-service-bus"),
    PROMETHEUS("prometheus"),
    CPU("cpu"),
    MEMORY("memory"),
    HTTP("http"),
    POSTGRES("postgres"),
    MYSQL("mysql"),
    EXTERNAL("external");

    /** Trigger-type string as it appears in KEDA resource definitions. */
    private final String value;

    ScalerType(String value) {
        this.value = value;
    }

    /** @return the KEDA trigger-type string for this scaler */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a trigger-type string (case-insensitive) to its constant.
     *
     * @param value trigger-type string, e.g. "kafka" or "aws-sqs"
     * @throws IllegalArgumentException if no constant matches
     */
    public static ScalerType fromString(String value) {
        for (ScalerType candidate : values()) {
            if (candidate.value.equalsIgnoreCase(value)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown scaler type: " + value);
    }
}
2. KEDA Resource Models
// ScaledObject.java
package com.company.keda.model;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
// Jackson model of the KEDA "ScaledObject" custom resource (keda.sh/v1alpha1).
// The @JsonProperty names mirror the CRD schema and must not be renamed.
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ScaledObject {
@JsonProperty("apiVersion")
private String apiVersion = "keda.sh/v1alpha1";
@JsonProperty("kind")
private String kind = "ScaledObject";
@JsonProperty("metadata")
private V1ObjectMeta metadata;
@JsonProperty("spec")
private Spec spec;
// Getters and setters
public String getApiVersion() { return apiVersion; }
public void setApiVersion(String apiVersion) { this.apiVersion = apiVersion; }
public String getKind() { return kind; }
public void setKind(String kind) { this.kind = kind; }
public V1ObjectMeta getMetadata() { return metadata; }
public void setMetadata(V1ObjectMeta metadata) { this.metadata = metadata; }
public Spec getSpec() { return spec; }
public void setSpec(Spec spec) { this.spec = spec; }
// Desired scaling configuration: target workload, replica bounds, triggers.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Spec {
@JsonProperty("scaleTargetRef")
private ScaleTargetRef scaleTargetRef;
@JsonProperty("pollingInterval")
private Integer pollingInterval;
@JsonProperty("cooldownPeriod")
private Integer cooldownPeriod;
@JsonProperty("minReplicaCount")
private Integer minReplicaCount;
@JsonProperty("maxReplicaCount")
private Integer maxReplicaCount;
@JsonProperty("advanced")
private Advanced advanced;
@JsonProperty("triggers")
private List<Trigger> triggers;
// Getters and setters
public ScaleTargetRef getScaleTargetRef() { return scaleTargetRef; }
public void setScaleTargetRef(ScaleTargetRef scaleTargetRef) { this.scaleTargetRef = scaleTargetRef; }
public Integer getPollingInterval() { return pollingInterval; }
public void setPollingInterval(Integer pollingInterval) { this.pollingInterval = pollingInterval; }
public Integer getCooldownPeriod() { return cooldownPeriod; }
public void setCooldownPeriod(Integer cooldownPeriod) { this.cooldownPeriod = cooldownPeriod; }
public Integer getMinReplicaCount() { return minReplicaCount; }
public void setMinReplicaCount(Integer minReplicaCount) { this.minReplicaCount = minReplicaCount; }
public Integer getMaxReplicaCount() { return maxReplicaCount; }
public void setMaxReplicaCount(Integer maxReplicaCount) { this.maxReplicaCount = maxReplicaCount; }
public Advanced getAdvanced() { return advanced; }
public void setAdvanced(Advanced advanced) { this.advanced = advanced; }
public List<Trigger> getTriggers() { return triggers; }
public void setTriggers(List<Trigger> triggers) { this.triggers = triggers; }
}
// Reference to the workload being scaled; defaults target an apps/v1 Deployment.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ScaleTargetRef {
@JsonProperty("apiVersion")
private String apiVersion = "apps/v1";
@JsonProperty("kind")
private String kind = "Deployment";
@JsonProperty("name")
private String name;
// Getters and setters
public String getApiVersion() { return apiVersion; }
public void setApiVersion(String apiVersion) { this.apiVersion = apiVersion; }
public String getKind() { return kind; }
public void setKind(String kind) { this.kind = kind; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
}
// Optional tuning passed through to the autoscaler KEDA manages.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Advanced {
@JsonProperty("restoreToOriginalReplicaCount")
private Boolean restoreToOriginalReplicaCount;
@JsonProperty("horizontalPodAutoscalerConfig")
private HorizontalPodAutoscalerConfig horizontalPodAutoscalerConfig;
// Getters and setters
public Boolean getRestoreToOriginalReplicaCount() { return restoreToOriginalReplicaCount; }
public void setRestoreToOriginalReplicaCount(Boolean restoreToOriginalReplicaCount) { this.restoreToOriginalReplicaCount = restoreToOriginalReplicaCount; }
public HorizontalPodAutoscalerConfig getHorizontalPodAutoscalerConfig() { return horizontalPodAutoscalerConfig; }
public void setHorizontalPodAutoscalerConfig(HorizontalPodAutoscalerConfig horizontalPodAutoscalerConfig) { this.horizontalPodAutoscalerConfig = horizontalPodAutoscalerConfig; }
}
// Wrapper carrying HPA behavior configuration.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class HorizontalPodAutoscalerConfig {
@JsonProperty("behavior")
private Behavior behavior;
// Getters and setters
public Behavior getBehavior() { return behavior; }
public void setBehavior(Behavior behavior) { this.behavior = behavior; }
}
// Separate scale-up and scale-down behavior settings.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Behavior {
@JsonProperty("scaleDown")
private ScaleBehavior scaleDown;
@JsonProperty("scaleUp")
private ScaleBehavior scaleUp;
// Getters and setters
public ScaleBehavior getScaleDown() { return scaleDown; }
public void setScaleDown(ScaleBehavior scaleDown) { this.scaleDown = scaleDown; }
public ScaleBehavior getScaleUp() { return scaleUp; }
public void setScaleUp(ScaleBehavior scaleUp) { this.scaleUp = scaleUp; }
}
// Stabilization window plus the scaling policies applied in one direction.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ScaleBehavior {
@JsonProperty("stabilizationWindowSeconds")
private Integer stabilizationWindowSeconds;
@JsonProperty("policies")
private List<ScalingPolicy> policies;
// Getters and setters
public Integer getStabilizationWindowSeconds() { return stabilizationWindowSeconds; }
public void setStabilizationWindowSeconds(Integer stabilizationWindowSeconds) { this.stabilizationWindowSeconds = stabilizationWindowSeconds; }
public List<ScalingPolicy> getPolicies() { return policies; }
public void setPolicies(List<ScalingPolicy> policies) { this.policies = policies; }
}
// A single scaling policy entry (type, value, period).
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ScalingPolicy {
@JsonProperty("type")
private String type;
@JsonProperty("value")
private Integer value;
@JsonProperty("periodSeconds")
private Integer periodSeconds;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public Integer getValue() { return value; }
public void setValue(Integer value) { this.value = value; }
public Integer getPeriodSeconds() { return periodSeconds; }
public void setPeriodSeconds(Integer periodSeconds) { this.periodSeconds = periodSeconds; }
}
// One event-source trigger: scaler type string plus scaler-specific metadata.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Trigger {
@JsonProperty("type")
private String type;
@JsonProperty("metadata")
private Map<String, String> metadata;
@JsonProperty("authenticationRef")
private ScaledObjectRef authenticationRef;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public Map<String, String> getMetadata() { return metadata; }
public void setMetadata(Map<String, String> metadata) { this.metadata = metadata; }
public ScaledObjectRef getAuthenticationRef() { return authenticationRef; }
public void setAuthenticationRef(ScaledObjectRef authenticationRef) { this.authenticationRef = authenticationRef; }
}
// Name-only reference used to attach a TriggerAuthentication to a trigger.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ScaledObjectRef {
@JsonProperty("name")
private String name;
// Getters and setters
public String getName() { return name; }
public void setName(String name) { this.name = name; }
}
}
// TriggerAuthentication.java
package com.company.keda.model;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
// Jackson model of the KEDA "TriggerAuthentication" custom resource
// (keda.sh/v1alpha1), which tells a trigger where to find its credentials.
// The @JsonProperty names mirror the CRD schema and must not be renamed.
@JsonInclude(JsonInclude.Include.NON_NULL)
public class TriggerAuthentication {
@JsonProperty("apiVersion")
private String apiVersion = "keda.sh/v1alpha1";
@JsonProperty("kind")
private String kind = "TriggerAuthentication";
@JsonProperty("metadata")
private V1ObjectMeta metadata;
@JsonProperty("spec")
private Spec spec;
// Getters and setters
public String getApiVersion() { return apiVersion; }
public void setApiVersion(String apiVersion) { this.apiVersion = apiVersion; }
public String getKind() { return kind; }
public void setKind(String kind) { this.kind = kind; }
public V1ObjectMeta getMetadata() { return metadata; }
public void setMetadata(V1ObjectMeta metadata) { this.metadata = metadata; }
public Spec getSpec() { return spec; }
public void setSpec(Spec spec) { this.spec = spec; }
// Credential sources; only the non-null ones are serialized.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Spec {
@JsonProperty("podIdentity")
private PodIdentity podIdentity;
@JsonProperty("secretTargetRef")
private List<SecretTargetRef> secretTargetRef;
@JsonProperty("env")
private List<Env> env;
// NOTE(review): the KEDA TriggerAuthentication CRD defines sources such as
// hashiCorpVault rather than a "hashed" list -- verify this field name
// against the installed CRD before relying on it.
@JsonProperty("hashed")
private List<Hashed> hashed;
// Getters and setters
public PodIdentity getPodIdentity() { return podIdentity; }
public void setPodIdentity(PodIdentity podIdentity) { this.podIdentity = podIdentity; }
public List<SecretTargetRef> getSecretTargetRef() { return secretTargetRef; }
public void setSecretTargetRef(List<SecretTargetRef> secretTargetRef) { this.secretTargetRef = secretTargetRef; }
public List<Env> getEnv() { return env; }
public void setEnv(List<Env> env) { this.env = env; }
public List<Hashed> getHashed() { return hashed; }
public void setHashed(List<Hashed> hashed) { this.hashed = hashed; }
}
// Pod-identity provider reference.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class PodIdentity {
@JsonProperty("provider")
private String provider;
// Getters and setters
public String getProvider() { return provider; }
public void setProvider(String provider) { this.provider = provider; }
}
// Maps a trigger parameter to a key inside a Kubernetes Secret.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class SecretTargetRef {
@JsonProperty("parameter")
private String parameter;
@JsonProperty("name")
private String name;
@JsonProperty("key")
private String key;
// Getters and setters
public String getParameter() { return parameter; }
public void setParameter(String parameter) { this.parameter = parameter; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getKey() { return key; }
public void setKey(String key) { this.key = key; }
}
// Maps a trigger parameter to a container environment variable.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Env {
@JsonProperty("parameter")
private String parameter;
@JsonProperty("name")
private String name;
@JsonProperty("containerName")
private String containerName;
// Getters and setters
public String getParameter() { return parameter; }
public void setParameter(String parameter) { this.parameter = parameter; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getContainerName() { return containerName; }
public void setContainerName(String containerName) { this.containerName = containerName; }
}
// Parameter/name/key triple; see the NOTE on the "hashed" field above.
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class Hashed {
@JsonProperty("parameter")
private String parameter;
@JsonProperty("name")
private String name;
@JsonProperty("key")
private String key;
// Getters and setters
public String getParameter() { return parameter; }
public void setParameter(String parameter) { this.parameter = parameter; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getKey() { return key; }
public void setKey(String key) { this.key = key; }
}
}
Event Scaler Implementations
1. Base Scaler Interface
// EventScaler.java
package com.company.keda.scaler;
import com.company.keda.model.ScalerType;
/**
 * Contract for event-source scalers that expose a single integer metric
 * (queue depth, consumer lag, list length, ...) used to drive replica
 * recommendations.
 */
public interface EventScaler {

    /** @return the kind of event source this scaler observes */
    ScalerType getType();

    /** @return true when the backing event source is reachable and healthy */
    boolean isAvailable();

    /** @return the current value of the scaling metric (implementations return 0 on error) */
    int getMetricValue();

    /** @return the per-replica metric threshold that triggers scaling */
    int getThreshold();

    /** @return true when the current metric value has reached the threshold */
    default boolean shouldScale() {
        return getMetricValue() >= getThreshold();
    }

    /**
     * Recommends a replica count of ceil(metricValue / threshold), clamped to
     * the range [1, maxReplicas].
     *
     * @param currentReplicas replicas currently running; returned (clamped)
     *                        when the threshold is misconfigured
     * @param maxReplicas     hard upper bound on the recommendation
     */
    default int getRecommendedReplicas(int currentReplicas, int maxReplicas) {
        int metricValue = getMetricValue();
        int threshold = getThreshold();
        if (metricValue <= 0) {
            return 1; // idle (or errored) source: scale to the minimum
        }
        if (threshold <= 0) {
            // Misconfigured threshold: avoid the division below (a zero
            // threshold previously produced Infinity -> Integer.MAX_VALUE
            // and silently pinned the recommendation to maxReplicas).
            return Math.max(1, Math.min(currentReplicas, maxReplicas));
        }
        int recommended = (int) Math.ceil((double) metricValue / threshold);
        return Math.max(1, Math.min(recommended, maxReplicas));
    }
}
// ScalerFactory.java
package com.company.keda.scaler;
import com.company.keda.config.KedaConfig;
import com.company.keda.model.ScalerType;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class ScalerFactory {

    /** Immutable registry of every scaler implementation, keyed by type. */
    private final Map<ScalerType, EventScaler> scalers;

    private final KedaConfig kedaConfig;

    public ScalerFactory(KedaConfig kedaConfig,
                         RabbitMQScaler rabbitMQScaler,
                         KafkaScaler kafkaScaler,
                         RedisScaler redisScaler,
                         HttpScaler httpScaler,
                         PrometheusScaler prometheusScaler) {
        this.kedaConfig = kedaConfig;
        this.scalers = Map.of(
                ScalerType.RABBITMQ, rabbitMQScaler,
                ScalerType.KAFKA, kafkaScaler,
                ScalerType.REDIS, redisScaler,
                ScalerType.HTTP, httpScaler,
                ScalerType.PROMETHEUS, prometheusScaler);
    }

    /**
     * Looks up the scaler registered for the given type.
     *
     * @throws IllegalArgumentException if no scaler is registered for it
     */
    public EventScaler getScaler(ScalerType type) {
        EventScaler scaler = scalers.get(type);
        if (scaler == null) {
            throw new IllegalArgumentException("Unsupported scaler type: " + type);
        }
        return scaler;
    }

    /** @return true when configuration exists for the type and marks it enabled */
    public boolean isScalerEnabled(ScalerType type) {
        KedaConfig.ScalerConfig config = getScalerConfig(type);
        return config != null && config.isEnabled();
    }

    /** @return the configuration entry for the given type, or null when absent */
    public KedaConfig.ScalerConfig getScalerConfig(ScalerType type) {
        return kedaConfig.getScalers().get(type.getValue());
    }
}
2. RabbitMQ Scaler
// RabbitMQScaler.java
package com.company.keda.scaler;
import com.company.keda.config.KedaConfig;
import com.company.keda.model.ScalerType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Component
public class RabbitMQScaler implements EventScaler {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQScaler.class);

    private final KedaConfig kedaConfig;
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Channel channel;

    public RabbitMQScaler(KedaConfig kedaConfig) {
        this.kedaConfig = kedaConfig;
        // Best-effort eager connect; failures are logged and retried lazily
        // on the next metric/availability call.
        initializeConnection();
    }

    @Override
    public ScalerType getType() {
        return ScalerType.RABBITMQ;
    }

    /** Verifies the broker connection, re-establishing it if it was dropped. */
    @Override
    public boolean isAvailable() {
        try {
            if (connection == null || !connection.isOpen()) {
                initializeConnection();
            }
            return connection != null && connection.isOpen();
        } catch (Exception e) {
            log.warn("RabbitMQ connection check failed: {}", e.getMessage());
            return false;
        }
    }

    /**
     * Returns the message count of the configured queue, or 0 when the scaler
     * is disabled, the queue name is missing, or the broker is unreachable.
     */
    @Override
    public int getMetricValue() {
        KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
        if (config == null || !config.isEnabled()) {
            return 0;
        }
        try {
            String queueName = config.getQueueName();
            if (queueName == null) {
                log.error("RabbitMQ queue name not configured");
                return 0;
            }
            return getQueueMessageCount(queueName);
        } catch (Exception e) {
            log.error("Failed to get RabbitMQ metric value: {}", e.getMessage());
            return 0;
        }
    }

    /** Returns the configured threshold, defaulting to 10 messages per replica. */
    @Override
    public int getThreshold() {
        KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
        return config != null ? config.getThreshold() : 10;
    }

    /** Opens the connection and channel from the configured AMQP URI. */
    private void initializeConnection() {
        try {
            KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
            if (config == null || config.getConnectionString() == null) {
                log.warn("RabbitMQ configuration not found");
                return;
            }
            connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(config.getConnectionString());
            connectionFactory.setConnectionTimeout(10000);
            connectionFactory.setRequestedHeartbeat(60);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            log.info("RabbitMQ connection established");
        } catch (Exception e) {
            log.error("Failed to initialize RabbitMQ connection: {}", e.getMessage());
            closeConnection();
        }
    }

    /**
     * Returns the ready-message count for the queue, reconnecting first if the
     * channel has been lost. Returns 0 when no channel can be established.
     *
     * @throws IOException if the queue does not exist or the broker call fails
     */
    private int getQueueMessageCount(String queueName) throws IOException {
        if (channel == null || !channel.isOpen()) {
            initializeConnection();
        }
        if (channel == null || !channel.isOpen()) {
            return 0;
        }
        try {
            // Passive declare fails if the queue is missing, and its DeclareOk
            // reply already carries the message count as an int. (The previous
            // `return channel.messageCount(queueName)` did not compile:
            // Channel#messageCount returns long.)
            return channel.queueDeclarePassive(queueName).getMessageCount();
        } catch (IOException e) {
            log.error("Failed to get message count for queue {}: {}", queueName, e.getMessage());
            // A failed passive declare closes the channel on the broker side;
            // drop our references so the next call reconnects cleanly.
            closeConnection();
            throw e;
        }
    }

    /** Closes channel and connection, always clearing the references. */
    private void closeConnection() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (IOException | TimeoutException e) {
            log.warn("Error closing RabbitMQ connection: {}", e.getMessage());
        } finally {
            channel = null;
            connection = null;
        }
    }
}
3. Kafka Scaler
// KafkaScaler.java
package com.company.keda.scaler;
import com.company.keda.config.KedaConfig;
import com.company.keda.model.ScalerType;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ExecutionException;
@Component
public class KafkaScaler implements EventScaler {

    private static final Logger log = LoggerFactory.getLogger(KafkaScaler.class);

    private final KedaConfig kedaConfig;
    private AdminClient adminClient;

    public KafkaScaler(KedaConfig kedaConfig) {
        this.kedaConfig = kedaConfig;
        // Best-effort eager init; failures are logged and retried lazily.
        initializeAdminClient();
    }

    @Override
    public ScalerType getType() {
        return ScalerType.KAFKA;
    }

    /**
     * Probes the cluster by listing topics with a short timeout. Returns false
     * when the client could not be initialized (the previous code dereferenced
     * a null adminClient here and relied on the catch-all to hide the NPE).
     */
    @Override
    public boolean isAvailable() {
        try {
            if (adminClient == null) {
                initializeAdminClient();
            }
            if (adminClient == null) {
                return false;
            }
            adminClient.listTopics().names().get(5, java.util.concurrent.TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            log.warn("Kafka availability check failed: {}", e.getMessage());
            return false;
        }
    }

    /**
     * Returns the total consumer-group lag on the configured topic, or 0 when
     * the scaler is disabled, misconfigured, or the cluster is unreachable.
     */
    @Override
    public int getMetricValue() {
        KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
        if (config == null || !config.isEnabled()) {
            return 0;
        }
        try {
            String topic = config.getTopic();
            String consumerGroup = config.getConsumerGroup();
            if (topic == null || consumerGroup == null) {
                log.error("Kafka topic or consumer group not configured");
                return 0;
            }
            return getLagForConsumerGroup(topic, consumerGroup);
        } catch (Exception e) {
            log.error("Failed to get Kafka metric value: {}", e.getMessage());
            return 0;
        }
    }

    /** Returns the configured threshold, defaulting to 100 lag per replica. */
    @Override
    public int getThreshold() {
        KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
        return config != null ? config.getThreshold() : 100;
    }

    /** Builds the AdminClient from the configured bootstrap servers. */
    private void initializeAdminClient() {
        try {
            KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
            if (config == null || config.getConnectionString() == null) {
                log.warn("Kafka configuration not found");
                return;
            }
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getConnectionString());
            props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
            props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 30000);
            adminClient = AdminClient.create(props);
            log.info("Kafka AdminClient initialized");
        } catch (Exception e) {
            log.error("Failed to initialize Kafka AdminClient: {}", e.getMessage());
            closeAdminClient();
        }
    }

    /**
     * Computes total lag for the consumer group on one topic: the sum over its
     * partitions of (log-end offset - committed offset), never below zero.
     */
    private int getLagForConsumerGroup(String topic, String consumerGroup)
            throws ExecutionException, InterruptedException {
        if (adminClient == null) {
            initializeAdminClient();
        }
        if (adminClient == null) {
            return 0;
        }
        try {
            // Committed offsets for every partition the group has consumed.
            ListConsumerGroupOffsetsResult groupOffsets =
                    adminClient.listConsumerGroupOffsets(consumerGroup);
            Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
                    groupOffsets.partitionsToOffsetAndMetadata().get();

            // Restrict to the partitions of the topic we scale on.
            Set<TopicPartition> topicPartitions = new HashSet<>();
            for (TopicPartition tp : consumerOffsets.keySet()) {
                if (tp.topic().equals(topic)) {
                    topicPartitions.add(tp);
                }
            }
            if (topicPartitions.isEmpty()) {
                log.warn("No partitions found for topic: {}", topic);
                return 0;
            }

            // Request the log-end offset of each partition. listOffsets needs
            // a concrete OffsetSpec per partition; the previous code passed
            // null specs, which the AdminClient rejects at runtime.
            Map<TopicPartition, OffsetSpec> latestSpecs = new HashMap<>();
            for (TopicPartition tp : topicPartitions) {
                latestSpecs.put(tp, OffsetSpec.latest());
            }
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsetsMap =
                    adminClient.listOffsets(latestSpecs).all().get();

            long totalLag = 0;
            for (TopicPartition tp : topicPartitions) {
                OffsetAndMetadata consumerOffset = consumerOffsets.get(tp);
                ListOffsetsResult.ListOffsetsResultInfo endOffset = endOffsetsMap.get(tp);
                if (consumerOffset != null && endOffset != null) {
                    totalLag += Math.max(0, endOffset.offset() - consumerOffset.offset());
                }
            }
            // Saturate instead of overflowing when lag exceeds int range.
            return (int) Math.min(totalLag, Integer.MAX_VALUE);
        } catch (Exception e) {
            log.error("Failed to calculate Kafka lag: {}", e.getMessage());
            throw e; // precise rethrow: ExecutionException | InterruptedException | unchecked
        }
    }

    /** Closes and clears the AdminClient so the next call can rebuild it. */
    private void closeAdminClient() {
        if (adminClient != null) {
            adminClient.close();
            adminClient = null;
        }
    }
}
4. Redis Scaler
// RedisScaler.java
package com.company.keda.scaler;
import com.company.keda.config.KedaConfig;
import com.company.keda.model.ScalerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
@Component
public class RedisScaler implements EventScaler {

    private static final Logger log = LoggerFactory.getLogger(RedisScaler.class);

    private final KedaConfig kedaConfig;
    private final RedisTemplate<String, String> redisTemplate;
    private final RedisConnectionFactory redisConnectionFactory;

    public RedisScaler(KedaConfig kedaConfig,
                       RedisTemplate<String, String> redisTemplate,
                       RedisConnectionFactory redisConnectionFactory) {
        this.kedaConfig = kedaConfig;
        this.redisTemplate = redisTemplate;
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Override
    public ScalerType getType() {
        return ScalerType.REDIS;
    }

    /**
     * Pings Redis over a dedicated connection. The connection is opened only
     * for the check and always released via try-with-resources (the previous
     * implementation never closed it, leaking a connection per health check).
     */
    @Override
    public boolean isAvailable() {
        try (var connection = redisConnectionFactory.getConnection()) {
            return connection.ping() != null;
        } catch (Exception e) {
            log.warn("Redis availability check failed: {}", e.getMessage());
            return false;
        }
    }

    /**
     * Returns the length of the Redis list named by the "listName" metadata
     * entry, or 0 when disabled, unconfigured, or on error.
     */
    @Override
    public int getMetricValue() {
        KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
        if (config == null || !config.isEnabled()) {
            return 0;
        }
        try {
            String listName = config.getMetadata().get("listName");
            if (listName == null) {
                log.error("Redis list name not configured");
                return 0;
            }
            Long length = redisTemplate.opsForList().size(listName);
            return length != null ? length.intValue() : 0;
        } catch (Exception e) {
            log.error("Failed to get Redis metric value: {}", e.getMessage());
            return 0;
        }
    }

    /** Returns the configured threshold, defaulting to 50 items per replica. */
    @Override
    public int getThreshold() {
        KedaConfig.ScalerConfig config = kedaConfig.getScalers().get(getType().getValue());
        return config != null ? config.getThreshold() : 50;
    }

    /**
     * Get the number of items in multiple Redis lists (for complex scaling scenarios).
     * A single server-side Lua script sums the lengths in one round-trip.
     *
     * @return the combined length, or 0 on error or empty input
     */
    public int getAggregatedListLength(List<String> listNames) {
        if (listNames == null || listNames.isEmpty()) {
            return 0;
        }
        try {
            String luaScript =
                "local total = 0 " +
                "for i, key in ipairs(KEYS) do " +
                " total = total + redis.call('LLEN', key) " +
                "end " +
                "return total";
            RedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
            Long totalLength = redisTemplate.execute(script, listNames);
            return totalLength != null ? totalLength.intValue() : 0;
        } catch (Exception e) {
            log.error("Failed to get aggregated Redis list length: {}", e.getMessage());
            return 0;
        }
    }

    /**
     * Get the number of items in a Redis stream (XLEN).
     *
     * @return the stream length, or 0 on error
     */
    public int getStreamLength(String streamName) {
        try {
            String luaScript =
                "return redis.call('XLEN', KEYS[1])";
            RedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
            Long length = redisTemplate.execute(script, Collections.singletonList(streamName));
            return length != null ? length.intValue() : 0;
        } catch (Exception e) {
            log.error("Failed to get Redis stream length: {}", e.getMessage());
            return 0;
        }
    }
}
5. HTTP Scaler
// HttpScaler.java
package com.company.keda.scaler;
import com.company.keda.config.KedaConfig;
import com.company.keda.model.ScalerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.Map;
@Component
public class HttpScaler implements EventScaler {

    private static final Logger log = LoggerFactory.getLogger(HttpScaler.class);

    private final KedaConfig kedaConfig;
    private final RestTemplate restTemplate;

    public HttpScaler(KedaConfig kedaConfig, RestTemplate restTemplate) {
        this.kedaConfig = kedaConfig;
        this.restTemplate = restTemplate;
    }

    @Override
    public ScalerType getType() {
        return ScalerType.HTTP;
    }

    /** Fetches this scaler's configuration entry, or null when absent. */
    private KedaConfig.ScalerConfig config() {
        return kedaConfig.getScalers().get(getType().getValue());
    }

    /** Probes the configured endpoint's /health path with a GET request. */
    @Override
    public boolean isAvailable() {
        KedaConfig.ScalerConfig config = config();
        if (config == null || config.getMetadata().get("url") == null) {
            return false;
        }
        try {
            String url = config.getMetadata().get("url");
            ResponseEntity<String> response =
                    restTemplate.getForEntity(url + "/health", String.class);
            return response.getStatusCode().is2xxSuccessful();
        } catch (Exception e) {
            log.warn("HTTP scaler availability check failed: {}", e.getMessage());
            return false;
        }
    }

    /**
     * Reads the metric from the external endpoint's /metrics/queue-length
     * path; returns 0 when disabled, unconfigured, or on any failure.
     */
    @Override
    public int getMetricValue() {
        KedaConfig.ScalerConfig config = config();
        if (config == null || !config.isEnabled()) {
            return 0;
        }
        try {
            String url = config.getMetadata().get("url");
            if (url == null) {
                log.error("HTTP scaler URL not configured");
                return 0;
            }
            ResponseEntity<MetricResponse> response = restTemplate.getForEntity(
                    url + "/metrics/queue-length", MetricResponse.class);
            MetricResponse body = response.getBody();
            if (response.getStatusCode().is2xxSuccessful() && body != null) {
                return body.getValue();
            }
            return 0;
        } catch (Exception e) {
            log.error("Failed to get HTTP metric value: {}", e.getMessage());
            return 0;
        }
    }

    /** Returns the configured threshold, defaulting to 100. */
    @Override
    public int getThreshold() {
        KedaConfig.ScalerConfig config = config();
        return (config == null) ? 100 : config.getThreshold();
    }

    /**
     * Custom metric response DTO returned by the external metrics endpoint.
     */
    public static class MetricResponse {

        private int value;
        private String metricName;
        private long timestamp;

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public String getMetricName() {
            return metricName;
        }

        public void setMetricName(String metricName) {
            this.metricName = metricName;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    }
}
KEDA Management Service
1. KEDA Service Implementation
```java
// KedaService.java
package com.company.keda.service;
import com.company.keda.config.KedaConfig;
import com.company.keda.model.*;
import com.company.keda.scaler.EventScaler;
import com.company.keda.scaler.ScalerFactory;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
@Service
public class KedaService {
private static final Logger log = LoggerFactory.getLogger(KedaService.class);
private final KedaConfig kedaConfig;
private final ScalerFactory scalerFactory;
private final CustomObjectsApi customObjectsApi;
/**
 * Creates the service with its required collaborators.
 *
 * @param kedaConfig       global KEDA scaling configuration
 * @param scalerFactory    factory producing the individual event scalers
 * @param customObjectsApi Kubernetes API client for KEDA custom resources
 * @throws NullPointerException if any dependency is {@code null} — fail fast
 *         at construction rather than with a later NPE deep in a method call
 */
public KedaService(KedaConfig kedaConfig,
                   ScalerFactory scalerFactory,
                   CustomObjectsApi customObjectsApi) {
    this.kedaConfig = Objects.requireNonNull(kedaConfig, "kedaConfig");
    this.scalerFactory = Objects.requireNonNull(scalerFactory, "scalerFactory");
    this.customObjectsApi = Objects.requireNonNull(customObjectsApi, "customObjectsApi");
}
/**
 * Builds a KEDA ScaledObject resource targeting the given deployment, using
 * the polling interval, cooldown period, and replica bounds from the global
 * configuration and one trigger per requested scaler type.
 *
 * @param name           resource name for the ScaledObject
 * @param namespace      namespace the ScaledObject belongs to
 * @param deploymentName deployment the ScaledObject will scale
 * @param scalerTypes    scaler types to turn into triggers
 * @return the assembled (not yet applied) ScaledObject
 */
public ScaledObject createScaledObject(String name, String namespace,
                                       String deploymentName,
                                       List<ScalerType> scalerTypes) {
    log.info("Creating ScaledObject for deployment: {} in namespace: {}", deploymentName, namespace);

    // Resource metadata: name, namespace, and standard app labels.
    V1ObjectMeta meta = new V1ObjectMeta();
    meta.setName(name);
    meta.setNamespace(namespace);
    meta.setLabels(Map.of(
            "app.kubernetes.io/name", name,
            "app.kubernetes.io/part-of", "keda-scaling"
    ));

    // Reference to the deployment this object scales.
    ScaledObject.ScaleTargetRef targetRef = new ScaledObject.ScaleTargetRef();
    targetRef.setName(deploymentName);

    // Spec: target, global scaling bounds/timings, and the triggers.
    ScaledObject.Spec spec = new ScaledObject.Spec();
    spec.setScaleTargetRef(targetRef);
    spec.setPollingInterval(kedaConfig.getPollingInterval());
    spec.setCooldownPeriod(kedaConfig.getCooldownPeriod());
    spec.setMinReplicaCount(kedaConfig.getMinReplicas());
    spec.setMaxReplicaCount(kedaConfig.getMaxReplicas());
    spec.setTriggers(createTriggers(scalerTypes));

    ScaledObject result = new ScaledObject();
    result.setMetadata(meta);
    result.setSpec(spec);
    return result;
}
/**
 * Applies (upserts) the ScaledObject to the cluster: replaces it when it
 * already exists, creates it otherwise.
 *
 * @param scaledObject the resource to apply; its metadata must carry a name
 * @param namespace    target namespace; falls back to {@code "default"} when null
 * @throws ApiException if the Kubernetes API call fails for any reason other
 *         than the object not existing yet (404 is handled by creating it)
 */
public void applyScaledObject(ScaledObject scaledObject, String namespace) throws ApiException {
    String name = scaledObject.getMetadata().getName();
    String ns = namespace != null ? namespace : "default";
    log.info("Applying ScaledObject: {} in namespace: {}", name, ns);
    try {
        // Check if ScaledObject already exists.
        Object existing = customObjectsApi.getNamespacedCustomObject(
                "keda.sh", "v1alpha1", ns, "scaledobjects", name);
        if (existing != null) {
            // Update existing ScaledObject.
            customObjectsApi.replaceNamespacedCustomObject(
                    "keda.sh", "v1alpha1", ns, "scaledobjects", name, scaledObject);
            log.info("Updated existing ScaledObject: {}", name);
        } else {
            // Defensive: if the client reports "not found" as a null body
            // instead of a 404, we must still create rather than silently
            // doing nothing (the original code dropped this case).
            createScaledObjectResource(ns, name, scaledObject);
        }
    } catch (ApiException e) {
        if (e.getCode() == 404) {
            // Does not exist yet: create it.
            createScaledObjectResource(ns, name, scaledObject);
        } else {
            throw e;
        }
    }
}

/** Creates a new ScaledObject custom resource in the given namespace. */
private void createScaledObjectResource(String ns, String name, ScaledObject scaledObject)
        throws ApiException {
    customObjectsApi.createNamespacedCustomObject(
            "keda.sh", "v1alpha1", ns, "scaledobjects", scaledObject, null, null, null);
    log.info("Created new ScaledObject: {}", name);
}
/**
* Get current scaling metrics from all enabled scalers
*/
public Map<ScalerType, ScalingMetrics> getCurrentMetrics() {
Map<ScalerType, ScalingMetrics> metrics = new HashMap<>();
for (ScalerType scalerType : Scal
Pyroscope Profiling in Java
Explains how to use Pyroscope for continuous profiling in Java applications, helping developers analyze CPU and memory usage patterns to improve performance and identify bottlenecks.
https://macronepal.com/blog/pyroscope-profiling-in-java/
OpenTelemetry Metrics in Java: Comprehensive Guide
Provides a complete guide to collecting and exporting metrics in Java using OpenTelemetry, including counters, histograms, gauges, and integration with monitoring tools. (MACRO NEPAL)
https://macronepal.com/blog/opentelemetry-metrics-in-java-comprehensive-guide/
OTLP Exporter in Java: Complete Guide for OpenTelemetry
Explains how to configure OTLP exporters in Java to send telemetry data such as traces, metrics, and logs to monitoring systems using HTTP or gRPC protocols. (MACRO NEPAL)
https://macronepal.com/blog/otlp-exporter-in-java-complete-guide-for-opentelemetry/
Thanos Integration in Java: Global View of Metrics
Explains how to integrate Thanos with Java monitoring systems to create a scalable global metrics view across multiple Prometheus instances.
https://macronepal.com/blog/thanos-integration-in-java-global-view-of-metrics
Time Series with InfluxDB in Java: Complete Guide (Version 2)
Explains how to manage time-series data using InfluxDB in Java applications, including storing, querying, and analyzing metrics data.
https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide-2
Time Series with InfluxDB in Java: Complete Guide
Provides an overview of integrating InfluxDB with Java for time-series data handling, including monitoring applications and managing performance metrics.
https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide
Implementing Prometheus Remote Write in Java (Version 2)
Explains how to configure Java applications to send metrics data to Prometheus-compatible systems using the remote write feature for scalable monitoring.
https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide-2
Implementing Prometheus Remote Write in Java: Complete Guide
Provides instructions for sending metrics from Java services to Prometheus servers, enabling centralized monitoring and real-time analytics.
https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide
Building a TileServer GL in Java: Vector and Raster Tile Server
Explains how to build a TileServer GL in Java for serving vector and raster map tiles, useful for geographic visualization and mapping applications.
https://macronepal.com/blog/building-a-tileserver-gl-in-java-vector-and-raster-tile-server
Indoor Mapping in Java
Explains how to create indoor mapping systems in Java, including navigation inside buildings, spatial data handling, and visualization techniques.