Etcd is a strongly consistent, distributed key-value store that provides reliable distributed coordination. It's widely used for service discovery, configuration management, and leader election in distributed systems.
Etcd Architecture Overview
Key Concepts:
- Raft Consensus: etcd replicates every write through the Raft protocol, so a write is committed only after a majority of nodes accept it
- Key-Value Store: Hierarchical key-space with versioning
- Leases: Time-bound sessions for key expiration
- Watchers: Real-time key change notifications
- Transactions: Atomic multi-operation transactions
- Authentication: Role-based access control
Java Application → jetcd client → Etcd Cluster
- Applications interact with etcd via gRPC protocol
- Etcd cluster maintains consistent state across nodes
- Support for watches, leases, and transactions
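The "versioning" mentioned in the key concepts can be illustrated with a toy in-memory model. This is only a sketch and not a real client: the class `RevisionedStore` and its fields are hypothetical names, and the store-wide counter starts at 0 here for simplicity. It shows the general idea that every write advances a store-wide revision, while each key tracks the revision at which it was created, the revision of its last modification, and a per-key version that resets when the key is deleted and recreated.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-key revision metadata; hypothetical, for illustration only. */
final class RevisionedStore {
    /** Metadata tracked for each key. */
    static final class Entry {
        final String value;
        final long createRevision; // store revision when the key was created
        final long modRevision;    // store revision of the last modification
        final long version;        // number of writes since creation

        Entry(String value, long createRevision, long modRevision, long version) {
            this.value = value;
            this.createRevision = createRevision;
            this.modRevision = modRevision;
            this.version = version;
        }
    }

    private final Map<String, Entry> data = new HashMap<>();
    private long revision = 0; // store-wide counter, advanced by every write

    Entry put(String key, String value) {
        revision++;
        Entry old = data.get(key);
        Entry updated = (old == null)
                ? new Entry(value, revision, revision, 1)
                : new Entry(value, old.createRevision, revision, old.version + 1);
        data.put(key, updated);
        return updated;
    }

    boolean delete(String key) {
        if (data.remove(key) == null) {
            return false; // nothing deleted, revision unchanged
        }
        revision++; // a delete is also a write
        return true;
    }

    Entry get(String key) {
        return data.get(key);
    }

    long revision() {
        return revision;
    }
}
```

Comparing a key's modification revision against an expected value is what makes optimistic, compare-and-swap style transactions possible.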
Dependencies and Setup
Maven Dependencies
<properties>
<jetcd.version>0.7.6</jetcd.version>
<spring-boot.version>3.1.0</spring-boot.version>
<grpc.version>1.56.1</grpc.version>
</properties>
<dependencies>
<!-- etcd Java Client -->
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>${jetcd.version}</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- gRPC -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- Caching -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.6</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.13.0</version>
</dependency>
</dependencies>
Docker Compose for Development
# docker-compose.yml
version: '3.8'
services:
  etcd1:
    image: quay.io/coreos/etcd:v3.5.7
    command: >
      etcd --name etcd1
      --data-dir /etcd-data
      --listen-client-urls http://0.0.0.0:2379
      --advertise-client-urls http://etcd1:2379
      --listen-peer-urls http://0.0.0.0:2380
      --initial-advertise-peer-urls http://etcd1:2380
      --initial-cluster etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      --initial-cluster-token my-etcd-cluster
      --initial-cluster-state new
    ports:
      - "2379:2379"
      - "2380:2380"
    networks:
      - etcd-network
  etcd2:
    image: quay.io/coreos/etcd:v3.5.7
    command: >
      etcd --name etcd2
      --data-dir /etcd-data
      --listen-client-urls http://0.0.0.0:2379
      --advertise-client-urls http://etcd2:2379
      --listen-peer-urls http://0.0.0.0:2380
      --initial-advertise-peer-urls http://etcd2:2380
      --initial-cluster etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      --initial-cluster-token my-etcd-cluster
      --initial-cluster-state new
    networks:
      - etcd-network
  etcd3:
    image: quay.io/coreos/etcd:v3.5.7
    command: >
      etcd --name etcd3
      --data-dir /etcd-data
      --listen-client-urls http://0.0.0.0:2379
      --advertise-client-urls http://etcd3:2379
      --listen-peer-urls http://0.0.0.0:2380
      --initial-advertise-peer-urls http://etcd3:2380
      --initial-cluster etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      --initial-cluster-token my-etcd-cluster
      --initial-cluster-state new
    networks:
      - etcd-network
networks:
  etcd-network:
    driver: bridge
Core Etcd Client Configuration
1. Etcd Configuration Properties
@ConfigurationProperties(prefix = "etcd")
@Data
public class EtcdProperties {
private List<String> endpoints = List.of("http://localhost:2379");
private String username;
private String password;
private long connectionTimeoutMs = 5000;
private long keepAliveTimeMs = 30000;
private long retryDelayMs = 1000;
private int maxRetries = 3;
private boolean enableAuth = false;
private NamespaceConfig namespace = new NamespaceConfig();
@Data
public static class NamespaceConfig {
private String prefix = "/myapp";
private boolean enabled = true;
}
}
2. Etcd Client Configuration
@Configuration
@EnableConfigurationProperties(EtcdProperties.class)
@Slf4j
public class EtcdConfig {
@Bean
public Client etcdClient(EtcdProperties etcdProperties) {
try {
ClientBuilder clientBuilder = Client.builder()
.endpoints(etcdProperties.getEndpoints().toArray(new String[0]))
.connectTimeout(Duration.ofMillis(etcdProperties.getConnectionTimeoutMs()))
.keepaliveTime(Duration.ofMillis(etcdProperties.getKeepAliveTimeMs()));
// Configure authentication if enabled
if (etcdProperties.isEnableAuth() &&
etcdProperties.getUsername() != null &&
etcdProperties.getPassword() != null) {
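// Encode credentials explicitly as UTF-8 instead of relying on the platform default charset
clientBuilder.user(ByteSequence.from(etcdProperties.getUsername().getBytes(StandardCharsets.UTF_8)))
.password(ByteSequence.from(etcdProperties.getPassword().getBytes(StandardCharsets.UTF_8)));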
}
Client client = clientBuilder.build();
log.info("Etcd client initialized with endpoints: {}", etcdProperties.getEndpoints());
return client;
} catch (Exception e) {
log.error("Failed to initialize etcd client", e);
throw new EtcdClientException("Failed to initialize etcd client", e);
}
}
@Bean
public KV etcdKV(Client client) {
return client.getKVClient();
}
@Bean
public Watch etcdWatch(Client client) {
return client.getWatchClient();
}
@Bean
public Lease etcdLease(Client client) {
return client.getLeaseClient();
}
@Bean
public Lock etcdLock(Client client) {
return client.getLockClient();
}
@Bean
@ConditionalOnMissingBean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return mapper;
}
}
3. Application Properties
# application.yaml
etcd:
  endpoints:
    - "http://localhost:2379"
    - "http://etcd2:2379"
    - "http://etcd3:2379"
  connection-timeout-ms: 5000
  keep-alive-time-ms: 30000
  retry-delay-ms: 1000
  max-retries: 3
  enable-auth: false
  namespace:
    prefix: "/myapp"
    enabled: true
app:
  etcd:
    cache:
      enabled: true
      ttl-minutes: 5
      max-size: 1000
    retry:
      max-attempts: 3
      backoff-ms: 1000
    watch:
      enabled: true
      reconnect-delay-ms: 5000
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,etcd
  endpoint:
    health:
      show-details: always
    etcd:
      enabled: true
logging:
  level:
    io.etcd: INFO
    com.example.etcd: DEBUG
Core Etcd Service Implementation
1. Base Etcd Service
@Service
@Slf4j
public class EtcdService {
protected final KV kvClient;
protected final Watch watchClient;
protected final Lease leaseClient;
protected final Lock lockClient;
protected final EtcdProperties etcdProperties;
protected final ObjectMapper objectMapper;
// protected rather than private: subclasses such as EtcdKVService read this field directly
protected final String namespacePrefix;
public EtcdService(KV kvClient, Watch watchClient, Lease leaseClient,
Lock lockClient, EtcdProperties etcdProperties,
ObjectMapper objectMapper) {
this.kvClient = kvClient;
this.watchClient = watchClient;
this.leaseClient = leaseClient;
this.lockClient = lockClient;
this.etcdProperties = etcdProperties;
this.objectMapper = objectMapper;
this.namespacePrefix = etcdProperties.getNamespace().isEnabled() ?
etcdProperties.getNamespace().getPrefix() : "";
}
/**
* Build full key with namespace prefix
*/
protected String buildKey(String key) {
if (namespacePrefix.isEmpty()) {
return key;
}
return namespacePrefix + key;
}
/**
* Convert string to ByteSequence
*/
protected ByteSequence toByteSequence(String value) {
return ByteSequence.from(value.getBytes(StandardCharsets.UTF_8));
}
/**
* Convert ByteSequence to string
*/
protected String toString(ByteSequence byteSequence) {
return byteSequence.toString(StandardCharsets.UTF_8);
}
/**
* Convert object to JSON string
*/
protected String toJson(Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new EtcdSerializationException("Failed to serialize object to JSON", e);
}
}
/**
* Convert JSON string to object
*/
protected <T> T fromJson(String json, Class<T> type) {
try {
return objectMapper.readValue(json, type);
} catch (JsonProcessingException e) {
throw new EtcdSerializationException("Failed to deserialize JSON to object", e);
}
}
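/**
* Execute with retry logic. The operation is a java.util.concurrent.Callable rather than
* a Supplier because CompletableFuture.get() throws checked exceptions.
*/
protected <T> T executeWithRetry(Callable<T> operation, String operationName) {
int attempts = 0;
Exception lastException = null;
while (attempts < etcdProperties.getMaxRetries()) {
try {
return operation.call();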
} catch (Exception e) {
lastException = e;
attempts++;
log.warn("Etcd operation {} failed (attempt {}/{}): {}",
operationName, attempts, etcdProperties.getMaxRetries(), e.getMessage());
if (attempts < etcdProperties.getMaxRetries()) {
try {
Thread.sleep(etcdProperties.getRetryDelayMs());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new EtcdOperationException("Operation interrupted", ie);
}
}
}
}
throw new EtcdOperationException("Failed to execute " + operationName + " after " +
etcdProperties.getMaxRetries() + " attempts", lastException);
}
}
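The retry helper above waits a fixed delay between attempts. A common alternative is exponential backoff, sketched below as a standalone example. The class `RetrySketch`, the doubling factor, and the 30-second cap are assumptions chosen for illustration, not part of the service above or of jetcd.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper illustrating retry with capped exponential backoff. */
final class RetrySketch {

    static <T> T retry(Callable<T> op, int maxAttempts, long baseDelayMs) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop; an interrupt is not retried
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", ie);
            } catch (Exception e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs(baseDelayMs, attempt));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted", ie);
                }
            }
        }
        throw new IllegalStateException("failed after " + maxAttempts + " attempts", last);
    }

    /** Delay after the given attempt: base, 2*base, 4*base, ... capped at 30 seconds. */
    static long backoffMs(long baseDelayMs, int attempt) {
        long factor = 1L << Math.min(attempt - 1, 20);
        return Math.min(baseDelayMs * factor, 30_000L);
    }
}
```

Backoff spreads retries out when the cluster is briefly unavailable, for example during a leader election. Adding random jitter would further reduce synchronized retries from many clients.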
2. Key-Value Service
@Service
@Slf4j
public class EtcdKVService extends EtcdService {
public EtcdKVService(KV kvClient, Watch watchClient, Lease leaseClient,
Lock lockClient, EtcdProperties etcdProperties,
ObjectMapper objectMapper) {
super(kvClient, watchClient, leaseClient, lockClient, etcdProperties, objectMapper);
}
/**
* Put a key-value pair
*/
public PutResponse put(String key, String value) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
log.debug("Putting key: {} -> {}", fullKey, maskValue(value));
return kvClient.put(toByteSequence(fullKey), toByteSequence(value)).get();
}, "put");
}
/**
* Put a key-value pair with lease
*/
public PutResponse putWithLease(String key, String value, long leaseId) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
log.debug("Putting key with lease: {} -> {} (lease: {})",
fullKey, maskValue(value), leaseId);
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
return kvClient.put(toByteSequence(fullKey), toByteSequence(value), putOption).get();
}, "putWithLease");
}
/**
* Put an object as JSON
*/
public <T> PutResponse putAsJson(String key, T value) {
String json = toJson(value);
return put(key, json);
}
/**
* Get value by key
*/
public String get(String key) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
log.debug("Getting key: {}", fullKey);
GetResponse response = kvClient.get(toByteSequence(fullKey)).get();
if (response.getKvs().isEmpty()) {
return null;
}
String value = toString(response.getKvs().get(0).getValue());
log.debug("Retrieved key: {} -> {}", fullKey, maskValue(value));
return value;
}, "get");
}
/**
* Get value as specific type
*/
public <T> T getAs(String key, Class<T> type) {
String json = get(key);
if (json == null) {
return null;
}
return fromJson(json, type);
}
/**
* Get value with metadata
*/
public KeyValue getWithMetadata(String key) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
GetResponse response = kvClient.get(toByteSequence(fullKey)).get();
if (response.getKvs().isEmpty()) {
return null;
}
return response.getKvs().get(0);
}, "getWithMetadata");
}
/**
* Get multiple keys with prefix
*/
public List<KeyValue> getWithPrefix(String prefix) {
return executeWithRetry(() -> {
String fullPrefix = buildKey(prefix);
log.debug("Getting keys with prefix: {}", fullPrefix);
GetOption getOption = GetOption.newBuilder()
.withPrefix(toByteSequence(fullPrefix))
.build();
GetResponse response = kvClient.get(toByteSequence(fullPrefix), getOption).get();
log.debug("Found {} keys with prefix: {}", response.getKvs().size(), fullPrefix);
return response.getKvs();
}, "getWithPrefix");
}
/**
* Get multiple key-value pairs with prefix
*/
public Map<String, String> getKeyValuesWithPrefix(String prefix) {
List<KeyValue> kvs = getWithPrefix(prefix);
return kvs.stream()
.collect(Collectors.toMap(
kv -> toString(kv.getKey()).substring(namespacePrefix.length()),
kv -> toString(kv.getValue())
));
}
/**
* Delete a key
*/
public DeleteResponse delete(String key) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
log.debug("Deleting key: {}", fullKey);
return kvClient.delete(toByteSequence(fullKey)).get();
}, "delete");
}
/**
* Delete keys with prefix
*/
public DeleteResponse deleteWithPrefix(String prefix) {
return executeWithRetry(() -> {
String fullPrefix = buildKey(prefix);
log.debug("Deleting keys with prefix: {}", fullPrefix);
DeleteOption deleteOption = DeleteOption.newBuilder()
.withPrefix(toByteSequence(fullPrefix))
.build();
DeleteResponse response = kvClient.delete(toByteSequence(fullPrefix), deleteOption).get();
log.debug("Deleted {} keys with prefix: {}", response.getDeleted(), fullPrefix);
return response;
}, "deleteWithPrefix");
}
/**
* Check if key exists
*/
public boolean exists(String key) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
GetResponse response = kvClient.get(toByteSequence(fullKey)).get();
return !response.getKvs().isEmpty();
}, "exists");
}
/**
* Atomic compare-and-swap (CAS)
*/
public boolean compareAndSwap(String key, String expectedValue, String newValue) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
log.debug("CAS operation on key: {} (expected: {}, new: {})",
fullKey, maskValue(expectedValue), maskValue(newValue));
Txn txn = kvClient.txn();
// Compare current value with expected value
Cmp cmp = new Cmp(
toByteSequence(fullKey),
Cmp.Op.EQUAL,
CmpTarget.value(toByteSequence(expectedValue))
);
// If comparison succeeds, put new value
TxnResponse response = txn
.If(cmp)
.Then(Op.put(toByteSequence(fullKey), toByteSequence(newValue), PutOption.DEFAULT))
.Else(Op.get(toByteSequence(fullKey), GetOption.DEFAULT))
.commit()
.get();
boolean success = response.isSucceeded();
log.debug("CAS operation {} for key: {}", success ? "succeeded" : "failed", fullKey);
return success;
}, "compareAndSwap");
}
/**
* Atomic compare-and-delete (CAD)
*/
public boolean compareAndDelete(String key, String expectedValue) {
return executeWithRetry(() -> {
String fullKey = buildKey(key);
log.debug("CAD operation on key: {} (expected: {})", fullKey, maskValue(expectedValue));
Txn txn = kvClient.txn();
Cmp cmp = new Cmp(
toByteSequence(fullKey),
Cmp.Op.EQUAL,
CmpTarget.value(toByteSequence(expectedValue))
);
TxnResponse response = txn
.If(cmp)
.Then(Op.delete(toByteSequence(fullKey), DeleteOption.DEFAULT))
.Else(Op.get(toByteSequence(fullKey), GetOption.DEFAULT))
.commit()
.get();
boolean success = response.isSucceeded();
log.debug("CAD operation {} for key: {}", success ? "succeeded" : "failed", fullKey);
return success;
}, "compareAndDelete");
}
/**
* Get all keys (use with caution in production)
*/
public List<String> getAllKeys() {
return executeWithRetry(() -> {
GetOption getOption = GetOption.newBuilder()
.withPrefix(toByteSequence(namespacePrefix))
.build();
GetResponse response = kvClient.get(toByteSequence(namespacePrefix), getOption).get();
return response.getKvs().stream()
.map(kv -> toString(kv.getKey()))
.collect(Collectors.toList());
}, "getAllKeys");
}
/**
* Get key count with prefix
*/
public long getKeyCount(String prefix) {
return executeWithRetry(() -> {
String fullPrefix = buildKey(prefix);
GetOption getOption = GetOption.newBuilder()
.withPrefix(toByteSequence(fullPrefix))
.withCountOnly(true)
.build();
GetResponse response = kvClient.get(toByteSequence(fullPrefix), getOption).get();
return response.getCount();
}, "getKeyCount");
}
private String maskValue(String value) {
if (value == null) return "null";
if (value.length() <= 8) return "***";
return value.substring(0, 4) + "***" + value.substring(value.length() - 4);
}
}
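Prefix reads and deletes such as getWithPrefix are, at the API level, range requests over the half-open interval from the prefix to the prefix with its last byte incremented. The sketch below computes that exclusive range end. It is a standalone illustration: `PrefixRange` and its methods are hypothetical names, and the client library normally does this computation for you.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper showing how a prefix maps to a key range. */
final class PrefixRange {

    /** Returns the exclusive end of the range that covers every key starting with prefix. */
    static byte[] prefixEnd(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if ((prefix[i] & 0xFF) < 0xFF) {
                // Drop any trailing 0xFF bytes and increment the last remaining byte
                byte[] end = Arrays.copyOf(prefix, i + 1);
                end[i]++;
                return end;
            }
        }
        // Empty or all-0xFF prefix: a single zero byte stands for "to the end of the keyspace"
        return new byte[] {0};
    }

    /** Convenience wrapper for UTF-8 string prefixes. */
    static String prefixEnd(String prefix) {
        byte[] end = prefixEnd(prefix.getBytes(StandardCharsets.UTF_8));
        return new String(end, StandardCharsets.UTF_8);
    }
}
```

For the prefix "/myapp/" the range end is "/myapp0", because '0' is the byte that follows '/'. This is also why a prefix without a trailing slash, such as "/myapp", matches "/myapp2/..." as well.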
3. Lease Service
@Service
@Slf4j
public class EtcdLeaseService extends EtcdService {
public EtcdLeaseService(KV kvClient, Watch watchClient, Lease leaseClient,
Lock lockClient, EtcdProperties etcdProperties,
ObjectMapper objectMapper) {
super(kvClient, watchClient, leaseClient, lockClient, etcdProperties, objectMapper);
}
/**
* Create a new lease
*/
public long createLease(long ttlSeconds) {
return executeWithRetry(() -> {
log.debug("Creating lease with TTL: {} seconds", ttlSeconds);
LeaseGrantResponse response = leaseClient.grant(ttlSeconds).get();
long leaseId = response.getID();
log.debug("Created lease with ID: {} and TTL: {} seconds", leaseId, ttlSeconds);
return leaseId;
}, "createLease");
}
/**
* Keep lease alive
*/
public void keepAlive(long leaseId) {
executeWithRetry(() -> {
log.debug("Keeping lease alive: {}", leaseId);
leaseClient.keepAliveOnce(leaseId).get();
return null;
}, "keepAlive");
}
/**
* Revoke a lease
*/
public void revokeLease(long leaseId) {
executeWithRetry(() -> {
log.debug("Revoking lease: {}", leaseId);
leaseClient.revoke(leaseId).get();
log.debug("Successfully revoked lease: {}", leaseId);
return null;
}, "revokeLease");
}
/**
* Get lease time-to-live
*/
public LeaseTimeToLiveResponse getLeaseTTL(long leaseId) {
return executeWithRetry(() -> {
LeaseTimeToLiveResponse response = leaseClient.timeToLive(leaseId,
LeaseOption.newBuilder().withAttachedKeys().build()).get();
log.debug("Lease {} TTL: {} seconds, granted TTL: {} seconds",
leaseId, response.getTTl(), response.getGrantedTTL());
return response;
}, "getLeaseTTL");
}
/**
* Create a key with TTL (auto-expire)
*/
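public PutResponse putWithTTL(String key, String value, long ttlSeconds) {
long leaseId = createLease(ttlSeconds);
try {
// putWithLease is defined in EtcdKVService, so attach the lease through kvClient here
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
return executeWithRetry(() -> kvClient.put(
toByteSequence(buildKey(key)), toByteSequence(value), putOption).get(), "putWithTTL");
} catch (RuntimeException e) {
// Clean up lease if put fails
revokeLease(leaseId);
throw e;
}
}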
/**
* Create a session with keep-alive
*/
public EtcdSession createSession(long ttlSeconds) {
long leaseId = createLease(ttlSeconds);
return new EtcdSession(leaseId, this, ttlSeconds);
}
}
@Slf4j
@Data
class EtcdSession implements AutoCloseable {
private final long leaseId;
private final EtcdLeaseService leaseService;
private final long ttlSeconds;
private final Instant createdAt;
private volatile boolean closed = false;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public EtcdSession(long leaseId, EtcdLeaseService leaseService, long ttlSeconds) {
this.leaseId = leaseId;
this.leaseService = leaseService;
this.ttlSeconds = ttlSeconds;
this.createdAt = Instant.now();
// Start keep-alive background task
startKeepAlive();
}
private void startKeepAlive() {
long keepAliveInterval = Math.max(ttlSeconds / 2, 1);
scheduler.scheduleAtFixedRate(this::keepAlive,
keepAliveInterval, keepAliveInterval, TimeUnit.SECONDS);
}
private void keepAlive() {
if (!closed) {
try {
leaseService.keepAlive(leaseId);
} catch (Exception e) {
log.error("Failed to keep lease alive: {}", leaseId, e);
}
}
}
@Override
public void close() {
if (!closed) {
closed = true;
scheduler.shutdown();
try {
leaseService.revokeLease(leaseId);
} catch (Exception e) {
log.error("Failed to revoke lease: {}", leaseId, e);
}
}
}
public boolean isActive() {
return !closed;
}
}
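The session above renews its lease every ttl / 2 seconds. To reason about how much margin a renewal interval leaves, it can help to model a lease against an explicit clock. The sketch below is a simplified, hypothetical model (`LeaseSketch` is not a jetcd class): a lease expires ttl seconds after the last successful renewal, and renewing an expired lease fails. The ttl / 3 interval shown is one conservative choice, stated here as an assumption rather than a recommendation from the etcd project.

```java
/** Hypothetical model of lease expiry driven by a caller-supplied clock (in seconds). */
final class LeaseSketch {
    private final long ttlSeconds;
    private long expiresAt;

    LeaseSketch(long ttlSeconds, long now) {
        this.ttlSeconds = ttlSeconds;
        this.expiresAt = now + ttlSeconds;
    }

    /** Renews the lease; returns false if it had already expired. */
    boolean keepAlive(long now) {
        if (isExpired(now)) {
            return false;
        }
        expiresAt = now + ttlSeconds;
        return true;
    }

    boolean isExpired(long now) {
        return now >= expiresAt;
    }

    /** Renewal interval that tolerates one missed renewal with time to spare. */
    static long keepAliveInterval(long ttlSeconds) {
        return Math.max(ttlSeconds / 3, 1);
    }
}
```

With a 30-second TTL and a 10-second interval, a single failed renewal still leaves a later attempt before the lease expires. At ttl / 2, a failed renewal leaves only one more attempt, scheduled exactly at the deadline.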
4. Watch Service
@Service
@Slf4j
public class EtcdWatchService extends EtcdService {
private final Map<String, Watch.Watcher> activeWatchers = new ConcurrentHashMap<>();
public EtcdWatchService(KV kvClient, Watch watchClient, Lease leaseClient,
Lock lockClient, EtcdProperties etcdProperties,
ObjectMapper objectMapper) {
super(kvClient, watchClient, leaseClient, lockClient, etcdProperties, objectMapper);
}
/**
* Watch a specific key for changes
*/
public void watchKey(String key, Consumer<WatchEvent> onChange) {
executeWithRetry(() -> {
String fullKey = buildKey(key);
log.debug("Starting watch on key: {}", fullKey);
Watch.Watcher watcher = watchClient.watch(
toByteSequence(fullKey),
watchResponse -> {
for (WatchEvent event : watchResponse.getEvents()) {
log.debug("Watch event on key {}: {}", fullKey, event.getEventType());
onChange.accept(event);
}
}
);
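Watch.Watcher previous = activeWatchers.put(fullKey, watcher);
if (previous != null) {
previous.close(); // close the watcher being replaced so it does not leak
}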
return null;
}, "watchKey");
}
/**
* Watch keys with prefix for changes
*/
public void watchPrefix(String prefix, Consumer<WatchEvent> onChange) {
executeWithRetry(() -> {
String fullPrefix = buildKey(prefix);
log.debug("Starting watch on prefix: {}", fullPrefix);
WatchOption watchOption = WatchOption.newBuilder()
.withPrefix(toByteSequence(fullPrefix))
.build();
Watch.Watcher watcher = watchClient.watch(
toByteSequence(fullPrefix),
watchOption,
watchResponse -> {
for (WatchEvent event : watchResponse.getEvents()) {
String key = toString(event.getKeyValue().getKey());
log.debug("Watch event on key {}: {}", key, event.getEventType());
onChange.accept(event);
}
}
);
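Watch.Watcher previous = activeWatchers.put(fullPrefix, watcher);
if (previous != null) {
previous.close(); // close the watcher being replaced so it does not leak
}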
return null;
}, "watchPrefix");
}
/**
* Stop watching a specific key
*/
public void stopWatch(String key) {
String fullKey = buildKey(key);
Watch.Watcher watcher = activeWatchers.remove(fullKey);
if (watcher != null) {
watcher.close();
log.debug("Stopped watch on key: {}", fullKey);
}
}
/**
* Stop watching a prefix
*/
public void stopWatchPrefix(String prefix) {
String fullPrefix = buildKey(prefix);
Watch.Watcher watcher = activeWatchers.remove(fullPrefix);
if (watcher != null) {
watcher.close();
log.debug("Stopped watch on prefix: {}", fullPrefix);
}
}
/**
* Stop all active watchers
*/
public void stopAllWatchers() {
log.debug("Stopping all {} active watchers", activeWatchers.size());
for (Map.Entry<String, Watch.Watcher> entry : activeWatchers.entrySet()) {
entry.getValue().close();
}
activeWatchers.clear();
}
/**
* Get active watcher count
*/
public int getActiveWatcherCount() {
return activeWatchers.size();
}
@PreDestroy
public void cleanup() {
stopAllWatchers();
}
}
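A typical consumer of watch events keeps a local view of the watched keys in sync: a PUT event inserts or overwrites an entry, and a DELETE event removes it. The sketch below shows that bookkeeping in isolation. `WatchViewSketch` and its `EventType` enum are hypothetical stand-ins that mirror the PUT and DELETE event types delivered to the callbacks above; no etcd connection is involved.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical local view that is kept in sync by applying watch events in order. */
final class WatchViewSketch {
    enum EventType { PUT, DELETE }

    private final Map<String, String> view = new TreeMap<>();

    /** Applies one event; events must be applied in the order they were received. */
    synchronized void apply(EventType type, String key, String value) {
        switch (type) {
            case PUT:
                view.put(key, value);
                break;
            case DELETE:
                view.remove(key); // the value carried by a delete event is ignored
                break;
        }
    }

    /** Returns a copy so callers never observe a half-applied update. */
    synchronized Map<String, String> snapshot() {
        return new TreeMap<>(view);
    }
}
```

A view like this is only complete if it is first seeded with a prefix read and the watch then starts from the revision of that read. Otherwise changes made between the read and the start of the watch are missed.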
5. Lock Service
@Service
@Slf4j
public class EtcdLockService extends EtcdService {
public EtcdLockService(KV kvClient, Watch watchClient, Lease leaseClient,
Lock lockClient, EtcdProperties etcdProperties,
ObjectMapper objectMapper) {
super(kvClient, watchClient, leaseClient, lockClient, etcdProperties, objectMapper);
}
/**
* Acquire a distributed lock
*/
public EtcdLock acquireLock(String lockName, long timeoutSeconds) {
return executeWithRetry(() -> {
String fullLockName = buildKey("/locks/" + lockName);
log.debug("Acquiring lock: {}", fullLockName);
long leaseId = leaseClient.grant(timeoutSeconds).get().getID();
try {
LockResponse lockResponse = lockClient.lock(
toByteSequence(fullLockName),
leaseId
).get(timeoutSeconds, TimeUnit.SECONDS);
String lockKey = toString(lockResponse.getKey());
log.debug("Successfully acquired lock: {}", lockKey);
return new EtcdLock(lockKey, leaseId, this);
} catch (Exception e) {
// Clean up lease if lock acquisition fails
leaseClient.revoke(leaseId).get();
throw new EtcdLockException("Failed to acquire lock: " + lockName, e);
}
}, "acquireLock");
}
/**
* Try to acquire lock with timeout
*/
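public Optional<EtcdLock> tryLock(String lockName, long waitTimeSeconds, long leaseTimeSeconds) {
String fullLockName = buildKey("/locks/" + lockName);
long leaseId = -1;
try {
leaseId = leaseClient.grant(leaseTimeSeconds).get().getID();
// Wait at most waitTimeSeconds for the lock; the lease TTL is independent of the wait time
LockResponse lockResponse = lockClient.lock(toByteSequence(fullLockName), leaseId)
.get(waitTimeSeconds, TimeUnit.SECONDS);
return Optional.of(new EtcdLock(toString(lockResponse.getKey()), leaseId, this));
} catch (Exception e) {
if (e instanceof InterruptedException) Thread.currentThread().interrupt();
log.debug("Failed to acquire lock {} within {} seconds", lockName, waitTimeSeconds);
// Best-effort cleanup: revoking the lease also removes the pending lock request
if (leaseId != -1) leaseClient.revoke(leaseId);
return Optional.empty();
}
}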
/**
* Release a lock
*/
public void releaseLock(String lockKey, long leaseId) {
executeWithRetry(() -> {
log.debug("Releasing lock: {}", lockKey);
// Unlock first
lockClient.unlock(toByteSequence(lockKey)).get();
// Then revoke the lease
leaseClient.revoke(leaseId).get();
log.debug("Successfully released lock: {}", lockKey);
return null;
}, "releaseLock");
}
}
@Data
class EtcdLock implements AutoCloseable {
private final String lockKey;
private final long leaseId;
private final EtcdLockService lockService;
private volatile boolean released = false;
public EtcdLock(String lockKey, long leaseId, EtcdLockService lockService) {
this.lockKey = lockKey;
this.leaseId = leaseId;
this.lockService = lockService;
}
/**
* Release the lock
*/
public void release() {
if (!released) {
lockService.releaseLock(lockKey, leaseId);
released = true;
}
}
/**
* Check if lock is held
*/
public boolean isHeld() {
return !released;
}
@Override
public void close() {
release();
}
/**
* Execute a task while holding the lock
*/
public <T> T executeWithLock(Supplier<T> task) {
try {
return task.get();
} finally {
release();
}
}
/**
* Execute a task while holding the lock (void return)
*/
public void executeWithLock(Runnable task) {
try {
task.run();
} finally {
release();
}
}
}
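Because EtcdLock implements AutoCloseable, callers can hold it in a try-with-resources statement so that the lock is released even when the guarded task throws. The sketch below demonstrates the pattern with a stand-in lock. `FakeLock` and `runGuarded` are hypothetical and exist only to make the example runnable without an etcd cluster.

```java
import java.util.function.Supplier;

/** Hypothetical demonstration of releasing a lock through try-with-resources. */
final class LockUsageSketch {

    /** Stand-in for a distributed lock handle; close() plays the role of release(). */
    static final class FakeLock implements AutoCloseable {
        private boolean released;

        boolean isHeld() {
            return !released;
        }

        @Override
        public void close() {
            released = true;
        }
    }

    /** Runs the task while holding the lock; the lock is closed on every exit path. */
    static String runGuarded(FakeLock lock, Supplier<String> task) {
        try (FakeLock held = lock) {
            return task.get();
        } catch (RuntimeException e) {
            // close() has already run by the time this handler executes
            return "failed: " + e.getMessage();
        }
    }
}
```

The same shape applies to the real lock: acquire it, place the handle in the try header, and do the protected work in the body. Note that the lease behind a real lock can still expire while the task runs, so long tasks need a TTL with enough headroom.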
Advanced Features
1. Cached Etcd Service
@Service
@Slf4j
public class CachedEtcdService {
private final EtcdKVService etcdKVService;
private final EtcdWatchService etcdWatchService;
private final EtcdProperties etcdProperties;
private final Cache<String, String> keyCache;
public CachedEtcdService(EtcdKVService etcdKVService, EtcdWatchService etcdWatchService,
EtcdProperties etcdProperties) {
this.etcdKVService = etcdKVService;
this.etcdWatchService = etcdWatchService;
this.etcdProperties = etcdProperties;
this.keyCache = Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES) // Default TTL
.maximumSize(1000)
.build();
// Setup cache invalidation on key changes
setupCacheInvalidation();
}
/**
* Get value with caching
*/
public String getWithCache(String key) {
// Read through the Caffeine cache directly, because the watch callback below invalidates
// this same cache. A null result (missing key) is not cached.
return keyCache.get(key, etcdKVService::get);
}
/**
* Get value as type with caching
*/
public <T> T getWithCache(String key, Class<T> type) {
String value = getWithCache(key);
if (value == null) {
return null;
}
return etcdKVService.fromJson(value, type);
}
/**
* Put value and invalidate cache
*/
public PutResponse putWithCache(String key, String value) {
PutResponse response = etcdKVService.put(key, value);
keyCache.invalidate(key);
return response;
}
/**
* Put object as JSON and invalidate cache
*/
public <T> PutResponse putWithCache(String key, T value) {
PutResponse response = etcdKVService.putAsJson(key, value);
keyCache.invalidate(key);
return response;
}
/**
* Delete key and invalidate cache
*/
public DeleteResponse deleteWithCache(String key) {
DeleteResponse response = etcdKVService.delete(key);
keyCache.invalidate(key);
return response;
}
/**
* Invalidate all cached entries
*/
public void invalidateAllCache() {
keyCache.invalidateAll();
log.debug("Invalidated all etcd cache entries");
}
/**
* Setup cache invalidation based on watch events
*/
private void setupCacheInvalidation() {
// Watch for changes in our namespace and invalidate cache.
// watchPrefix is provided by EtcdWatchService, not EtcdKVService.
etcdWatchService.watchPrefix("", event -> {
String key = etcdKVService.toString(event.getKeyValue().getKey());
// Remove namespace prefix if namespacing is enabled
String prefix = etcdProperties.getNamespace().getPrefix();
if (etcdProperties.getNamespace().isEnabled() && key.startsWith(prefix)) {
key = key.substring(prefix.length());
}
// Invalidate cache for this key
keyCache.invalidate(key);
log.debug("Invalidated cache for key: {}", key);
});
}
}
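The caching approach above is a cache-aside pattern with event-driven invalidation: reads load from the backing store on a miss, and a change notification evicts the stale entry. The sketch below reproduces that flow with only the JDK so it can be run in isolation. `CacheAsideSketch`, its loader function, and the `onWatchEvent` hook are hypothetical names standing in for the Caffeine cache and the watch callback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache-aside wrapper with invalidation driven by change events. */
final class CacheAsideSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader;
    private final AtomicInteger loads = new AtomicInteger();

    CacheAsideSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, loading it on a miss; null results are not cached. */
    String get(String key) {
        return cache.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    /** Called when the backing store reports a change to the key. */
    void onWatchEvent(String key) {
        cache.remove(key);
    }

    int loadCount() {
        return loads.get();
    }
}
```

Until the change event arrives, readers can still see the old value, so this pattern trades a short window of staleness for fewer round trips to the cluster.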
2. Configuration Service
@Service
@Slf4j
public class EtcdConfigurationService {
private final EtcdKVService etcdKVService;
private final EtcdWatchService etcdWatchService;
private final Map<String, Consumer<String>> configListeners = new ConcurrentHashMap<>();
public EtcdConfigurationService(EtcdKVService etcdKVService, EtcdWatchService etcdWatchService) {
this.etcdKVService = etcdKVService;
this.etcdWatchService = etcdWatchService;
// Setup configuration watching
setupConfigurationWatching();
}
/**
* Get configuration value
*/
public String getConfig(String configKey) {
return etcdKVService.get("/config/" + configKey);
}
/**
* Get configuration value with default
*/
public String getConfig(String configKey, String defaultValue) {
String value = getConfig(configKey);
return value != null ? value : defaultValue;
}
/**
* Get configuration as specific type
*/
public <T> T getConfig(String configKey, Class<T> type) {
String value = getConfig(configKey);
return value != null ? etcdKVService.fromJson(value, type) : null;
}
/**
* Get configuration as specific type with default
*/
public <T> T getConfig(String configKey, Class<T> type, T defaultValue) {
T value = getConfig(configKey, type);
return value != null ? value : defaultValue;
}
/**
* Set configuration value
*/
public void setConfig(String configKey, String value) {
etcdKVService.put("/config/" + configKey, value);
}
/**
* Set configuration object as JSON
*/
public <T> void setConfig(String configKey, T value) {
etcdKVService.putAsJson("/config/" + configKey, value);
}
/**
* Register configuration change listener
*/
public void addConfigListener(String configKey, Consumer<String> listener) {
configListeners.put(configKey, listener);
log.debug("Registered config listener for: {}", configKey);
}
/**
* Remove configuration change listener
*/
public void removeConfigListener(String configKey) {
configListeners.remove(configKey);
log.debug("Removed config listener for: {}", configKey);
}
/**
* Get all configuration keys
*/
public List<String> getAllConfigKeys() {
return etcdKVService.getWithPrefix("/config/").stream()
.map(kv -> etcdKVService.toString(kv.getKey()))
.collect(Collectors.toList());
}
/**
* Get all configuration values
*/
public Map<String, String> getAllConfigs() {
return etcdKVService.getKeyValuesWithPrefix("/config/");
}
/**
* Setup configuration change watching
*/
private void setupConfigurationWatching() {
etcdWatchService.watchPrefix("/config/", event -> {
String key = etcdKVService.toString(event.getKeyValue().getKey());
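// The watched key includes the namespace prefix (for example "/myapp/config/db.url"),
// so keep only the part after the first "/config/" segment
String configKey = key.substring(key.indexOf("/config/") + "/config/".length());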
String value = etcdKVService.toString(event.getKeyValue().getValue());
Consumer<String> listener = configListeners.get(configKey);
if (listener != null) {
try {
listener.accept(value);
log.debug("Notified config listener for: {}", configKey);
} catch (Exception e) {
log.error("Error in config listener for: {}", configKey, e);
}
}
});
}
}
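Listener lookup depends on turning the full etcd key back into the short configuration key that callers registered. The sketch below shows one strict way to do that conversion. `ConfigKeys` is a hypothetical helper, and passing the namespace prefix explicitly is an assumption made so that keys outside the configuration subtree are rejected instead of being mangled.

```java
import java.util.Optional;

/** Hypothetical helper that maps a full etcd key to its configuration key. */
final class ConfigKeys {
    static final String CONFIG_SEGMENT = "/config/";

    /**
     * Returns the part of fullKey after "<namespacePrefix>/config/",
     * or empty if the key does not belong to the configuration subtree.
     */
    static Optional<String> configKey(String fullKey, String namespacePrefix) {
        String expected = namespacePrefix + CONFIG_SEGMENT;
        if (!fullKey.startsWith(expected) || fullKey.length() == expected.length()) {
            return Optional.empty();
        }
        return Optional.of(fullKey.substring(expected.length()));
    }
}
```

Anchoring on the expected prefix means a later "/config/" segment inside the key is preserved rather than removed, which a global string replacement would not guarantee.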
3. Service Discovery Service
@Service
@Slf4j
public class EtcdServiceDiscovery {
private final EtcdKVService etcdKVService;
private final EtcdLeaseService etcdLeaseService;
private final EtcdWatchService etcdWatchService;
private final Map<String, List<ServiceInstance>> serviceInstances = new ConcurrentHashMap<>();
private final Map<String, EtcdSession> serviceSessions = new ConcurrentHashMap<>();
public EtcdServiceDiscovery(EtcdKVService etcdKVService,
EtcdLeaseService etcdLeaseService,
EtcdWatchService etcdWatchService) {
this.etcdKVService = etcdKVService;
this.etcdLeaseService = etcdLeaseService;
this.etcdWatchService = etcdWatchService;
// Setup service discovery watching
setupServiceWatching();
}
/**
* Register a service instance
*/
public void registerService(String serviceName, String instanceId,
String host, int port, Map<String, String> metadata) {
ServiceInstance instance = ServiceInstance.builder()
.serviceName(serviceName)
.instanceId(instanceId)
.host(host)
.port(port)
.metadata(metadata != null ? metadata : Map.of())
.registrationTime(Instant.now())
.build();
String key = String.format("/services/%s/instances/%s", serviceName, instanceId);
String value = etcdKVService.toJson(instance);
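// Register with TTL for auto-cleanup. Create the session first so the key is attached
// to the same lease that the session keeps alive; otherwise the key expires after the TTL.
long ttl = 30; // 30 seconds
EtcdSession session = etcdLeaseService.createSession(ttl);
etcdKVService.putWithLease(key, value, session.getLeaseId());
serviceSessions.put(instanceId, session);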
log.info("Registered service instance: {} - {}:{}", serviceName, host, port);
}
/**
* Deregister a service instance
*/
public void deregisterService(String serviceName, String instanceId) {
String key = String.format("/services/%s/instances/%s", serviceName, instanceId);
etcdKVService.delete(key);
EtcdSession session = serviceSessions.remove(instanceId);
if (session != null) {
session.close();
}
log.info("Deregistered service instance: {} - {}", serviceName, instanceId);
}
/**
* Get all instances of a service
*/
public List<ServiceInstance> getServiceInstances(String serviceName) {
return serviceInstances.getOrDefault(serviceName, List.of());
}
/**
* Get available services
*/
public Set<String> getAvailableServices() {
// Return a snapshot so callers cannot modify the cache through the live key-set view
return Set.copyOf(serviceInstances.keySet());
}
/**
* Watch for service changes
*/
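public void watchService(String serviceName, Consumer<List<ServiceInstance>> onChange) {
// Watch only this service's instance keys, refresh the cache on every event,
// and hand the refreshed list to the caller
String prefix = String.format("/services/%s/instances/", serviceName);
etcdWatchService.watchPrefix(prefix, event -> {
updateServiceInstances(serviceName);
onChange.accept(getServiceInstances(serviceName));
});
}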
/**
* Setup service discovery watching
*/
private void setupServiceWatching() {
etcdWatchService.watchPrefix("/services/", event -> {
String key = etcdKVService.toString(event.getKeyValue().getKey());
// Parse service name from key
String[] parts = key.split("/");
if (parts.length >= 4) {
String serviceName = parts[2];
updateServiceInstances(serviceName);
}
});
}
/**
* Update service instances cache
*/
private void updateServiceInstances(String serviceName) {
String prefix = String.format("/services/%s/instances/", serviceName);
List<KeyValue> instances = etcdKVService.getWithPrefix(prefix);
List<ServiceInstance> serviceInstancesList = instances.stream()
.map(kv -> {
try {
return etcdKVService.fromJson(etcdKVService.toString(kv.getValue()), ServiceInstance.class);
} catch (Exception e) {
log.error("Failed to parse service instance: {}", kv.getKey(), e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
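if (serviceInstancesList.isEmpty()) {
// Drop services with no live instances so getAvailableServices() stays accurate
serviceInstances.remove(serviceName);
} else {
serviceInstances.put(serviceName, serviceInstancesList);
}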
log.debug("Updated service instances for {}: {} instances",
serviceName, serviceInstancesList.size());
}
@PreDestroy
public void cleanup() {
// Close all sessions
for (EtcdSession session : serviceSessions.values()) {
session.close();
}
serviceSessions.clear();
}
}
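@Data
@Builder
@NoArgsConstructor // assuming fromJson uses Jackson, which needs a no-args constructor
@AllArgsConstructor // required by @Builder once a no-args constructor is declared
class ServiceInstance {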
private String serviceName;
private String instanceId;
private String host;
private int port;
private Map<String, String> metadata;
private Instant registrationTime;
@JsonIgnore // derived value; keep it out of the JSON stored in etcd
public String getAddress() {
return host + ":" + port;
}
}
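The discovery code above relies on the key layout /services/{serviceName}/instances/{instanceId} and pulls the service name out with split("/"). The following is a minimal sketch of a stricter parser for that layout. The ServiceKey class is a hypothetical helper introduced here for illustration and is not part of the services shown earlier.

```java
import java.util.Optional;

/**
 * Hypothetical helper (not part of the original services): parses keys shaped like
 * /services/{serviceName}/instances/{instanceId}.
 */
final class ServiceKey {
    final String serviceName;
    final String instanceId;

    private ServiceKey(String serviceName, String instanceId) {
        this.serviceName = serviceName;
        this.instanceId = instanceId;
    }

    /** Returns the parsed key, or an empty Optional if the key does not follow the layout. */
    static Optional<ServiceKey> parse(String key) {
        if (key == null) {
            return Optional.empty();
        }
        // The leading "/" produces an empty first element:
        // "/services/orders/instances/i-1" -> ["", "services", "orders", "instances", "i-1"]
        // The limit -1 keeps trailing empty elements so "/services/orders/instances/" is rejected.
        String[] parts = key.split("/", -1);
        boolean wellFormed = parts.length == 5
                && parts[0].isEmpty()
                && "services".equals(parts[1])
                && !parts[2].isEmpty()
                && "instances".equals(parts[3])
                && !parts[4].isEmpty();
        return wellFormed
                ? Optional.of(new ServiceKey(parts[2], parts[4]))
                : Optional.empty();
    }
}
```

Compared with the parts.length >= 4 check, this version also rejects keys under /services/ that belong to some other layout, at the cost of being less tolerant if the layout later grows extra path segments.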
REST API Controllers
1. Key-Value Controller
@RestController
@RequestMapping("/api/etcd/kv")
@Slf4j
@Validated
public class EtcdKVController {
private final EtcdKVService etcdKVService;
private final CachedEtcdService cachedEtcdService;
public EtcdKVController(EtcdKVService etcdKVService, CachedEtcdService cachedEtcdService) {
this.etcdKVService = etcdKVService;
this.cachedEtcdService = cachedEtcdService;
}
@PutMapping("/{key}")
public ResponseEntity<ApiResponse<PutResponse>> put(
@PathVariable String key,
@Valid @RequestBody PutRequest request) { // @Valid triggers the @NotBlank checks on PutRequest
try {
PutResponse response;
if (request.getTtlSeconds() != null) {
response = etcdKVService.putWithTTL(key, request.getValue(), request.getTtlSeconds());
} else {
response = etcdKVService.put(key, request.getValue());
}
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("Failed to put key: {}", key, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to put key: " + e.getMessage()));
}
}
@GetMapping("/{key}")
public ResponseEntity<ApiResponse<String>> get(
@PathVariable String key,
@RequestParam(defaultValue = "false") boolean cached) {
try {
String value = cached ?
cachedEtcdService.getWithCache(key) :
etcdKVService.get(key);
if (value == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND)
.body(ApiResponse.error("Key not found: " + key));
}
return ResponseEntity.ok(ApiResponse.success(value));
} catch (Exception e) {
log.error("Failed to get key: {}", key, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to get key: " + e.getMessage()));
}
}
@DeleteMapping("/{key}")
public ResponseEntity<ApiResponse<DeleteResponse>> delete(@PathVariable String key) {
try {
DeleteResponse response = etcdKVService.delete(key);
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("Failed to delete key: {}", key, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to delete key: " + e.getMessage()));
}
}
@GetMapping("/prefix/{prefix}")
public ResponseEntity<ApiResponse<Map<String, String>>> getWithPrefix(
@PathVariable String prefix) {
try {
Map<String, String> keyValues = etcdKVService.getKeyValuesWithPrefix(prefix);
return ResponseEntity.ok(ApiResponse.success(keyValues));
} catch (Exception e) {
log.error("Failed to get keys with prefix: {}", prefix, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to get keys: " + e.getMessage()));
}
}
@PostMapping("/cas/{key}")
public ResponseEntity<ApiResponse<Boolean>> compareAndSwap(
@PathVariable String key,
@Valid @RequestBody CompareAndSwapRequest request) { // @Valid triggers the @NotBlank checks
try {
boolean success = etcdKVService.compareAndSwap(
key, request.getExpectedValue(), request.getNewValue());
return ResponseEntity.ok(ApiResponse.success(success));
} catch (Exception e) {
log.error("Failed to perform CAS on key: {}", key, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to perform CAS: " + e.getMessage()));
}
}
}
@Data
class PutRequest {
@NotBlank
private String value;
private Long ttlSeconds;
}
@Data
class CompareAndSwapRequest {
@NotBlank
private String expectedValue;
@NotBlank
private String newValue;
}
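One caveat with the /{key} mappings above: a {key} path variable binds a single path segment, so hierarchical etcd keys such as /services/orders/instances/i-1 do not match these routes as written. One possible workaround, sketched below under that assumption, is to have clients send the key as URL-safe Base64 and decode it in the controller. KeyPathCodec is a hypothetical helper introduced for illustration; passing the key as a query parameter would be another option.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Hypothetical helper (an assumption, not part of the controllers above):
 * converts etcd keys to and from a form that fits in one URL path segment.
 */
final class KeyPathCodec {

    private KeyPathCodec() {
    }

    /** Encodes an etcd key with the URL-safe Base64 alphabet, without '=' padding. */
    static String encode(String etcdKey) {
        return Base64.getUrlEncoder()
                .withoutPadding()
                .encodeToString(etcdKey.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a path segment produced by encode back into the original etcd key. */
    static String decode(String pathSegment) {
        byte[] raw = Base64.getUrlDecoder().decode(pathSegment);
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

A client would call KeyPathCodec.encode on the key before building the request URL, and the controller would call KeyPathCodec.decode(key) before passing the key to the service. Note that decode throws IllegalArgumentException for input that is not valid Base64, which the controller should translate into a 400 response.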
2. Service Discovery Controller
@RestController
@RequestMapping("/api/etcd/services")
@Slf4j
public class ServiceDiscoveryController {
private final EtcdServiceDiscovery serviceDiscovery;
public ServiceDiscoveryController(EtcdServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
@PostMapping("/{serviceName}/instances/{instanceId}")
public ResponseEntity<ApiResponse<String>> registerInstance(
@PathVariable String serviceName,
@PathVariable String instanceId,
@Valid @RequestBody RegisterInstanceRequest request) { // @Valid triggers the checks on the request
try {
serviceDiscovery.registerService(
serviceName, instanceId,
request.getHost(), request.getPort(),
request.getMetadata());
return ResponseEntity.ok(ApiResponse.success("Service registered successfully"));
} catch (Exception e) {
log.error("Failed to register service instance", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to register service: " + e.getMessage()));
}
}
@DeleteMapping("/{serviceName}/instances/{instanceId}")
public ResponseEntity<ApiResponse<String>> deregisterInstance(
@PathVariable String serviceName,
@PathVariable String instanceId) {
try {
serviceDiscovery.deregisterService(serviceName, instanceId);
return ResponseEntity.ok(ApiResponse.success("Service deregistered successfully"));
} catch (Exception e) {
log.error("Failed to deregister service instance", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to deregister service: " + e.getMessage()));
}
}
@GetMapping("/{serviceName}/instances")
public ResponseEntity<ApiResponse<List<ServiceInstance>>> getInstances(
@PathVariable String serviceName) {
try {
List<ServiceInstance> instances = serviceDiscovery.getServiceInstances(serviceName);
return ResponseEntity.ok(ApiResponse.success(instances));
} catch (Exception e) {
log.error("Failed to get service instances", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to get instances: " + e.getMessage()));
}
}
@GetMapping
public ResponseEntity<ApiResponse<Set<String>>> getServices() {
try {
Set<String> services = serviceDiscovery.getAvailableServices();
return ResponseEntity.ok(ApiResponse.success(services));
} catch (Exception e) {
log.error("Failed to get services", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to get services: " + e.getMessage()));
}
}
}
@Data
class RegisterInstanceRequest {
@NotBlank
private String host;
@Min(1)
@Max(65535) // valid TCP port range
private int port;
private Map<String, String> metadata;
}
3. Health Check Controller
@RestController
@RequestMapping("/api/etcd/health")
@Slf4j
public class EtcdHealthController {
private final EtcdKVService etcdKVService;
private final EtcdWatchService etcdWatchService;
private final Client etcdClient;
public EtcdHealthController(EtcdKVService etcdKVService,
EtcdWatchService etcdWatchService,
Client etcdClient) {
this.etcdKVService = etcdKVService;
this.etcdWatchService = etcdWatchService;
this.etcdClient = etcdClient;
}
@GetMapping
public ResponseEntity<EtcdHealth> getHealth() {
EtcdHealth health = new EtcdHealth();
try {
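// Test basic connectivity with a unique probe key so that concurrent
// health checks cannot delete each other's value
String probeKey = "/health-check/" + java.util.UUID.randomUUID();
etcdKVService.put(probeKey, "test-value");
String value = etcdKVService.get(probeKey);
etcdKVService.delete(probeKey);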
if ("test-value".equals(value)) {
health.setStatus(HealthStatus.UP);
health.setMessage("Etcd cluster is healthy");
} else {
health.setStatus(HealthStatus.DOWN);
health.setMessage("Etcd cluster returned unexpected value");
}
// Add additional health information
health.setActiveWatchers(etcdWatchService.getActiveWatcherCount());
health.setTimestamp(Instant.now());
} catch (Exception e) {
health.setStatus(HealthStatus.DOWN);
health.setMessage("Etcd cluster is not accessible: " + e.getMessage());
log.error("Etcd health check failed", e);
}
HttpStatus status = health.getStatus() == HealthStatus.UP ?
HttpStatus.OK : HttpStatus.SERVICE_UNAVAILABLE;
return ResponseEntity.status(status).body(health);
}
@GetMapping("/cluster")
public ResponseEntity<ApiResponse<List<Member>>> getClusterInfo() {
try {
List<Member> members = etcdClient.getClusterClient().listMember().get().getMembers();
return ResponseEntity.ok(ApiResponse.success(members));
} catch (Exception e) {
log.error("Failed to get cluster info", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Failed to get cluster info: " + e.getMessage()));
}
}
}
@Data
class EtcdHealth {
private HealthStatus status;
private String message;
private int activeWatchers;
private Instant timestamp = Instant.now();
}
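A health endpoint is most useful when it answers quickly even if the cluster is unreachable. Since jetcd operations return a CompletableFuture, one way to bound the wait is to call get with a timeout. The sketch below uses only the JDK. BoundedProbe and succeedsWithin are hypothetical names, and how this would plug into EtcdKVService depends on whether that service exposes the underlying futures.

```java
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper: runs an asynchronous call and waits for it for a bounded time. */
final class BoundedProbe {

    private BoundedProbe() {
    }

    /** Returns true only if the call completes normally within the timeout. */
    static boolean succeedsWithin(Supplier<CompletableFuture<?>> call, Duration timeout) {
        CompletableFuture<?> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            return false; // the call failed before a future was even created
        }
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting; the probe is treated as failed
            return false;
        } catch (ExecutionException | CancellationException e) {
            return false; // the call completed exceptionally or was cancelled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

With a jetcd KV client at hand, a probe could look like BoundedProbe.succeedsWithin(() -> kvClient.get(probeKey), Duration.ofSeconds(2)), where kvClient and probeKey are placeholders for whatever the application already uses.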
Custom Exceptions
public class EtcdClientException extends RuntimeException {
public EtcdClientException(String message) {
super(message);
}
public EtcdClientException(String message, Throwable cause) {
super(message, cause);
}
}
public class EtcdOperationException extends RuntimeException {
public EtcdOperationException(String message) {
super(message);
}
public EtcdOperationException(String message, Throwable cause) {
super(message, cause);
}
}
public class EtcdSerializationException extends RuntimeException {
public EtcdSerializationException(String message) {
super(message);
}
public EtcdSerializationException(String message, Throwable cause) {
super(message, cause);
}
}
public class EtcdLockException extends RuntimeException {
public EtcdLockException(String message) {
super(message);
}
public EtcdLockException(String message, Throwable cause) {
super(message, cause);
}
}
@ControllerAdvice
@Slf4j // provides the log field used by the handlers below
public class EtcdExceptionHandler {
@ExceptionHandler(EtcdClientException.class)
public ResponseEntity<ApiResponse<?>> handleEtcdClientException(EtcdClientException e) {
log.error("Etcd client error", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Etcd client error: " + e.getMessage()));
}
@ExceptionHandler(EtcdOperationException.class)
public ResponseEntity<ApiResponse<?>> handleEtcdOperationException(EtcdOperationException e) {
log.error("Etcd operation failed", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("Etcd operation failed: " + e.getMessage()));
}
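@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<ApiResponse<?>> handleValidationException(MethodArgumentNotValidException e) {
// Report request-body validation failures as 400 instead of letting them
// fall through to the generic handler and surface as 500
return ResponseEntity.status(HttpStatus.BAD_REQUEST)
.body(ApiResponse.error("Invalid request: " + e.getMessage()));
}
@ExceptionHandler(Exception.class)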
public ResponseEntity<ApiResponse<?>> handleGenericException(Exception e) {
log.error("Unexpected error in etcd operations", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("An unexpected error occurred"));
}
}
Testing
1. Unit Tests
@ExtendWith(MockitoExtension.class)
class EtcdKVServiceTest {
@Mock
private KV kvClient;
@Mock
private Watch watchClient;
@Mock
private Lease leaseClient;
@Mock
private Lock lockClient;
@InjectMocks
private EtcdKVService etcdKVService;
@Test
void shouldPutKeyValue() throws Exception {
// Given
String key = "test-key";
String value = "test-value";
when(kvClient.put(any(ByteSequence.class), any(ByteSequence.class)))
.thenReturn(CompletableFuture.completedFuture(mock(PutResponse.class)));
// When
PutResponse response = etcdKVService.put(key, value);
// Then
assertThat(response).isNotNull();
verify(kvClient).put(any(ByteSequence.class), any(ByteSequence.class));
}
}
2. Integration Tests
@SpringBootTest
@Testcontainers
class EtcdIntegrationTest {
@Container
private static final GenericContainer<?> etcdContainer =
new GenericContainer<>("quay.io/coreos/etcd:v3.5.7")
.withCommand("etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://localhost:2379")
.withExposedPorts(2379);
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("etcd.endpoints",
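// getHost() stays correct when Docker does not run on localhost (for example in CI)
() -> List.of("http://" + etcdContainer.getHost() + ":" + etcdContainer.getMappedPort(2379)));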
}
@Autowired
private EtcdKVService etcdKVService;
@Test
void shouldStoreAndRetrieveValue() {
// Given
String key = "integration-test";
String value = "integration-value";
// When
etcdKVService.put(key, value);
String retrievedValue = etcdKVService.get(key);
// Then
assertThat(retrievedValue).isEqualTo(value);
}
}
Best Practices
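- Connection Management: Reuse one shared Client for the whole application instead of creating one per request, and close it on shutdown
- Error Handling: Retry transient failures with exponential backoff and a cap on the number of attempts
- Security: Enable authentication and TLS in production
- Monitoring: Monitor etcd cluster health and performance
- Backup: Take regular etcd snapshots (for example with etcdctl snapshot save) and verify that they can be restored
- Resource Management: Close clients, watchers, and lease sessions when they are no longer needed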
// Example of proper resource cleanup
@Component
@Slf4j
public class EtcdResourceManager {
private final Client etcdClient;
private final EtcdWatchService etcdWatchService;
private final EtcdServiceDiscovery serviceDiscovery;
public EtcdResourceManager(Client etcdClient, EtcdWatchService etcdWatchService,
EtcdServiceDiscovery serviceDiscovery) {
this.etcdClient = etcdClient;
this.etcdWatchService = etcdWatchService;
this.serviceDiscovery = serviceDiscovery;
}
@PreDestroy
public void cleanup() {
log.info("Cleaning up etcd resources");
try {
etcdWatchService.stopAllWatchers();
serviceDiscovery.cleanup();
etcdClient.close();
log.info("Etcd resources cleaned up successfully");
} catch (Exception e) {
log.error("Error during etcd resource cleanup", e);
}
}
}
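The Error Handling practice above can be sketched with plain JDK code. This is a minimal illustration, not a production retry policy: Backoff, delayMillis, and retry are hypothetical names, the sketch retries every exception except interruption, and it adds no jitter. Real code would normally retry only failures known to be transient.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an operation with capped exponential backoff. */
final class Backoff {

    private Backoff() {
    }

    /** Delay before retry number attempt (0-based): baseMillis * 2^attempt, capped at maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt >= 62) {
            return maxMillis; // 2^attempt would overflow a long
        }
        long factor = 1L << attempt;
        if (baseMillis > maxMillis / factor) {
            return maxMillis; // the product would exceed the cap (or overflow)
        }
        return Math.min(maxMillis, baseMillis * factor);
    }

    /** Calls the operation up to maxAttempts times, sleeping between failed attempts. */
    static <T> T retry(Callable<T> operation, int maxAttempts, long baseMillis, long maxMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // never retry an interruption
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt + 1 < maxAttempts) {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        throw last; // every attempt failed; surface the most recent error
    }
}
```

A call such as Backoff.retry(() -> etcdKVService.get(key), 5, 100, 2_000) would try up to five times with waits of 100, 200, 400, and 800 ms between attempts.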
Conclusion
Etcd integration in Java provides:
- Distributed Coordination: Reliable distributed locking and leader election
- Configuration Management: Dynamic configuration with change notifications
- Service Discovery: Automatic service registration and discovery
- Strong Consistency: Raft-based consensus for reliable operations
- High Availability: Cluster-based fault tolerance
- Real-time Updates: Watch-based change notifications
By implementing the patterns shown above, you can build robust distributed systems that leverage etcd for coordination, configuration, and service discovery while maintaining high availability and strong consistency guarantees.