Leader Election in Kubernetes in Java

Overview

Leader election in Kubernetes allows multiple instances of an application to coordinate and elect a single leader that performs specific tasks. This ensures high availability and prevents duplicate work across replicas.

Core Implementation

1. Leader Election Configuration

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoordinationV1Api;
import io.kubernetes.client.openapi.models.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Immutable settings for one leader-election candidate.
 *
 * <p>Instances are normally created through {@link Builder}, which validates the
 * values before constructing the object. The public constructor performs no
 * validation and stores its arguments as given.
 */
public class LeaderElectionConfig {
    private final String namespace;
    private final String leaseName;
    private final String candidateId;
    private final long leaseDurationSeconds;
    private final long renewDeadlineSeconds;
    private final long retryPeriodSeconds;

    /**
     * Creates a configuration from explicit values without validating them.
     *
     * @param namespace            namespace that holds the lease
     * @param leaseName            name of the lease object
     * @param candidateId          identity this candidate writes into the lease
     * @param leaseDurationSeconds lease duration, in seconds
     * @param renewDeadlineSeconds renew deadline, in seconds
     * @param retryPeriodSeconds   delay between election attempts, in seconds
     */
    public LeaderElectionConfig(String namespace, String leaseName, String candidateId,
                                long leaseDurationSeconds, long renewDeadlineSeconds,
                                long retryPeriodSeconds) {
        this.namespace = namespace;
        this.leaseName = leaseName;
        this.candidateId = candidateId;
        this.leaseDurationSeconds = leaseDurationSeconds;
        this.renewDeadlineSeconds = renewDeadlineSeconds;
        this.retryPeriodSeconds = retryPeriodSeconds;
    }

    // Getters
    public String getNamespace() { return namespace; }
    public String getLeaseName() { return leaseName; }
    public String getCandidateId() { return candidateId; }
    public long getLeaseDurationSeconds() { return leaseDurationSeconds; }
    public long getRenewDeadlineSeconds() { return renewDeadlineSeconds; }
    public long getRetryPeriodSeconds() { return retryPeriodSeconds; }

    /**
     * Fluent builder with defaults of namespace {@code "default"}, a 15 second
     * lease duration, a 10 second renew deadline and a 2 second retry period.
     */
    public static class Builder {
        private String namespace = "default";
        private String leaseName;
        private String candidateId;
        private long leaseDurationSeconds = 15;
        private long renewDeadlineSeconds = 10;
        private long retryPeriodSeconds = 2;

        public Builder withNamespace(String namespace) {
            this.namespace = namespace;
            return this;
        }

        public Builder withLeaseName(String leaseName) {
            this.leaseName = leaseName;
            return this;
        }

        public Builder withCandidateId(String candidateId) {
            this.candidateId = candidateId;
            return this;
        }

        public Builder withLeaseDurationSeconds(long leaseDurationSeconds) {
            this.leaseDurationSeconds = leaseDurationSeconds;
            return this;
        }

        public Builder withRenewDeadlineSeconds(long renewDeadlineSeconds) {
            this.renewDeadlineSeconds = renewDeadlineSeconds;
            return this;
        }

        public Builder withRetryPeriodSeconds(long retryPeriodSeconds) {
            this.retryPeriodSeconds = retryPeriodSeconds;
            return this;
        }

        /**
         * Validates the collected values and creates the configuration.
         *
         * @return the validated configuration
         * @throws IllegalStateException if the namespace, lease name or candidate ID
         *         is missing or blank, if any timing value is not positive, or if the
         *         timings are not ordered as lease duration &gt; renew deadline &gt;
         *         retry period
         */
        public LeaderElectionConfig build() {
            requireText(leaseName, "Lease name is required");
            requireText(candidateId, "Candidate ID is required");
            requireText(namespace, "Namespace is required");
            requirePositive(leaseDurationSeconds, "Lease duration");
            requirePositive(renewDeadlineSeconds, "Renew deadline");
            requirePositive(retryPeriodSeconds, "Retry period");
            // A leader must give up before its lease can expire, and it needs at
            // least one retry inside the renew deadline to have a chance to renew.
            if (leaseDurationSeconds <= renewDeadlineSeconds) {
                throw new IllegalStateException("Lease duration (" + leaseDurationSeconds
                        + "s) must be greater than renew deadline (" + renewDeadlineSeconds + "s)");
            }
            if (renewDeadlineSeconds <= retryPeriodSeconds) {
                throw new IllegalStateException("Renew deadline (" + renewDeadlineSeconds
                        + "s) must be greater than retry period (" + retryPeriodSeconds + "s)");
            }
            return new LeaderElectionConfig(namespace, leaseName, candidateId,
                    leaseDurationSeconds, renewDeadlineSeconds, retryPeriodSeconds);
        }

        /** Rejects null, empty and whitespace-only values. */
        private static void requireText(String value, String message) {
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException(message);
            }
        }

        /** Rejects zero and negative timing values. */
        private static void requirePositive(long seconds, String label) {
            if (seconds <= 0) {
                throw new IllegalStateException(label + " must be positive, got " + seconds);
            }
        }
    }
}

2. Lease Management Service

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoordinationV1Api;
import io.kubernetes.client.openapi.models.*;
import org.springframework.stereotype.Service;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Reads, creates and updates the Kubernetes {@code Lease} object used for leader
 * election, and answers questions about who currently holds it.
 *
 * <p>Expiry checks compare the lease's {@code renewTime} with the local clock
 * ({@link OffsetDateTime#now()}), so they are only as accurate as the clock
 * agreement between candidates.
 */
@Service
public class LeaseService {
    private static final Logger logger = LoggerFactory.getLogger(LeaseService.class);

    /** Lease duration, in seconds, used by the overloads that take no explicit duration. */
    private static final int DEFAULT_LEASE_DURATION_SECONDS = 15;

    private final CoordinationV1Api coordinationV1Api;

    public LeaseService(ApiClient apiClient) {
        this.coordinationV1Api = new CoordinationV1Api(apiClient);
    }

    /**
     * Returns the named lease, creating it with {@code candidateId} as holder when
     * the read fails with HTTP 404.
     *
     * @throws ApiException for any read failure other than 404, or when the create fails
     */
    public V1Lease getOrCreateLease(String namespace, String leaseName, String candidateId)
            throws ApiException {
        try {
            return getLease(namespace, leaseName);
        } catch (ApiException e) {
            if (e.getCode() == 404) {
                logger.info("Lease {} not found, creating new one", leaseName);
                return createLease(namespace, leaseName, candidateId);
            }
            throw e;
        }
    }

    /** Reads the existing lease. */
    public V1Lease getLease(String namespace, String leaseName) throws ApiException {
        return coordinationV1Api.readNamespacedLease(leaseName, namespace, null);
    }

    /** Creates a lease held by {@code candidateId} with the default lease duration. */
    public V1Lease createLease(String namespace, String leaseName, String candidateId)
            throws ApiException {
        return createLease(namespace, leaseName, candidateId, DEFAULT_LEASE_DURATION_SECONDS);
    }

    /**
     * Creates a lease held by {@code candidateId}.
     *
     * @param leaseDurationSeconds duration written into the lease spec
     */
    public V1Lease createLease(String namespace, String leaseName, String candidateId,
                               int leaseDurationSeconds) throws ApiException {
        // One timestamp for both fields so acquireTime and renewTime agree exactly.
        OffsetDateTime now = OffsetDateTime.now();
        V1Lease lease = new V1Lease()
                .metadata(new V1ObjectMeta()
                        .name(leaseName)
                        .namespace(namespace))
                .spec(new V1LeaseSpec()
                        .holderIdentity(candidateId)
                        .acquireTime(now)
                        .renewTime(now)
                        .leaseDurationSeconds(leaseDurationSeconds));
        return coordinationV1Api.createNamespacedLease(namespace, lease, null, null, null, null);
    }

    /** Writes {@code candidateId} as holder and refreshes the renew time, using the default duration. */
    public V1Lease updateLease(V1Lease lease, String candidateId) throws ApiException {
        return updateLease(lease, candidateId, DEFAULT_LEASE_DURATION_SECONDS);
    }

    /**
     * Writes {@code candidateId} as holder and refreshes the renew time. When the
     * holder changes, the acquire time is reset and the transition counter is
     * incremented.
     *
     * <p>The passed lease object is replaced as a whole, so whatever metadata it
     * carries from the earlier read is sent back with it.
     *
     * @param leaseDurationSeconds duration written into the lease spec
     * @throws ApiException when the replace call fails
     */
    public V1Lease updateLease(V1Lease lease, String candidateId, int leaseDurationSeconds)
            throws ApiException {
        V1LeaseSpec spec = lease.getSpec();
        if (spec == null) {
            spec = new V1LeaseSpec();
        }
        OffsetDateTime now = OffsetDateTime.now();
        if (!candidateId.equals(spec.getHolderIdentity())) {
            // Leadership is changing hands: record when and how often.
            Integer transitions = spec.getLeaseTransitions();
            spec.setLeaseTransitions(transitions == null ? 1 : transitions + 1);
            spec.setAcquireTime(now);
        }
        // The generated setters return void, so they are called one per statement.
        spec.setHolderIdentity(candidateId);
        spec.setRenewTime(now);
        spec.setLeaseDurationSeconds(leaseDurationSeconds);
        lease.setSpec(spec);
        V1ObjectMeta metadata = lease.getMetadata();
        return coordinationV1Api.replaceNamespacedLease(
                metadata.getName(),
                metadata.getNamespace(),
                lease,
                null, null, null, null);
    }

    /**
     * Decides whether {@code candidateId} may write itself into the lease. This
     * only inspects the given lease object; it does not call the API.
     *
     * @param leaseDuration fallback duration, in seconds, used when the lease spec
     *                      records none
     * @return {@code true} when the lease has no holder, is already held by the
     *         candidate, or has expired; {@code false} otherwise, including when
     *         another holder is recorded without a renew time
     */
    public boolean tryAcquireLease(V1Lease lease, String candidateId, long leaseDuration) {
        V1LeaseSpec spec = lease.getSpec();
        // No holder (missing or empty identity) means the lease is free.
        if (spec == null || spec.getHolderIdentity() == null
                || spec.getHolderIdentity().isEmpty()) {
            return true;
        }
        String currentHolder = spec.getHolderIdentity();
        // If we're already the holder, we can keep it
        if (candidateId.equals(currentHolder)) {
            return true;
        }
        OffsetDateTime renewTime = spec.getRenewTime();
        if (renewTime == null) {
            // Expiry cannot be computed without a renew time; do not take over.
            return false;
        }
        Integer recordedDuration = spec.getLeaseDurationSeconds();
        long effectiveDuration = recordedDuration != null ? recordedDuration : leaseDuration;
        if (OffsetDateTime.now().isAfter(renewTime.plusSeconds(effectiveDuration))) {
            logger.info("Lease has expired, current holder: {}", currentHolder);
            return true;
        }
        return false;
    }

    /**
     * Reports whether {@code candidateId} is the holder of a lease that has not expired.
     * A lease without renew time or duration is treated as not expired.
     */
    public boolean isLeader(V1Lease lease, String candidateId) {
        V1LeaseSpec spec = lease.getSpec();
        if (spec == null || spec.getHolderIdentity() == null) {
            return false;
        }
        if (!candidateId.equals(spec.getHolderIdentity())) {
            return false;
        }
        OffsetDateTime renewTime = spec.getRenewTime();
        Integer duration = spec.getLeaseDurationSeconds();
        if (renewTime != null && duration != null
                && OffsetDateTime.now().isAfter(renewTime.plusSeconds(duration))) {
            logger.warn("We are leader but lease has expired");
            return false;
        }
        return true;
    }

    /** Returns the holder identity recorded in the lease, or {@code null} when there is none. */
    public String getCurrentLeader(V1Lease lease) {
        V1LeaseSpec spec = lease.getSpec();
        if (spec == null) {
            return null;
        }
        return spec.getHolderIdentity();
    }
}

3. Leader Election Manager

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.OffsetDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Runs the periodic election loop for one candidate and reports leadership
 * changes to the registered {@link LeaderCallbacks}.
 *
 * <p>Every retry period the loop reads (or creates) the lease, renews it when
 * this candidate holds it, or tries to take it over when it is free or expired.
 * If the lease cannot be renewed for longer than the configured renew deadline,
 * the candidate gives up leadership locally so it does not keep acting as leader
 * while another candidate may already have taken over.
 *
 * <p>An instance cannot be restarted after {@link #stop()} because the
 * scheduler is shut down.
 */
@Service
public class LeaderElectionManager {
    private static final Logger logger = LoggerFactory.getLogger(LeaderElectionManager.class);

    private final LeaseService leaseService;
    private final LeaderElectionConfig config;
    private final ScheduledExecutorService scheduler;
    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final AtomicReference<V1Lease> currentLease = new AtomicReference<>();
    private final AtomicReference<OffsetDateTime> lastLeaderTransition = new AtomicReference<>();
    // Local time of the last lease write that succeeded; drives the renew deadline.
    private final AtomicReference<OffsetDateTime> lastSuccessfulRenew = new AtomicReference<>();
    // Written by the caller's thread, read by the scheduler thread.
    private volatile LeaderCallbacks callbacks;

    public LeaderElectionManager(LeaseService leaseService, LeaderElectionConfig config) {
        this.leaseService = leaseService;
        this.config = config;
        this.scheduler = Executors.newScheduledThreadPool(2);
    }

    /** Starts the election loop; does nothing when it is already running. */
    public void start() {
        if (!isRunning.compareAndSet(false, true)) {
            return;
        }
        logger.info("Starting leader election for candidate: {}", config.getCandidateId());
        try {
            scheduler.scheduleWithFixedDelay(this::runLeaderElection, 0,
                    config.getRetryPeriodSeconds(), TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            // Scheduling failed (for example after stop()); do not claim to be running.
            isRunning.set(false);
            throw e;
        }
    }

    /**
     * Stops the election loop and, when this candidate was leader, invokes
     * {@code onStopLeading}. Does nothing when the loop is not running.
     */
    public void stop() {
        if (!isRunning.compareAndSet(true, false)) {
            return;
        }
        logger.info("Stopping leader election for candidate: {}", config.getCandidateId());
        scheduler.shutdown();
        updateLeadership(false);
    }

    /** One election cycle: read the lease, renew or acquire it, then publish the result. */
    private void runLeaderElection() {
        if (!isRunning.get()) {
            return;
        }
        try {
            String candidateId = config.getCandidateId();
            // Get or create the lease
            V1Lease lease = leaseService.getOrCreateLease(
                    config.getNamespace(), config.getLeaseName(), candidateId);
            currentLease.set(lease);
            boolean nowLeader = leaseService.isLeader(lease, candidateId);
            if (nowLeader) {
                // Renew our lease
                currentLease.set(leaseService.updateLease(lease, candidateId));
                lastSuccessfulRenew.set(OffsetDateTime.now());
            } else if (leaseService.tryAcquireLease(lease, candidateId,
                    config.getLeaseDurationSeconds())) {
                // Try to become leader
                currentLease.set(leaseService.updateLease(lease, candidateId));
                lastSuccessfulRenew.set(OffsetDateTime.now());
                nowLeader = true;
                logger.info("Successfully acquired leadership");
            }
            updateLeadership(nowLeader);
        } catch (ApiException e) {
            logger.error("Error during leader election", e);
            handleLeaderElectionError(e);
            stepDownIfRenewDeadlineExceeded();
        } catch (Exception e) {
            logger.error("Unexpected error during leader election", e);
            stepDownIfRenewDeadlineExceeded();
        }
    }

    /**
     * Records the new leadership state and fires the matching callback when the
     * state actually changed. Synchronized so that transitions from the scheduler
     * thread and from {@link #stop()} are applied one at a time and callbacks are
     * never interleaved.
     */
    private synchronized void updateLeadership(boolean nowLeader) {
        if (nowLeader && !isRunning.get()) {
            // stop() won the race with an in-flight cycle; do not become leader.
            return;
        }
        boolean wasLeader = isLeader.getAndSet(nowLeader);
        if (wasLeader == nowLeader) {
            return;
        }
        lastLeaderTransition.set(OffsetDateTime.now());
        LeaderCallbacks listener = callbacks;
        if (nowLeader) {
            logger.info("Became leader: {}", config.getCandidateId());
            if (listener != null) {
                try {
                    listener.onStartLeading();
                } catch (Exception e) {
                    logger.error("Error in onStartLeading callback", e);
                }
            }
        } else {
            logger.info("Lost leadership: {}", config.getCandidateId());
            if (listener != null) {
                try {
                    listener.onStopLeading();
                } catch (Exception e) {
                    logger.error("Error in onStopLeading callback", e);
                }
            }
        }
    }

    /** Gives up leadership when the last successful lease write is older than the renew deadline. */
    private void stepDownIfRenewDeadlineExceeded() {
        if (!isLeader.get()) {
            return;
        }
        OffsetDateTime renewedAt = lastSuccessfulRenew.get();
        if (renewedAt != null && OffsetDateTime.now().isAfter(
                renewedAt.plusSeconds(config.getRenewDeadlineSeconds()))) {
            logger.warn("Could not renew lease within {}s, giving up leadership",
                    config.getRenewDeadlineSeconds());
            updateLeadership(false);
        }
    }

    private void handleLeaderElectionError(ApiException e) {
        if (e.getCode() == 409) { // Conflict - lease was updated by someone else
            logger.debug("Lease update conflict, will retry");
        } else if (e.getCode() == 404) { // Not found - lease was deleted
            logger.info("Lease was deleted, will recreate on next attempt");
            currentLease.set(null);
        } else {
            // For other errors, we might want to back off
            logger.warn("API error during leader election, code: {}", e.getCode());
        }
    }

    // Public API

    /** Returns the locally recorded leadership state. */
    public boolean isLeader() {
        return isLeader.get();
    }

    /** Returns the holder of the most recently seen lease, or {@code null} when none is known. */
    public String getCurrentLeader() {
        V1Lease lease = currentLease.get();
        if (lease != null) {
            return leaseService.getCurrentLeader(lease);
        }
        return null;
    }

    /** Returns the local time of the last leadership change, or {@code null} if there was none. */
    public OffsetDateTime getLastLeaderTransition() {
        return lastLeaderTransition.get();
    }

    /** Replaces the callbacks; only one set of callbacks is held at a time. */
    public void setCallbacks(LeaderCallbacks callbacks) {
        this.callbacks = callbacks;
    }

    /** Hooks invoked when this candidate gains or loses leadership. */
    public interface LeaderCallbacks {
        void onStartLeading();
        void onStopLeading();
    }
}

4. High-Level Leader Election Service

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Facade over {@link LeaderElectionManager} that starts and stops a single
 * registered {@link LeaderTask} as leadership is gained and lost.
 *
 * <p>Only one task is held at a time; registering a new one replaces the previous one.
 */
@Service
public class LeaderElectionService {
    private static final Logger logger = LoggerFactory.getLogger(LeaderElectionService.class);

    private final LeaderElectionManager leaderElectionManager;
    private final AtomicReference<LeaderTask> currentLeaderTask = new AtomicReference<>();

    public LeaderElectionService(LeaderElectionManager leaderElectionManager) {
        this.leaderElectionManager = leaderElectionManager;
        setupCallbacks();
    }

    public void start() {
        leaderElectionManager.start();
    }

    public void stop() {
        leaderElectionManager.stop();
    }

    public boolean isLeader() {
        return leaderElectionManager.isLeader();
    }

    public String getCurrentLeader() {
        return leaderElectionManager.getCurrentLeader();
    }

    /**
     * Registers the task to run while this instance is leader, replacing any
     * previously registered task.
     *
     * <p>When this instance is already leader, the replaced task (if it is a
     * different object) is stopped first so it does not keep running without an
     * owner, and the new task is started immediately.
     *
     * @param task the task to register; {@code null} clears the registration
     */
    public void registerLeaderTask(LeaderTask task) {
        LeaderTask previous = currentLeaderTask.getAndSet(task);
        if (!isLeader()) {
            return;
        }
        if (previous != null && previous != task) {
            stopTask(previous);
        }
        // If we're already leader, start the task immediately
        if (task != null) {
            startTask(task);
        }
    }

    /** Installs callbacks that start and stop the registered task on leadership changes. */
    private void setupCallbacks() {
        leaderElectionManager.setCallbacks(new LeaderElectionManager.LeaderCallbacks() {
            @Override
            public void onStartLeading() {
                logger.info("Starting leader tasks");
                LeaderTask task = currentLeaderTask.get();
                if (task != null) {
                    startTask(task);
                }
            }

            @Override
            public void onStopLeading() {
                logger.info("Stopping leader tasks");
                LeaderTask task = currentLeaderTask.get();
                if (task != null) {
                    stopTask(task);
                }
            }
        });
    }

    /** Starts a task, logging instead of propagating any failure. */
    private void startTask(LeaderTask task) {
        try {
            task.start();
        } catch (Exception e) {
            logger.error("Failed to start leader task", e);
        }
    }

    /** Stops a task, logging instead of propagating any failure. */
    private void stopTask(LeaderTask task) {
        try {
            task.stop();
        } catch (Exception e) {
            logger.error("Failed to stop leader task", e);
        }
    }

    /** Work that should run only while this instance is leader. */
    public interface LeaderTask {
        void start();
        void stop();
    }
}

5. Spring Boot Configuration

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that binds the leader-election properties and exposes
 * the election settings, lease service, manager and high-level service as beans.
 *
 * <p>NOTE(review): this class has the same simple name as the settings class
 * whose {@code Builder} it calls. If both live in the same package they cannot
 * coexist, and inside this class {@code LeaderElectionConfig} refers to this
 * class itself - consider renaming one of them.
 *
 * <p>NOTE(review): the lease service, manager and election service classes are
 * also annotated {@code @Service}; presumably they are then registered twice
 * when component scanning is enabled - confirm and keep only one mechanism.
 */
@Configuration
public class LeaderElectionConfig {

    @Value("${leader.election.lease.name:my-app-leader}")
    private String leaseName;

    // Falls back to the HOSTNAME property, then to "unknown".
    @Value("${leader.election.candidate.id:${HOSTNAME:unknown}}")
    private String candidateId;

    @Value("${kubernetes.namespace:default}")
    private String namespace;

    @Value("${leader.election.lease.duration:15}")
    private long leaseDurationSeconds;

    @Value("${leader.election.renew.deadline:10}")
    private long renewDeadlineSeconds;

    @Value("${leader.election.retry.period:2}")
    private long retryPeriodSeconds;

    /** Builds the election settings from the bound properties. */
    @Bean
    public LeaderElectionConfig leaderElectionConfig() {
        return new LeaderElectionConfig.Builder()
                .withNamespace(this.namespace)
                .withLeaseName(this.leaseName)
                .withCandidateId(this.candidateId)
                .withLeaseDurationSeconds(this.leaseDurationSeconds)
                .withRenewDeadlineSeconds(this.renewDeadlineSeconds)
                .withRetryPeriodSeconds(this.retryPeriodSeconds)
                .build();
    }

    /** Creates the lease service on top of the injected Kubernetes API client. */
    @Bean
    public LeaseService leaseService(io.kubernetes.client.openapi.ApiClient apiClient) {
        return new LeaseService(apiClient);
    }

    /** Creates the election manager from the lease service and the election settings. */
    @Bean
    public LeaderElectionManager leaderElectionManager(
            LeaseService leaseService,
            LeaderElectionConfig config) {
        return new LeaderElectionManager(leaseService, config);
    }

    /** Creates the high-level election service that wraps the manager. */
    @Bean
    public LeaderElectionService leaderElectionService(LeaderElectionManager manager) {
        return new LeaderElectionService(manager);
    }
}

6. Example Leader Tasks

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Example component that runs three background jobs only while this instance
 * is leader, plus one {@code @Scheduled} method that checks leadership on
 * every run.
 */
@Component
public class ExampleLeaderTasks {
    private static final Logger logger = LoggerFactory.getLogger(ExampleLeaderTasks.class);

    private final LeaderElectionService leaderElectionService;
    private final AtomicBoolean taskRunning = new AtomicBoolean(false);
    // Created on each start and discarded on stop: an executor that has been
    // shut down rejects new work, so it cannot be reused when leadership is
    // regained. Guarded by this object's monitor.
    private ScheduledExecutorService taskScheduler;

    public ExampleLeaderTasks(LeaderElectionService leaderElectionService) {
        this.leaderElectionService = leaderElectionService;
        setupLeaderTask();
    }

    /** Registers the start/stop hooks with the election service. */
    private void setupLeaderTask() {
        leaderElectionService.registerLeaderTask(new LeaderElectionService.LeaderTask() {
            @Override
            public void start() {
                logger.info("Starting scheduled leader tasks");
                startScheduledTasks();
            }

            @Override
            public void stop() {
                logger.info("Stopping scheduled leader tasks");
                stopScheduledTasks();
            }
        });
    }

    /** Creates a fresh scheduler and schedules the leader jobs; no-op when already running. */
    private synchronized void startScheduledTasks() {
        if (!taskRunning.compareAndSet(false, true)) {
            return;
        }
        taskScheduler = Executors.newScheduledThreadPool(2);
        // Task 1: Periodic health check
        taskScheduler.scheduleAtFixedRate(this::healthCheckTask, 0, 30, TimeUnit.SECONDS);
        // Task 2: Data cleanup
        taskScheduler.scheduleAtFixedRate(this::cleanupTask, 0, 5, TimeUnit.MINUTES);
        // Task 3: Cache warming
        taskScheduler.schedule(this::cacheWarmupTask, 10, TimeUnit.SECONDS);
        logger.info("All leader tasks started");
    }

    /**
     * Shuts the current scheduler down, waiting up to 10 seconds for running
     * jobs before cancelling them; no-op when not running.
     */
    private synchronized void stopScheduledTasks() {
        if (!taskRunning.compareAndSet(true, false)) {
            return;
        }
        ScheduledExecutorService scheduler = taskScheduler;
        taskScheduler = null;
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        logger.info("All leader tasks stopped");
    }

    private void healthCheckTask() {
        if (leaderElectionService.isLeader()) {
            logger.info("Performing health check as leader");
            // Implement health check logic here
        }
    }

    private void cleanupTask() {
        if (leaderElectionService.isLeader()) {
            logger.info("Performing data cleanup as leader");
            // Implement cleanup logic here
        }
    }

    private void cacheWarmupTask() {
        if (leaderElectionService.isLeader()) {
            logger.info("Warming up cache as leader");
            // Implement cache warming logic here
        }
    }

    // Regular scheduled task that only runs on leader
    @Scheduled(fixedRate = 60000) // Every minute
    public void leaderOnlyTask() {
        if (leaderElectionService.isLeader()) {
            logger.info("Executing leader-only scheduled task");
            // This task will only run on the leader instance
        }
    }
}

7. Distributed Lock Implementation

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * {@link Lock} view of leadership: the lock counts as held whenever this
 * instance is the elected leader.
 *
 * <p>Acquisition polls {@link LeaderElectionService#isLeader()}. The lock name
 * is used only in log and error messages, so every instance of this class
 * built on the same election service shares the same underlying leadership,
 * and the lock provides no mutual exclusion between threads inside the leader
 * process.
 */
public class KubernetesDistributedLock implements Lock {
    private static final Logger logger = LoggerFactory.getLogger(KubernetesDistributedLock.class);

    /** Delay between two leadership checks while waiting. */
    private static final long POLL_INTERVAL_MILLIS = 100;

    private final LeaderElectionService leaderElectionService;
    private final String lockName;
    private final long acquireTimeout;
    private final TimeUnit timeUnit;

    /**
     * @param leaderElectionService source of the leadership state
     * @param lockName              name used in log and error messages
     * @param acquireTimeout        maximum wait for {@link #lock()} and {@link #lockInterruptibly()}
     * @param timeUnit              unit of {@code acquireTimeout}
     */
    public KubernetesDistributedLock(LeaderElectionService leaderElectionService,
                                     String lockName, long acquireTimeout, TimeUnit timeUnit) {
        this.leaderElectionService = leaderElectionService;
        this.lockName = lockName;
        this.acquireTimeout = acquireTimeout;
        this.timeUnit = timeUnit;
    }

    /**
     * Waits for leadership up to the configured acquire timeout.
     *
     * @throws RuntimeException when the timeout elapses, or when the thread is
     *         interrupted while waiting (the interrupt flag is restored)
     */
    @Override
    public void lock() {
        boolean acquired;
        try {
            acquired = awaitLeadership(timeUnit.toNanos(acquireTimeout));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while acquiring lock", e);
        }
        if (!acquired) {
            throw new RuntimeException("Timeout acquiring distributed lock: " + lockName);
        }
    }

    /**
     * Same as {@link #lock()}, but an interrupt is reported as
     * {@link InterruptedException}.
     *
     * @throws InterruptedException when the thread is interrupted on entry or while waiting
     * @throws RuntimeException     when the timeout elapses
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException("Interrupted while acquiring lock");
        }
        if (!awaitLeadership(timeUnit.toNanos(acquireTimeout))) {
            throw new RuntimeException("Timeout acquiring distributed lock: " + lockName);
        }
    }

    /** Returns whether this instance is leader right now, without waiting. */
    @Override
    public boolean tryLock() {
        return leaderElectionService.isLeader();
    }

    /**
     * Waits up to the given time for leadership. Leadership is checked at least
     * once, even when {@code time} is zero or negative.
     *
     * @return {@code true} when leadership was observed before the timeout
     * @throws InterruptedException when the thread is interrupted on entry or while waiting
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException("Interrupted while acquiring lock");
        }
        return awaitLeadership(unit.toNanos(time));
    }

    /** Only logs: this class never gives up leadership itself. */
    @Override
    public void unlock() {
        // With Kubernetes leader election, unlocking happens automatically
        // when another instance becomes leader or this instance stops
        logger.debug("Distributed lock released: {}", lockName);
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("Conditions not supported for distributed locks");
    }

    /**
     * Polls for leadership until it is observed or the timeout elapses.
     * Elapsed time is measured with {@link System#nanoTime()} so that wall-clock
     * adjustments cannot shorten or lengthen the wait.
     *
     * @return {@code true} when leadership was observed, {@code false} on timeout
     */
    private boolean awaitLeadership(long timeoutNanos) throws InterruptedException {
        long startNanos = System.nanoTime();
        while (true) {
            if (leaderElectionService.isLeader()) {
                return true; // Lock acquired
            }
            long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
            if (remainingNanos <= 0) {
                return false;
            }
            // Never sleep past the deadline; sleep at least 1 ms to avoid spinning.
            long remainingMillis = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            Thread.sleep(Math.min(POLL_INTERVAL_MILLIS, remainingMillis));
        }
    }
}

8. Health Check and Monitoring

import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
/**
 * Health indicator that reports this instance's election role, the last known
 * leader and the time since the last leadership change.
 */
@Component
public class LeaderElectionHealthIndicator implements HealthIndicator {
    private final LeaderElectionService leaderElectionService;
    private final LeaderElectionManager leaderElectionManager;

    public LeaderElectionHealthIndicator(LeaderElectionService leaderElectionService,
                                         LeaderElectionManager leaderElectionManager) {
        this.leaderElectionService = leaderElectionService;
        this.leaderElectionManager = leaderElectionManager;
    }

    /**
     * Reports UP with the details {@code role}, {@code currentLeader} and, once a
     * transition has happened, {@code secondsSinceLastTransition}. Reports DOWN
     * when reading the election state throws.
     */
    @Override
    public Health health() {
        try {
            boolean isLeader = leaderElectionService.isLeader();
            String currentLeader = leaderElectionService.getCurrentLeader();
            OffsetDateTime lastTransition = leaderElectionManager.getLastLeaderTransition();
            // Detail values must not be null; no leader is known until the first
            // lease has been read, so substitute a placeholder.
            Health.Builder healthBuilder = Health.up()
                    .withDetail("role", isLeader ? "leader" : "follower")
                    .withDetail("currentLeader", currentLeader != null ? currentLeader : "unknown");
            if (lastTransition != null) {
                long secondsSinceTransition = ChronoUnit.SECONDS.between(
                        lastTransition, OffsetDateTime.now());
                healthBuilder.withDetail("secondsSinceLastTransition", secondsSinceTransition);
            }
            return healthBuilder.build();
        } catch (Exception e) {
            // down(e) records the exception itself, so a null message cannot break the response.
            return Health.down(e).build();
        }
    }
}

9. REST Controller for Leadership Info

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
/**
 * REST endpoints under {@code /api/leadership} that expose this instance's
 * view of the election.
 */
@RestController
@RequestMapping("/api/leadership")
public class LeadershipController {
    private final LeaderElectionService leaderElectionService;
    private final LeaderElectionManager leaderElectionManager;

    public LeadershipController(LeaderElectionService leaderElectionService,
                                LeaderElectionManager leaderElectionManager) {
        this.leaderElectionService = leaderElectionService;
        this.leaderElectionManager = leaderElectionManager;
    }

    /**
     * Returns whether this instance is leader, the last known leader, the time of
     * the last leadership change and the current time. The leader and transition
     * entries are {@code null} when the manager has none to report.
     */
    @GetMapping("/status")
    public Map<String, Object> getLeadershipStatus() {
        final boolean leading = leaderElectionService.isLeader();
        final String leader = leaderElectionService.getCurrentLeader();
        final OffsetDateTime transitionedAt = leaderElectionManager.getLastLeaderTransition();

        final Map<String, Object> body = new HashMap<>();
        body.put("isLeader", leading);
        body.put("currentLeader", leader);
        body.put("lastLeaderTransition", transitionedAt);
        body.put("timestamp", OffsetDateTime.now());
        return body;
    }

    /**
     * Returns this instance's current role. The {@code healthy} and
     * {@code leaderElectionActive} entries are constants, not measured values.
     */
    @GetMapping("/health")
    public Map<String, Object> getLeadershipHealth() {
        final String role;
        if (leaderElectionService.isLeader()) {
            role = "leader";
        } else {
            role = "follower";
        }

        final Map<String, Object> body = new HashMap<>();
        body.put("healthy", true);
        body.put("leaderElectionActive", true);
        body.put("currentRole", role);
        return body;
    }
}

10. Advanced Leader Election with Callbacks

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
/**
 * Lifecycle component that starts and stops leader election with the
 * application context and runs placeholder hooks on leadership changes.
 *
 * <p>NOTE(review): the constructor calls {@code setCallbacks} on the manager,
 * which holds a single callbacks object. {@code LeaderElectionService} installs
 * its own callbacks in its constructor, so these presumably replace them and
 * tasks registered through {@code registerLeaderTask} would no longer be
 * started on a leadership change - confirm the intended wiring.
 */
@Component
public class AdvancedLeaderElection implements SmartLifecycle {
    private static final Logger logger = LoggerFactory.getLogger(AdvancedLeaderElection.class);

    private final LeaderElectionService leaderElectionService;
    private final LeaderElectionManager leaderElectionManager;
    private volatile boolean running = false;

    public AdvancedLeaderElection(LeaderElectionService leaderElectionService,
                                  LeaderElectionManager leaderElectionManager) {
        this.leaderElectionService = leaderElectionService;
        this.leaderElectionManager = leaderElectionManager;
        setupAdvancedCallbacks();
    }

    // ---- SmartLifecycle ----

    @Override
    public void start() {
        if (running) {
            return;
        }
        logger.info("Starting advanced leader election");
        leaderElectionService.start();
        running = true;
    }

    @Override
    public void stop() {
        if (!running) {
            return;
        }
        logger.info("Stopping advanced leader election");
        leaderElectionService.stop();
        running = false;
    }

    /** Stops synchronously, then signals completion to the caller. */
    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /** A high phase value: start late, stop early. */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE - 100;
    }

    // ---- Leadership callbacks ----

    /** Registers the leadership hooks on the manager. */
    private void setupAdvancedCallbacks() {
        leaderElectionManager.setCallbacks(new LeaderElectionManager.LeaderCallbacks() {
            @Override
            public void onStartLeading() {
                handleLeadershipAcquired();
            }

            @Override
            public void onStopLeading() {
                handleLeadershipLost();
            }
        });
    }

    /** Runs the acquisition steps in order: initialise, start workers, notify. */
    private void handleLeadershipAcquired() {
        logger.info("🎉 Instance became the leader!");
        performLeaderInitialization();
        startLeaderWorkers();
        notifyLeadershipAcquired();
    }

    /** Runs the loss steps in order: stop workers, clean up, notify. */
    private void handleLeadershipLost() {
        logger.info("👋 Instance lost leadership");
        stopLeaderWorkers();
        cleanupLeaderResources();
        notifyLeadershipLost();
    }

    // ---- Placeholder hooks (log only) ----

    private void performLeaderInitialization() {
        logger.info("Initializing leader-specific resources");
        // Set up whatever only the leader needs, for example:
        // - Database connections
        // - Cache populations
        // - External service registrations
    }

    private void startLeaderWorkers() {
        logger.info("Starting leader-specific workers");
        // Launch leader-only background jobs, for example:
        // - Scheduled data processing
        // - Cache warming
        // - Health monitoring
    }

    private void stopLeaderWorkers() {
        logger.info("Stopping leader-specific workers");
        // Shut leader-only workers down gracefully
    }

    private void cleanupLeaderResources() {
        logger.info("Cleaning up leader-specific resources");
        // Release resources that were acquired for leadership
    }

    private void notifyLeadershipAcquired() {
        logger.info("Notifying system about leadership acquisition");
        // Tell other components about the change, for example:
        // - Send events to message queue
        // - Update status in database
        // - Trigger webhooks
    }

    private void notifyLeadershipLost() {
        logger.info("Notifying system about leadership loss");
        // Tell other components about the change
    }
}

Usage Examples

11. Complete Usage Example

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
 * Startup demo: starts the election, registers a console-printing leader task
 * and runs a short critical section guarded by the distributed lock.
 */
@Component
public class LeaderElectionDemo implements CommandLineRunner {
    private final LeaderElectionService leaderElectionService;
    private final AdvancedLeaderElection advancedLeaderElection;

    public LeaderElectionDemo(LeaderElectionService leaderElectionService,
                              AdvancedLeaderElection advancedLeaderElection) {
        this.leaderElectionService = leaderElectionService;
        this.advancedLeaderElection = advancedLeaderElection;
    }

    @Override
    public void run(String... args) throws Exception {
        // Kick off the election loop.
        advancedLeaderElection.start();

        // Register a simple task that reports leadership changes on the console.
        leaderElectionService.registerLeaderTask(consoleLeaderTask());

        // Guard a critical section with the leadership-backed lock.
        KubernetesDistributedLock demoLock = new KubernetesDistributedLock(
                leaderElectionService, "demo-lock", 30, TimeUnit.SECONDS);
        boolean acquired = demoLock.tryLock(10, TimeUnit.SECONDS);
        if (!acquired) {
            return;
        }
        try {
            System.out.println("Acquired distributed lock, performing critical operation...");
            // Stand-in for the critical operation.
            Thread.sleep(5000);
        } finally {
            demoLock.unlock();
        }
    }

    /** Builds the leader task that prints a banner when it is started and stopped. */
    private LeaderElectionService.LeaderTask consoleLeaderTask() {
        return new LeaderElectionService.LeaderTask() {
            @Override
            public void start() {
                System.out.println("=== LEADER TASK STARTED ===");
                System.out.println("This instance is now the leader!");
                System.out.println("Performing leader-only initialization...");
            }

            @Override
            public void stop() {
                System.out.println("=== LEADER TASK STOPPED ===");
                System.out.println("This instance is no longer the leader!");
                System.out.println("Cleaning up leader resources...");
            }
        };
    }
}

12. Application Properties

# application.properties
# Kubernetes configuration
kubernetes.namespace=default
# Leader election configuration
leader.election.lease.name=my-application-leader
leader.election.candidate.id=${HOSTNAME:local-instance}
leader.election.lease.duration=15
leader.election.renew.deadline=10
leader.election.retry.period=2
# Logging
logging.level.com.example.leaderelection=DEBUG

This comprehensive leader election implementation provides:

  1. Lease Management: Acquire and renew leadership leases (a lease is not released explicitly; another candidate takes it over once it expires)
  2. Leader Election: Automatic leader election with configurable timeouts
  3. Callbacks: Custom logic for leadership transitions
  4. Distributed Locks: Kubernetes-based distributed locking
  5. Health Monitoring: Spring Boot health indicators
  6. REST API: Leadership status endpoints
  7. Spring Integration: Seamless Spring Boot integration
  8. Error Handling: Robust error handling and retry logic
  9. Monitoring: Comprehensive logging and monitoring

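The implementation aims to have only one instance perform leader tasks at any time, providing high availability and preventing duplicate work across multiple replicas. Because leadership rests on a time-based lease, a brief overlap during a handover is possible, so leader tasks should tolerate being run twice.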

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper