Leader election enables multiple instances of an application to coordinate and elect a single leader that performs specific tasks, ensuring high availability and preventing conflicts in distributed systems.
Core Concepts
What is Leader Election?
- Coordination mechanism for distributed systems
- Ensures only one instance acts as leader at a time
- Automatic failover when leader fails
- Built on Kubernetes resource locks: Lease objects from the coordination.k8s.io API group (preferred), or the legacy ConfigMap and Endpoints locks
Key Components:
- Lease Resource: Kubernetes resource used for leader election locks
- Leader Identity: Unique identifier for each candidate
- Heartbeat Mechanism: Regular updates to maintain leadership
- Watch Mechanism: Monitoring for leadership changes
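The components above can be illustrated without a cluster. The following is a minimal in-memory sketch, not the Kubernetes implementation: the names LeaseRecord, InMemoryLease, and LeaseSketch are hypothetical, and an AtomicReference stands in for the API server so that compareAndSet plays the role of an optimistic write that fails when someone else updated the lease first.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

/** Immutable snapshot of who holds the lease and when it was last renewed. */
final class LeaseRecord {
    final String holder;        // empty string means "nobody"
    final long renewTimeMillis; // last heartbeat written by the holder
    final long durationMillis;  // how long the record stays valid after a renewal

    LeaseRecord(String holder, long renewTimeMillis, long durationMillis) {
        this.holder = holder;
        this.renewTimeMillis = renewTimeMillis;
        this.durationMillis = durationMillis;
    }

    boolean isExpired(long nowMillis) {
        return holder.isEmpty() || nowMillis > renewTimeMillis + durationMillis;
    }
}

/** Hypothetical stand-in for a Lease object; the AtomicReference plays the API server. */
final class InMemoryLease {
    private final AtomicReference<LeaseRecord> record =
            new AtomicReference<>(new LeaseRecord("", 0L, 0L));
    private final long durationMillis;

    InMemoryLease(Duration leaseDuration) {
        this.durationMillis = leaseDuration.toMillis();
    }

    /** Acquire or renew: succeeds only if the lease is free, expired, or already ours. */
    boolean tryAcquireOrRenew(String identity, long nowMillis) {
        LeaseRecord current = record.get();
        boolean mine = current.holder.equals(identity);
        if (!mine && !current.isExpired(nowMillis)) {
            return false; // another candidate holds a live lease
        }
        LeaseRecord next = new LeaseRecord(identity, nowMillis, durationMillis);
        // Fails if another candidate wrote in between, like a stale resourceVersion would
        return record.compareAndSet(current, next);
    }

    /** Returns the live holder, or an empty string if the lease is free or expired. */
    String currentHolder(long nowMillis) {
        LeaseRecord current = record.get();
        return current.isExpired(nowMillis) ? "" : current.holder;
    }
}

class LeaseSketch {
    public static void main(String[] args) {
        InMemoryLease lease = new InMemoryLease(Duration.ofSeconds(15));
        System.out.println(lease.tryAcquireOrRenew("pod-a", 0L));      // true: free lease
        System.out.println(lease.tryAcquireOrRenew("pod-b", 5_000L));  // false: pod-a is live
        System.out.println(lease.tryAcquireOrRenew("pod-a", 10_000L)); // true: heartbeat renewal
        System.out.println(lease.tryAcquireOrRenew("pod-b", 26_000L)); // true: expired at 25s
    }
}
```

Timestamps are passed in explicitly so the behaviour is deterministic; a real candidate would read the clock and call tryAcquireOrRenew on every retry period.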
Dependencies and Setup
Maven Dependencies
<properties>
<kubernetes-client.version>6.7.2</kubernetes-client.version>
<spring-boot.version>3.1.0</spring-boot.version>
<micrometer.version>1.11.0</micrometer.version>
</properties>
<dependencies>
<!-- Fabric8 Kubernetes Client -->
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>6.0.11</version>
</dependency>
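<!-- Guava provides ThreadFactoryBuilder, used by the executors below -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
<!-- Lombok generates the @Data, @Builder, and @Slf4j boilerplate -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
<scope>provided</scope>
</dependency>
</dependencies>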
Configuration Classes
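// Registered as a bean through @EnableConfigurationProperties below; also marking it
// @Configuration would register a second bean of the same type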
@ConfigurationProperties(prefix = "leader.election")
@Data
public class LeaderElectionConfig {
private boolean enabled = true;
private String leaseName = "application-leader";
private String leaseNamespace;
private String identity;
private Duration leaseDuration = Duration.ofSeconds(15);
private Duration renewDeadline = Duration.ofSeconds(10);
private Duration retryPeriod = Duration.ofSeconds(2);
private Duration heartbeatInterval = Duration.ofSeconds(5);
@PostConstruct
public void init() {
if (identity == null) {
// Generate unique identity based on hostname and timestamp
identity = generateIdentity();
}
}
private String generateIdentity() {
try {
String hostname = InetAddress.getLocalHost().getHostName();
return hostname + "-" + System.currentTimeMillis();
} catch (UnknownHostException e) {
return "unknown-" + UUID.randomUUID().toString().substring(0, 8);
}
}
}
@Configuration
@EnableConfigurationProperties(LeaderElectionConfig.class)
@EnableRetry
@EnableAspectJAutoProxy
public class LeaderElectionConfiguration {
@Bean
@ConditionalOnMissingBean
public KubernetesClient kubernetesClient() {
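// DefaultKubernetesClient is deprecated in Fabric8 6.x; the builder is the supported entry point
return new KubernetesClientBuilder().build();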
}
@Bean
public MeterRegistry meterRegistry() {
return new SimpleMeterRegistry();
}
}
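The three timing properties only make sense in a particular order, which the configuration class does not check. The sketch below shows one way to validate them at startup. It borrows the rule used by client-go's leader election (leaseDuration > renewDeadline > retryPeriod × 1.2) as an assumption; the class name ElectionTimings and the method validate are hypothetical and not part of the code above.

```java
import java.time.Duration;

final class ElectionTimings {
    // Jitter multiplier applied to retryPeriod; 1.2 mirrors client-go's JitterFactor
    static final double JITTER_FACTOR = 1.2;

    private ElectionTimings() {}

    /** Throws IllegalArgumentException if the three durations cannot work together. */
    static void validate(Duration leaseDuration, Duration renewDeadline, Duration retryPeriod) {
        if (retryPeriod.isZero() || retryPeriod.isNegative()) {
            throw new IllegalArgumentException("retryPeriod must be positive");
        }
        long jitteredRetryMillis = (long) Math.ceil(retryPeriod.toMillis() * JITTER_FACTOR);
        if (renewDeadline.toMillis() <= jitteredRetryMillis) {
            // The leader must get at least one full retry in before its deadline passes
            throw new IllegalArgumentException("renewDeadline must exceed retryPeriod * " + JITTER_FACTOR);
        }
        if (leaseDuration.compareTo(renewDeadline) <= 0) {
            // Followers must wait longer than the leader is allowed to keep trying
            throw new IllegalArgumentException("leaseDuration must exceed renewDeadline");
        }
    }
}

class ElectionTimingsDemo {
    public static void main(String[] args) {
        // The defaults from the configuration class: 15s / 10s / 2s
        ElectionTimings.validate(
                Duration.ofSeconds(15), Duration.ofSeconds(10), Duration.ofSeconds(2));
        System.out.println("defaults are consistent");
    }
}
```

A call like this could live in the @PostConstruct method of the properties class, so a misconfigured deployment fails fast instead of electing two leaders under load.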
Core Leader Election Implementation
1. Lease Resource Model
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class LeaderElectionRecord {
private String holderIdentity;
private Long leaseDurationSeconds;
private Long acquireTime;
private Long renewTime;
private Integer leaderTransitions;
private String version;
public static LeaderElectionRecord empty() {
return LeaderElectionRecord.builder()
.holderIdentity("")
.leaseDurationSeconds(0L)
.acquireTime(0L)
.renewTime(0L)
.leaderTransitions(0)
.build();
}
public boolean isExpired(long currentTimeMillis) {
if (renewTime == null || leaseDurationSeconds == null) {
return true;
}
return currentTimeMillis > (renewTime + (leaseDurationSeconds * 1000));
}
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class LeaderElectionState {
private boolean isLeader;
private String leaderIdentity;
private long lastObservationTime;
private long leaseExpiryTime;
private int leaderTransitions;
private LeaderElectionStatus status;
public enum LeaderElectionStatus {
LEADER, FOLLOWER, UNKNOWN, ELECTION_IN_PROGRESS
}
}
2. Lease Management Service
@Service
@Slf4j
public class LeaseManagementService {
private final KubernetesClient kubernetesClient;
private final LeaderElectionConfig config;
private final MeterRegistry meterRegistry;
private static final String LEASE_API_GROUP = "coordination.k8s.io";
private static final String LEASE_API_VERSION = "v1";
private static final String LEASE_KIND = "Lease";
public LeaseManagementService(KubernetesClient kubernetesClient,
LeaderElectionConfig config,
MeterRegistry meterRegistry) {
this.kubernetesClient = kubernetesClient;
this.config = config;
this.meterRegistry = meterRegistry;
}
public Lease getOrCreateLease() {
String namespace = getNamespace();
String leaseName = config.getLeaseName();
try {
Lease existingLease = kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(leaseName)
.get();
if (existingLease != null) {
log.debug("Found existing lease: {}/{}", namespace, leaseName);
return existingLease;
}
// Create new lease
log.info("Creating new lease: {}/{}", namespace, leaseName);
Lease newLease = createLeaseResource();
return kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.create(newLease);
} catch (Exception e) {
log.error("Failed to get or create lease: {}/{}", namespace, leaseName, e);
throw new LeaderElectionException("Lease management failed", e);
}
}
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public boolean acquireLease() {
String namespace = getNamespace();
String leaseName = config.getLeaseName();
try {
Lease lease = kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(leaseName)
.get();
if (lease == null) {
log.warn("Lease not found: {}/{}", namespace, leaseName);
return false;
}
LeaderElectionRecord currentRecord = extractLeaderRecord(lease);
long now = System.currentTimeMillis();
// Check if we can acquire the lease
if (currentRecord.getHolderIdentity().isEmpty() ||
currentRecord.isExpired(now) ||
currentRecord.getHolderIdentity().equals(config.getIdentity())) {
// Acquire or renew the lease
LeaderElectionRecord newRecord = LeaderElectionRecord.builder()
.holderIdentity(config.getIdentity())
.leaseDurationSeconds(config.getLeaseDuration().getSeconds())
.acquireTime(currentRecord.getHolderIdentity().equals(config.getIdentity()) ?
currentRecord.getAcquireTime() : now)
.renewTime(now)
.leaderTransitions(currentRecord.getLeaderTransitions() +
(currentRecord.getHolderIdentity().equals(config.getIdentity()) ? 0 : 1))
.build();
updateLease(lease, newRecord);
log.info("Successfully acquired lease: {}/{}", namespace, leaseName);
meterRegistry.counter("leader.election.acquire.success").increment();
return true;
}
log.debug("Lease already held by: {}", currentRecord.getHolderIdentity());
meterRegistry.counter("leader.election.acquire.failure").increment();
return false;
} catch (Exception e) {
log.error("Failed to acquire lease: {}/{}", namespace, leaseName, e);
meterRegistry.counter("leader.election.acquire.error").increment();
throw new LeaderElectionException("Lease acquisition failed", e);
}
}
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public boolean renewLease() {
String namespace = getNamespace();
String leaseName = config.getLeaseName();
try {
Lease lease = kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(leaseName)
.get();
if (lease == null) {
log.error("Lease not found for renewal: {}/{}", namespace, leaseName);
return false;
}
LeaderElectionRecord currentRecord = extractLeaderRecord(lease);
// Verify we still hold the lease
if (!currentRecord.getHolderIdentity().equals(config.getIdentity())) {
log.warn("Cannot renew lease - not current leader. Current leader: {}",
currentRecord.getHolderIdentity());
return false;
}
// Renew the lease
LeaderElectionRecord renewedRecord = LeaderElectionRecord.builder()
.holderIdentity(config.getIdentity())
.leaseDurationSeconds(config.getLeaseDuration().getSeconds())
.acquireTime(currentRecord.getAcquireTime())
.renewTime(System.currentTimeMillis())
.leaderTransitions(currentRecord.getLeaderTransitions())
.build();
updateLease(lease, renewedRecord);
log.debug("Successfully renewed lease: {}/{}", namespace, leaseName);
meterRegistry.counter("leader.election.renew.success").increment();
return true;
} catch (Exception e) {
log.error("Failed to renew lease: {}/{}", namespace, leaseName, e);
meterRegistry.counter("leader.election.renew.error").increment();
throw new LeaderElectionException("Lease renewal failed", e);
}
}
public LeaderElectionState getCurrentLeaderState() {
String namespace = getNamespace();
String leaseName = config.getLeaseName();
try {
Lease lease = kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(leaseName)
.get();
if (lease == null) {
return LeaderElectionState.builder()
.isLeader(false)
.leaderIdentity("")
.status(LeaderElectionState.LeaderElectionStatus.UNKNOWN)
.lastObservationTime(System.currentTimeMillis())
.build();
}
LeaderElectionRecord record = extractLeaderRecord(lease);
long now = System.currentTimeMillis();
boolean isLeader = record.getHolderIdentity().equals(config.getIdentity());
boolean isExpired = record.isExpired(now);
LeaderElectionState.LeaderElectionStatus status;
if (isLeader && !isExpired) {
status = LeaderElectionState.LeaderElectionStatus.LEADER;
} else if (!isLeader && !isExpired) {
status = LeaderElectionState.LeaderElectionStatus.FOLLOWER;
} else {
status = LeaderElectionState.LeaderElectionStatus.ELECTION_IN_PROGRESS;
}
return LeaderElectionState.builder()
.isLeader(isLeader && !isExpired)
.leaderIdentity(record.getHolderIdentity())
.lastObservationTime(now)
.leaseExpiryTime(record.getRenewTime() + (record.getLeaseDurationSeconds() * 1000))
.leaderTransitions(record.getLeaderTransitions())
.status(status)
.build();
} catch (Exception e) {
log.error("Failed to get current leader state", e);
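return LeaderElectionState.builder()
.isLeader(false)
// Keep the identity non-null so callers can compare it safely
.leaderIdentity("")
.status(LeaderElectionState.LeaderElectionStatus.UNKNOWN)
.lastObservationTime(System.currentTimeMillis())
.build();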
}
}
public void releaseLease() {
String namespace = getNamespace();
String leaseName = config.getLeaseName();
try {
Lease lease = kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(leaseName)
.get();
if (lease != null) {
LeaderElectionRecord currentRecord = extractLeaderRecord(lease);
// Only release if we're the current holder
if (currentRecord.getHolderIdentity().equals(config.getIdentity())) {
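long now = System.currentTimeMillis();
// Clear the holder but keep the record valid: the API server rejects a
// leaseDurationSeconds of 0, so use 1 second (as client-go does on release)
LeaderElectionRecord releasedRecord = LeaderElectionRecord.builder()
.holderIdentity("")
.leaseDurationSeconds(1L)
.acquireTime(now)
.renewTime(now)
.leaderTransitions(currentRecord.getLeaderTransitions())
.build();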
updateLease(lease, releasedRecord);
log.info("Released lease: {}/{}", namespace, leaseName);
meterRegistry.counter("leader.election.release").increment();
}
}
} catch (Exception e) {
log.error("Failed to release lease: {}/{}", namespace, leaseName, e);
}
}
private Lease createLeaseResource() {
return new LeaseBuilder()
.withNewMetadata()
.withName(config.getLeaseName())
.withNamespace(getNamespace())
.endMetadata()
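.withNewSpec()
// Start with an unheld lease. Duration and timestamps stay unset: the API server
// rejects a leaseDurationSeconds of 0, and acquireTime/renewTime are timestamps,
// not numeric strings
.withHolderIdentity("")
.withLeaseTransitions(0)
.endSpec()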
.build();
}
private void updateLease(Lease lease, LeaderElectionRecord record) {
lease.getSpec().setHolderIdentity(record.getHolderIdentity());
lease.getSpec().setLeaseDurationSeconds(record.getLeaseDurationSeconds().intValue());
lease.getSpec().setAcquireTime(toMicroTime(record.getAcquireTime()));
lease.getSpec().setRenewTime(toMicroTime(record.getRenewTime()));
lease.getSpec().setLeaseTransitions(record.getLeaderTransitions());
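// Write with the resourceVersion we read, so a concurrent writer causes a 409 Conflict
// instead of being silently overwritten (assumes the Fabric8 6.x DSL)
kubernetesClient.resource(lease)
.inNamespace(getNamespace())
.lockResourceVersion(lease.getMetadata().getResourceVersion())
.replace();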
}
private LeaderElectionRecord extractLeaderRecord(Lease lease) {
if (lease == null || lease.getSpec() == null) {
return LeaderElectionRecord.empty();
}
return LeaderElectionRecord.builder()
.holderIdentity(lease.getSpec().getHolderIdentity() != null ?
lease.getSpec().getHolderIdentity() : "")
.leaseDurationSeconds(lease.getSpec().getLeaseDurationSeconds() != null ?
lease.getSpec().getLeaseDurationSeconds().longValue() : 0L)
.acquireTime(fromMicroTime(lease.getSpec().getAcquireTime()))
.renewTime(fromMicroTime(lease.getSpec().getRenewTime()))
.leaderTransitions(lease.getSpec().getLeaseTransitions() != null ?
lease.getSpec().getLeaseTransitions() : 0)
.build();
}
// Fabric8 models the Lease MicroTime fields (acquireTime, renewTime) as ZonedDateTime
private ZonedDateTime toMicroTime(long timeMillis) {
return Instant.ofEpochMilli(timeMillis).atZone(ZoneOffset.UTC);
}
private long fromMicroTime(ZonedDateTime microTime) {
// A lease that was never acquired has no timestamps yet
return microTime == null ? 0L : microTime.toInstant().toEpochMilli();
}
private String getNamespace() {
return config.getLeaseNamespace() != null ?
config.getLeaseNamespace() :
kubernetesClient.getNamespace();
}
}
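On the wire, Kubernetes serializes the acquireTime and renewTime fields of a Lease as MicroTime values: RFC 3339 timestamps in UTC with six fractional digits, not counts of microseconds. The sketch below shows the conversion to and from epoch milliseconds using only java.time. The class name MicroTimes and its methods are hypothetical helpers for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class MicroTimes {
    // RFC 3339 in UTC with exactly six fractional digits
    private static final DateTimeFormatter MICRO_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'")
                    .withZone(ZoneOffset.UTC);

    private MicroTimes() {}

    /** Formats epoch milliseconds the way a MicroTime field appears in a Lease manifest. */
    static String format(long epochMillis) {
        return MICRO_TIME.format(Instant.ofEpochMilli(epochMillis));
    }

    /** Parses a MicroTime string back to epoch milliseconds (sub-millisecond digits are dropped). */
    static long parseToEpochMillis(String microTime) {
        return Instant.parse(microTime).toEpochMilli();
    }
}

class MicroTimesDemo {
    public static void main(String[] args) {
        String formatted = MicroTimes.format(1_700_000_000_123L);
        System.out.println(formatted); // 2023-11-14T22:13:20.123000Z
        System.out.println(MicroTimes.parseToEpochMillis(formatted)); // 1700000000123
    }
}
```

This is mainly useful when reading leases with kubectl or comparing logged timestamps against the renewTime stored on the cluster.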
3. Leader Election Service
@Service
@Slf4j
public class LeaderElectionService {
private final LeaseManagementService leaseService;
private final LeaderElectionConfig config;
private final LeaderTaskManager taskManager;
private final MeterRegistry meterRegistry;
private volatile boolean isRunning = false;
private volatile boolean isLeader = false;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> electionTask;
private ScheduledFuture<?> heartbeatTask;
private final Object lock = new Object();
public LeaderElectionService(LeaseManagementService leaseService,
LeaderElectionConfig config,
LeaderTaskManager taskManager,
MeterRegistry meterRegistry) {
this.leaseService = leaseService;
this.config = config;
this.taskManager = taskManager;
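this.meterRegistry = meterRegistry;
// Register the leadership gauge once, backed by live state. MeterRegistry.gauge(name, number)
// keeps only a weak reference to the number and ignores later registrations under the same
// name, so passing a fresh literal on every change does not update the metric.
Gauge.builder("leader.election.is_leader", this, service -> service.isLeader() ? 1 : 0)
.register(meterRegistry);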
}
@EventListener(ContextRefreshedEvent.class)
public void startLeaderElection() {
if (!config.isEnabled()) {
log.info("Leader election is disabled");
return;
}
synchronized (lock) {
if (isRunning) {
log.warn("Leader election is already running");
return;
}
log.info("Starting leader election with identity: {}", config.getIdentity());
isRunning = true;
// Initialize lease
try {
leaseService.getOrCreateLease();
} catch (Exception e) {
log.error("Failed to initialize lease, retrying in background", e);
}
// Start election scheduler
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("leader-election-%d")
.setDaemon(true)
.build());
// Start election process
electionTask = scheduler.scheduleWithFixedDelay(
this::runElectionCycle,
0,
config.getRetryPeriod().toMillis(),
TimeUnit.MILLISECONDS
);
log.info("Leader election started successfully");
}
}
@PreDestroy
public void stopLeaderElection() {
synchronized (lock) {
if (!isRunning) {
return;
}
log.info("Stopping leader election");
isRunning = false;
// Stop tasks
if (electionTask != null) {
electionTask.cancel(true);
}
if (heartbeatTask != null) {
heartbeatTask.cancel(true);
}
// Release leadership
if (isLeader) {
relinquishLeadership();
}
// Shutdown scheduler
if (scheduler != null) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
log.info("Leader election stopped");
}
}
private void runElectionCycle() {
if (!isRunning) {
return;
}
try {
LeaderElectionState currentState = leaseService.getCurrentLeaderState();
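// Count cycles per observed state. A gauge registered with a boxed ordinal would be
// created once and never updated, because the registry only holds it weakly.
meterRegistry.counter("leader.election.cycle",
"state", currentState.getStatus().name()).increment();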
switch (currentState.getStatus()) {
case LEADER:
handleLeaderState();
break;
case FOLLOWER:
handleFollowerState(currentState.getLeaderIdentity());
break;
case ELECTION_IN_PROGRESS:
case UNKNOWN:
handleElectionState();
break;
}
} catch (Exception e) {
log.error("Error in leader election cycle", e);
meterRegistry.counter("leader.election.cycle.error").increment();
}
}
private void handleLeaderState() {
if (!isLeader) {
// We just became leader
log.info("Acquired leadership: {}", config.getIdentity());
isLeader = true;
startLeadership();
meterRegistry.counter("leader.election.become.leader").increment();
}
// Renew our lease
boolean renewed = leaseService.renewLease();
if (!renewed) {
log.warn("Failed to renew lease, leadership may be lost");
relinquishLeadership();
}
}
private void handleFollowerState(String leaderIdentity) {
if (isLeader) {
// We lost leadership
log.warn("Lost leadership to: {}", leaderIdentity);
relinquishLeadership();
meterRegistry.counter("leader.election.lost.leadership").increment();
}
// Just continue as follower
log.debug("Following leader: {}", leaderIdentity);
}
private void handleElectionState() {
if (isLeader) {
// Our leadership expired
log.warn("Leadership expired");
relinquishLeadership();
meterRegistry.counter("leader.election.leadership.expired").increment();
}
// Try to acquire leadership
log.debug("Attempting to acquire leadership");
boolean acquired = leaseService.acquireLease();
if (acquired) {
log.info("Successfully acquired leadership");
isLeader = true;
startLeadership();
// acquireLease() already counts leader.election.acquire.success; record the role change here
meterRegistry.counter("leader.election.become.leader").increment();
}
}
private void startLeadership() {
log.info("Starting leadership tasks");
// Start heartbeat task
heartbeatTask = scheduler.scheduleWithFixedDelay(
this::sendHeartbeat,
0,
config.getHeartbeatInterval().toMillis(),
TimeUnit.MILLISECONDS
);
// Start leader tasks
taskManager.onLeadershipAcquired();
meterRegistry.gauge("leader.election.is_leader", 1);
}
private void relinquishLeadership() {
log.info("Relinquishing leadership");
isLeader = false;
// Stop heartbeat task
if (heartbeatTask != null) {
heartbeatTask.cancel(true);
heartbeatTask = null;
}
// Stop leader tasks
taskManager.onLeadershipLost();
// Release lease
try {
leaseService.releaseLease();
} catch (Exception e) {
log.error("Failed to release lease during relinquishment", e);
}
meterRegistry.gauge("leader.election.is_leader", 0);
}
private void sendHeartbeat() {
if (!isLeader || !isRunning) {
return;
}
try {
// Perform any periodic leader tasks
taskManager.onLeaderHeartbeat();
meterRegistry.counter("leader.election.heartbeat").increment();
} catch (Exception e) {
log.error("Error in leader heartbeat", e);
meterRegistry.counter("leader.election.heartbeat.error").increment();
}
}
public boolean isLeader() {
return isLeader && isRunning;
}
public LeaderElectionState getCurrentState() {
return leaseService.getCurrentLeaderState();
}
public String getLeaderIdentity() {
return getCurrentState().getLeaderIdentity();
}
}
4. Leader Task Manager
@Service
@Slf4j
public class LeaderTaskManager {
private final Map<String, LeaderTask> tasks = new ConcurrentHashMap<>();
private final ScheduledExecutorService taskScheduler;
private volatile boolean isLeading = false;
public LeaderTaskManager() {
this.taskScheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("leader-tasks-%d")
.setDaemon(true)
.build());
}
public void registerTask(String taskName, LeaderTask task) {
tasks.put(taskName, task);
log.info("Registered leader task: {}", taskName);
}
public void onLeadershipAcquired() {
log.info("Leadership acquired - starting leader tasks");
isLeading = true;
tasks.forEach((name, task) -> {
try {
log.info("Starting leader task: {}", name);
task.onLeadershipAcquired();
} catch (Exception e) {
log.error("Failed to start leader task: {}", name, e);
}
});
}
public void onLeadershipLost() {
log.info("Leadership lost - stopping leader tasks");
isLeading = false;
tasks.forEach((name, task) -> {
try {
log.info("Stopping leader task: {}", name);
task.onLeadershipLost();
} catch (Exception e) {
log.error("Failed to stop leader task: {}", name, e);
}
});
}
public void onLeaderHeartbeat() {
if (!isLeading) {
return;
}
tasks.forEach((name, task) -> {
try {
task.onLeaderHeartbeat();
} catch (Exception e) {
log.error("Error in leader task heartbeat: {}", name, e);
}
});
}
public void executeLeaderTask(String taskName, Runnable task) {
if (!isLeading) {
log.debug("Skipping task execution - not leader: {}", taskName);
return;
}
try {
task.run();
} catch (Exception e) {
log.error("Error executing leader task: {}", taskName, e);
}
}
public ScheduledFuture<?> scheduleLeaderTask(String taskName, Runnable task,
long delay, TimeUnit unit) {
return taskScheduler.schedule(() -> executeLeaderTask(taskName, task), delay, unit);
}
public ScheduledFuture<?> scheduleLeaderTaskAtFixedRate(String taskName, Runnable task,
long initialDelay, long period,
TimeUnit unit) {
return taskScheduler.scheduleAtFixedRate(
() -> executeLeaderTask(taskName, task), initialDelay, period, unit);
}
@PreDestroy
public void shutdown() {
taskScheduler.shutdown();
try {
if (!taskScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
taskScheduler.shutdownNow();
}
} catch (InterruptedException e) {
taskScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public interface LeaderTask {
void onLeadershipAcquired();
void onLeadershipLost();
void onLeaderHeartbeat();
}
5. Example Leader Tasks
@Component
@Slf4j
public class DatabaseCleanupTask implements LeaderTask {
private final LeaderTaskManager taskManager;
// Not final: assigned each time leadership is acquired
private volatile ScheduledFuture<?> cleanupTask;
public DatabaseCleanupTask(LeaderTaskManager taskManager) {
this.taskManager = taskManager;
taskManager.registerTask("database-cleanup", this);
}
@Override
public void onLeadershipAcquired() {
log.info("Starting database cleanup task");
// Schedule periodic cleanup every 30 minutes
cleanupTask = taskManager.scheduleLeaderTaskAtFixedRate(
"database-cleanup",
this::performCleanup,
0, 30, TimeUnit.MINUTES
);
}
@Override
public void onLeadershipLost() {
log.info("Stopping database cleanup task");
if (cleanupTask != null && !cleanupTask.isCancelled()) {
cleanupTask.cancel(true);
}
}
@Override
public void onLeaderHeartbeat() {
// Optional: Perform lightweight checks during heartbeat
log.debug("Database cleanup task heartbeat");
}
private void performCleanup() {
log.info("Performing database cleanup");
// Implementation for database cleanup
}
}
@Component
@Slf4j
public class CacheWarmupTask implements LeaderTask {
private final LeaderTaskManager taskManager;
private ScheduledFuture<?> warmupTask;
public CacheWarmupTask(LeaderTaskManager taskManager) {
this.taskManager = taskManager;
taskManager.registerTask("cache-warmup", this);
}
@Override
public void onLeadershipAcquired() {
log.info("Starting cache warmup task");
// Schedule cache warmup every hour
warmupTask = taskManager.scheduleLeaderTaskAtFixedRate(
"cache-warmup",
this::warmupCache,
0, 1, TimeUnit.HOURS
);
}
@Override
public void onLeadershipLost() {
log.info("Stopping cache warmup task");
if (warmupTask != null && !warmupTask.isCancelled()) {
warmupTask.cancel(true);
}
}
@Override
public void onLeaderHeartbeat() {
// Monitor cache health
log.debug("Cache warmup task heartbeat");
}
private void warmupCache() {
log.info("Warming up cache");
// Implementation for cache warmup
try {
// Simulate cache warming
Thread.sleep(5000);
log.info("Cache warmup completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Cache warmup interrupted");
}
}
}
Advanced Features
1. Leader Election with Callbacks
@Service
@Slf4j
public class CallbackLeaderElectionService {
private final LeaderElectionService leaderElectionService;
private final List<LeadershipCallback> callbacks = new CopyOnWriteArrayList<>();
public CallbackLeaderElectionService(LeaderElectionService leaderElectionService) {
this.leaderElectionService = leaderElectionService;
setupCallbacks();
}
public void registerCallback(LeadershipCallback callback) {
callbacks.add(callback);
log.info("Registered leadership callback: {}", callback.getClass().getSimpleName());
}
private void setupCallbacks() {
// Monitor leadership changes
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("leadership-monitor-%d")
.setDaemon(true)
.build());
monitor.scheduleAtFixedRate(this::checkLeadershipChanges, 1, 1, TimeUnit.SECONDS);
}
private void checkLeadershipChanges() {
LeaderElectionState state = leaderElectionService.getCurrentState();
callbacks.forEach(callback -> {
try {
callback.onLeadershipStateChange(state);
} catch (Exception e) {
log.error("Error in leadership callback", e);
}
});
}
public interface LeadershipCallback {
void onLeadershipStateChange(LeaderElectionState state);
}
}
@Component
@Slf4j
public class MetricsLeadershipCallback implements CallbackLeaderElectionService.LeadershipCallback {
private final MeterRegistry meterRegistry;
// Strongly referenced holder; the registry keeps only a weak reference to gauge values
private final AtomicLong leaseExpiryMillis = new AtomicLong();
private volatile String lastLeader = "";
public MetricsLeadershipCallback(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
meterRegistry.gauge("leader.election.lease.expiry", leaseExpiryMillis);
}
@Override
public void onLeadershipStateChange(LeaderElectionState state) {
// The identity can be null when the state could not be read
String leader = state.getLeaderIdentity() != null ? state.getLeaderIdentity() : "";
// Track leader changes
if (!lastLeader.equals(leader)) {
log.info("Leader changed from {} to {}", lastLeader, leader);
meterRegistry.counter("leader.election.leader.changes").increment();
lastLeader = leader;
}
// Update the time remaining on the lease, in milliseconds
leaseExpiryMillis.set(state.getLeaseExpiryTime() - System.currentTimeMillis());
}
}
2. Leader Election with Health Checks
@Component
@Slf4j
public class LeaderElectionHealthIndicator implements HealthIndicator {
private final LeaderElectionService leaderElectionService;
private final LeaseManagementService leaseService;
public LeaderElectionHealthIndicator(LeaderElectionService leaderElectionService,
LeaseManagementService leaseService) {
this.leaderElectionService = leaderElectionService;
this.leaseService = leaseService;
}
@Override
public Health health() {
try {
LeaderElectionState state = leaderElectionService.getCurrentState();
Health.Builder healthBuilder = Health.unknown();
switch (state.getStatus()) {
case LEADER:
healthBuilder.up()
.withDetail("role", "leader")
.withDetail("identity", state.getLeaderIdentity())
.withDetail("leaseExpiresIn",
state.getLeaseExpiryTime() - System.currentTimeMillis())
.withDetail("transitions", state.getLeaderTransitions());
break;
case FOLLOWER:
healthBuilder.up()
.withDetail("role", "follower")
.withDetail("leader", state.getLeaderIdentity())
.withDetail("transitions", state.getLeaderTransitions());
break;
case ELECTION_IN_PROGRESS:
healthBuilder.down()
.withDetail("role", "election-in-progress")
.withDetail("message", "Leadership election in progress");
break;
case UNKNOWN:
healthBuilder.down()
.withDetail("role", "unknown")
.withDetail("message", "Unable to determine leadership state");
break;
}
return healthBuilder.build();
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
}
}
3. Distributed Lock Service (Alternative Approach)
@Service
@Slf4j
public class DistributedLockService {
private final KubernetesClient kubernetesClient;
private final String namespace;
private final Map<String, Lease> activeLocks = new ConcurrentHashMap<>();
public DistributedLockService(KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
this.namespace = kubernetesClient.getNamespace();
}
public boolean acquireLock(String lockName, String holderIdentity, Duration ttl) {
String leaseName = "lock-" + lockName;
try {
Lease lease = kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(leaseName)
.get();
if (lease == null) {
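// Create the lock on the API server. If another instance creates it first, this call
// fails and the catch block below reports the lock as not acquired.
lease = kubernetesClient.resource(createLock(leaseName, holderIdentity, ttl)).create();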
activeLocks.put(lockName, lease);
log.info("Acquired lock: {} for {}", lockName, holderIdentity);
return true;
}
// Check if lock is available
if (lease.getSpec().getHolderIdentity() == null ||
lease.getSpec().getHolderIdentity().isEmpty() ||
isLockExpired(lease)) {
// Acquire the lock
lease.getSpec().setHolderIdentity(holderIdentity);
lease.getSpec().setLeaseDurationSeconds((int) ttl.getSeconds());
lease.getSpec().setRenewTime(toMicroTime(System.currentTimeMillis()));
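// Write with the resourceVersion we read, so two instances racing for an expired lock
// cannot both succeed (assumes the Fabric8 6.x DSL)
lease = kubernetesClient.resource(lease)
.inNamespace(namespace)
.lockResourceVersion(lease.getMetadata().getResourceVersion())
.replace();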
activeLocks.put(lockName, lease);
log.info("Acquired lock: {} for {}", lockName, holderIdentity);
return true;
}
log.debug("Lock {} already held by {}", lockName, lease.getSpec().getHolderIdentity());
return false;
} catch (Exception e) {
log.error("Failed to acquire lock: {}", lockName, e);
return false;
}
}
public boolean renewLock(String lockName, String holderIdentity, Duration ttl) {
Lease lease = activeLocks.get(lockName);
if (lease == null) {
return false;
}
try {
// Verify we still hold the lock
if (!holderIdentity.equals(lease.getSpec().getHolderIdentity())) {
log.warn("Cannot renew lock - not current holder: {}", lockName);
return false;
}
// Renew the lock
lease.getSpec().setLeaseDurationSeconds((int) ttl.getSeconds());
lease.getSpec().setRenewTime(toMicroTime(System.currentTimeMillis()));
kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(lease.getMetadata().getName())
.patch(lease);
log.debug("Renewed lock: {} for {}", lockName, holderIdentity);
return true;
} catch (Exception e) {
log.error("Failed to renew lock: {}", lockName, e);
return false;
}
}
public void releaseLock(String lockName, String holderIdentity) {
Lease lease = activeLocks.get(lockName);
if (lease == null) {
return;
}
try {
// Only release if we're the current holder
if (holderIdentity.equals(lease.getSpec().getHolderIdentity())) {
lease.getSpec().setHolderIdentity("");
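// The API server rejects a leaseDurationSeconds of 0; an empty holder already marks the lock as free
lease.getSpec().setLeaseDurationSeconds(1);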
kubernetesClient.resources(Lease.class)
.inNamespace(namespace)
.withName(lease.getMetadata().getName())
.patch(lease);
activeLocks.remove(lockName);
log.info("Released lock: {}", lockName);
}
} catch (Exception e) {
log.error("Failed to release lock: {}", lockName, e);
}
}
private Lease createLock(String leaseName, String holderIdentity, Duration ttl) {
return new LeaseBuilder()
.withNewMetadata()
.withName(leaseName)
.withNamespace(namespace)
.endMetadata()
.withNewSpec()
.withHolderIdentity(holderIdentity)
.withLeaseDurationSeconds((int) ttl.getSeconds())
.withAcquireTime(toMicroTime(System.currentTimeMillis()))
.withRenewTime(toMicroTime(System.currentTimeMillis()))
.withLeaseTransitions(1)
.endSpec()
.build();
}
private boolean isLockExpired(Lease lease) {
if (lease.getSpec().getRenewTime() == null) {
return true;
}
long renewTime = fromMicroTime(lease.getSpec().getRenewTime());
long leaseDuration = lease.getSpec().getLeaseDurationSeconds() * 1000L;
return System.currentTimeMillis() > (renewTime + leaseDuration);
}
// Fabric8 models the Lease MicroTime fields (acquireTime, renewTime) as ZonedDateTime
private ZonedDateTime toMicroTime(long timeMillis) {
return Instant.ofEpochMilli(timeMillis).atZone(ZoneOffset.UTC);
}
private long fromMicroTime(ZonedDateTime microTime) {
return microTime == null ? 0L : microTime.toInstant().toEpochMilli();
}
}
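Callers of a lock service like this one need to pair every successful acquire with a release, including when the protected work throws. The sketch below shows that pattern in isolation. SimpleLock and LockedRunner are hypothetical names; a real adapter would presumably delegate tryAcquire and release to acquireLock and releaseLock with a fixed lock name and holder identity.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical minimal lock abstraction used only for this illustration. */
interface SimpleLock {
    boolean tryAcquire();
    void release();
}

final class LockedRunner {
    private LockedRunner() {}

    /** Runs the work only if the lock was obtained; always releases it afterwards. */
    static boolean runIfAcquired(SimpleLock lock, Runnable work) {
        if (!lock.tryAcquire()) {
            return false; // someone else holds the lock, skip this round
        }
        try {
            work.run();
            return true;
        } finally {
            lock.release(); // runs on success and on exception alike
        }
    }
}

class LockedRunnerDemo {
    public static void main(String[] args) {
        AtomicBoolean held = new AtomicBoolean(false);
        // In-memory lock standing in for a Lease-backed one
        SimpleLock inMemory = new SimpleLock() {
            @Override public boolean tryAcquire() { return held.compareAndSet(false, true); }
            @Override public void release() { held.set(false); }
        };
        boolean ran = LockedRunner.runIfAcquired(
                inMemory, () -> System.out.println("doing exclusive work"));
        System.out.println("ran=" + ran + ", stillHeld=" + held.get());
    }
}
```

For work that can outlast the lock's TTL, the holder would also need to renew periodically while the work runs; that is left out here to keep the sketch short.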
REST Controller for Leadership Management
@RestController
@RequestMapping("/api/leadership")
@Slf4j
public class LeadershipController {
private final LeaderElectionService leaderElectionService;
private final LeaseManagementService leaseService;
private final LeaderTaskManager taskManager;
public LeadershipController(LeaderElectionService leaderElectionService,
LeaseManagementService leaseService,
LeaderTaskManager taskManager) {
this.leaderElectionService = leaderElectionService;
this.leaseService = leaseService;
this.taskManager = taskManager;
}
@GetMapping("/status")
public ResponseEntity<LeadershipStatus> getLeadershipStatus() {
LeaderElectionState state = leaderElectionService.getCurrentState();
LeadershipStatus status = LeadershipStatus.builder()
.isLeader(leaderElectionService.isLeader())
.leaderIdentity(state.getLeaderIdentity())
.status(state.getStatus().name())
.leaseExpiryTime(state.getLeaseExpiryTime())
.lastObservationTime(state.getLastObservationTime())
.leaderTransitions(state.getLeaderTransitions())
.build();
return ResponseEntity.ok(status);
}
@PostMapping("/acquire")
public ResponseEntity<LeadershipResponse> forceAcquireLeadership() {
try {
boolean acquired = leaseService.acquireLease();
LeadershipResponse response = LeadershipResponse.builder()
.success(acquired)
.message(acquired ? "Leadership acquired" : "Failed to acquire leadership")
.timestamp(Instant.now())
.build();
return acquired ?
ResponseEntity.ok(response) :
ResponseEntity.status(HttpStatus.CONFLICT).body(response);
} catch (Exception e) {
log.error("Failed to force acquire leadership", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(LeadershipResponse.error("Acquisition failed: " + e.getMessage()));
}
}
@PostMapping("/release")
public ResponseEntity<LeadershipResponse> releaseLeadership() {
try {
leaseService.releaseLease();
LeadershipResponse response = LeadershipResponse.builder()
.success(true)
.message("Leadership released")
.timestamp(Instant.now())
.build();
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to release leadership", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(LeadershipResponse.error("Release failed: " + e.getMessage()));
}
}
@GetMapping("/tasks")
public ResponseEntity<List<String>> getLeaderTasks() {
// Placeholder: returns a hard-coded list instead of querying taskManager for registered tasks
return ResponseEntity.ok(List.of("database-cleanup", "cache-warmup"));
}
@PostMapping("/tasks/{taskName}/execute")
public ResponseEntity<TaskExecutionResponse> executeLeaderTask(@PathVariable String taskName) {
if (!leaderElectionService.isLeader()) {
return ResponseEntity.status(HttpStatus.FORBIDDEN)
.body(TaskExecutionResponse.error("Not the current leader"));
}
try {
taskManager.executeLeaderTask(taskName, () -> {
log.info("Executing leader task: {}", taskName);
// Task execution would happen here
});
return ResponseEntity.ok(TaskExecutionResponse.success("Task execution scheduled"));
} catch (Exception e) {
log.error("Failed to execute leader task: {}", taskName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(TaskExecutionResponse.error("Task execution failed"));
}
}
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class LeadershipStatus {
private boolean isLeader;
private String leaderIdentity;
private String status;
private long leaseExpiryTime;
private long lastObservationTime;
private int leaderTransitions;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class LeadershipResponse {
private boolean success;
private String message;
private Instant timestamp;
public static LeadershipResponse error(String message) {
return LeadershipResponse.builder()
.success(false)
.message(message)
.timestamp(Instant.now())
.build();
}
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class TaskExecutionResponse {
private boolean success;
private String message;
private Instant timestamp;
public static TaskExecutionResponse success(String message) {
return TaskExecutionResponse.builder()
.success(true)
.message(message)
.timestamp(Instant.now())
.build();
}
public static TaskExecutionResponse error(String message) {
return TaskExecutionResponse.builder()
.success(false)
.message(message)
.timestamp(Instant.now())
.build();
}
}
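The leader check in executeLeaderTask follows a simple pattern: refuse when the instance is not the leader, run the task otherwise, and report failures separately. Here is a framework-free sketch of that pattern. LeaderGatedExecutor is a hypothetical name, the leader check is injected as a BooleanSupplier, and the task runs synchronously, which may differ from how LeaderTaskManager schedules work.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical, Spring-free sketch of the leader gate around task execution. */
final class LeaderGatedExecutor {

    // Plain HTTP status codes, matching the ones the controller returns.
    static final int OK = 200;
    static final int FORBIDDEN = 403;
    static final int INTERNAL_SERVER_ERROR = 500;

    private final BooleanSupplier leaderCheck;

    LeaderGatedExecutor(BooleanSupplier leaderCheck) {
        this.leaderCheck = leaderCheck;
    }

    /** Runs the task only when this instance currently reports itself as leader. */
    int execute(Runnable task) {
        if (!leaderCheck.getAsBoolean()) {
            return FORBIDDEN; // followers never run the task
        }
        try {
            task.run();
            return OK;
        } catch (RuntimeException e) {
            return INTERNAL_SERVER_ERROR; // the task failed while we were leader
        }
    }

    public static void main(String[] args) {
        LeaderGatedExecutor follower = new LeaderGatedExecutor(() -> false);
        LeaderGatedExecutor leader = new LeaderGatedExecutor(() -> true);
        System.out.println(follower.execute(() -> { })); // 403
        System.out.println(leader.execute(() -> { }));   // 200
    }
}
```

The check happens once, before the task starts. Leadership can still be lost while a long task is running, so such tasks should re-check leadership at safe points or be cancellable.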
Custom Exceptions
public class LeaderElectionException extends RuntimeException {
public LeaderElectionException(String message) {
super(message);
}
public LeaderElectionException(String message, Throwable cause) {
super(message, cause);
}
}
public class LeaseAcquisitionException extends LeaderElectionException {
public LeaseAcquisitionException(String message) {
super(message);
}
public LeaseAcquisitionException(String message, Throwable cause) {
super(message, cause);
}
}
@ControllerAdvice
public class LeaderElectionExceptionHandler {
@ExceptionHandler(LeaderElectionException.class)
public ResponseEntity<ErrorResponse> handleLeaderElectionException(LeaderElectionException e) {
ErrorResponse error = new ErrorResponse("LEADER_ELECTION_ERROR", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
@ExceptionHandler(LeaseAcquisitionException.class)
public ResponseEntity<ErrorResponse> handleLeaseAcquisitionException(LeaseAcquisitionException e) {
ErrorResponse error = new ErrorResponse("LEASE_ACQUISITION_ERROR", e.getMessage());
return ResponseEntity.status(HttpStatus.CONFLICT).body(error);
}
}
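The handler above registers both LeaderElectionException and its subclass LeaseAcquisitionException. Spring picks the handler whose declared type is closest to the thrown exception in the class hierarchy, so a LeaseAcquisitionException yields 409 rather than 500. The sketch below imitates that "closest type wins" lookup in plain Java. The resolver and the two stand-in exception classes are hypothetical and are not Spring's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-ins for the article's exception hierarchy. */
class BaseElectionException extends RuntimeException {
    BaseElectionException(String message) {
        super(message);
    }
}

class AcquisitionException extends BaseElectionException {
    AcquisitionException(String message) {
        super(message);
    }
}

/** Maps exceptions to HTTP status codes, preferring the most specific registered type. */
final class ExceptionStatusResolver {

    private final Map<Class<? extends Throwable>, Integer> statusByType = new LinkedHashMap<>();

    ExceptionStatusResolver register(Class<? extends Throwable> type, int status) {
        statusByType.put(type, status);
        return this;
    }

    /** Walks up the class hierarchy so the closest registered type wins. */
    int resolve(Throwable error) {
        for (Class<?> type = error.getClass(); type != null; type = type.getSuperclass()) {
            Integer status = statusByType.get(type);
            if (status != null) {
                return status;
            }
        }
        return 500; // nothing registered for this type or any supertype
    }

    public static void main(String[] args) {
        ExceptionStatusResolver resolver = new ExceptionStatusResolver()
                .register(BaseElectionException.class, 500)
                .register(AcquisitionException.class, 409);
        System.out.println(resolver.resolve(new AcquisitionException("lease held"))); // 409
        System.out.println(resolver.resolve(new BaseElectionException("failed")));    // 500
    }
}
```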
Configuration
# application.yml
leader:
  election:
    enabled: true
    lease-name: "my-app-leader"
    lease-namespace: "${KUBERNETES_NAMESPACE:default}"
    identity: "${HOSTNAME:unknown}-${random.uuid}"
    lease-duration: 15s
    renew-deadline: 10s
    retry-period: 2s
    heartbeat-interval: 5s
kubernetes:
  master-url: "${KUBERNETES_MASTER:https://kubernetes.default.svc}"
  namespace: "${KUBERNETES_NAMESPACE:default}"
  trust-certs: true
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics
  endpoint:
    health:
      show-details: always
logging:
  level:
    com.example.leaderelection: DEBUG
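The three timing values above only make sense in a particular order: the lease must outlive the renew deadline, and the renew deadline must leave room for at least one retry. The sketch below checks that ordering at startup. It assumes the convention used by Kubernetes client-go leader election, including its jitter factor of 1.2. Whether the service in this article enforces the same rule is not shown, and ElectionTimingCheck is a hypothetical name.

```java
import java.time.Duration;

/** Hypothetical startup check for the lease-duration / renew-deadline / retry-period ordering. */
final class ElectionTimingCheck {

    // Assumption: same jitter factor as client-go's leader election.
    static final double JITTER_FACTOR = 1.2;

    private ElectionTimingCheck() {
    }

    static void validate(Duration leaseDuration, Duration renewDeadline, Duration retryPeriod) {
        if (leaseDuration.compareTo(renewDeadline) <= 0) {
            throw new IllegalArgumentException("lease-duration must be greater than renew-deadline");
        }
        long jitteredRetryMillis = (long) (retryPeriod.toMillis() * JITTER_FACTOR);
        if (renewDeadline.toMillis() <= jitteredRetryMillis) {
            throw new IllegalArgumentException("renew-deadline must be greater than retry-period * 1.2");
        }
    }

    public static void main(String[] args) {
        // Values from application.yml: 15s > 10s, and 10s > 2s * 1.2 = 2.4s, so this passes.
        validate(Duration.ofSeconds(15), Duration.ofSeconds(10), Duration.ofSeconds(2));
        System.out.println("timing settings are consistent");
    }
}
```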
Deployment Configuration
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
  labels:
    app: my-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      serviceAccountName: my-app-service-account
      containers:
        - name: my-app
          image: my-app:latest
          ports:
            - containerPort: 8080
          env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-app-service-account
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: lease-access
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: my-app-lease-access
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: lease-access
subjects:
  - kind: ServiceAccount
    name: my-app-service-account
    namespace: default
Best Practices
- Proper RBAC Configuration: Ensure service account has lease permissions
- Resource Cleanup: Always release leases on shutdown
- Health Monitoring: Implement comprehensive health checks
- Graceful Degradation: Handle leadership loss gracefully
- Testing: Test leader election in multi-instance scenarios
- Monitoring: Track leadership transitions and lease operations
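@Component
@Slf4j
public class LeaderElectionBestPractices {
private final LeaderElectionService leaderElectionService;
private final LeaseManagementService leaseService;
public LeaderElectionBestPractices(LeaderElectionService leaderElectionService,
LeaseManagementService leaseService) {
this.leaderElectionService = leaderElectionService;
this.leaseService = leaseService;
}
@EventListener(ContextClosedEvent.class)
public void onShutdown() {
// Release the lease on shutdown so another instance can take over without waiting for expiry
if (leaderElectionService.isLeader()) {
leaseService.releaseLease();
}
}
// @Scheduled only fires when @EnableScheduling is present on a configuration class
@Scheduled(fixedRate = 30000)
public void monitorLeaderElectionHealth() {
log.debug("Leader election status: {}", leaderElectionService.getCurrentState().getStatus());
}
}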
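For the monitoring practice, counting leadership transitions needs little more than remembering the last observed holder. The sketch below shows one way to do that in plain Java. LeadershipTransitionTracker is a hypothetical class, and a real setup would likely export the count through a metrics library such as Micrometer instead of keeping it in a field.

```java
/** Hypothetical tracker that counts how often the observed lease holder changes. */
final class LeadershipTransitionTracker {

    private String lastHolder;
    private int transitions;

    /**
     * Records the holder seen in the latest lease observation.
     * Returns true when leadership moved from one known holder to a different one.
     */
    synchronized boolean observe(String holderIdentity) {
        if (holderIdentity == null || holderIdentity.isEmpty()) {
            return false; // lease currently has no holder; nothing to compare
        }
        if (holderIdentity.equals(lastHolder)) {
            return false; // same leader as before
        }
        boolean handover = lastHolder != null; // the first observation is not a transition
        lastHolder = holderIdentity;
        if (handover) {
            transitions++;
        }
        return handover;
    }

    synchronized int transitions() {
        return transitions;
    }

    public static void main(String[] args) {
        LeadershipTransitionTracker tracker = new LeadershipTransitionTracker();
        tracker.observe("pod-a");
        tracker.observe("pod-a");
        tracker.observe("pod-b");
        System.out.println(tracker.transitions()); // 1
    }
}
```

A sudden rise in this count usually points at renewals that miss their deadline, which is worth alerting on before tasks start flapping between instances.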
Conclusion
Leader Election in Kubernetes with Java provides:
- High Availability: Automatic failover between application instances
- Distributed Coordination: Safe execution of singleton tasks
- Kubernetes Native: Leverages Kubernetes coordination API
- Production Ready: Comprehensive error handling and monitoring
- Flexible: Support for various leadership patterns and tasks
This implementation enables robust leader election capabilities in distributed Java applications running on Kubernetes, ensuring that critical tasks are performed by only one instance while maintaining high availability through automatic failover.