Couchbase SDK Integration in Java

Introduction to Couchbase

Couchbase is a distributed NoSQL document database that provides high performance, scalability, and flexibility. The Java SDK enables seamless integration with Couchbase Server, offering features like document storage, querying, indexing, and full-text search.

Setup and Dependencies

Maven Dependencies

<dependencies>
<!-- Couchbase Java SDK -->
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
<version>3.4.6</version>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<!-- Reactor for reactive programming -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.4</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.6</version>
</dependency>
</dependencies>

Gradle Dependencies

dependencies {
implementation 'com.couchbase.client:java-client:3.4.6'
implementation 'com.fasterxml.jackson.core:jackson-core:2.14.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
implementation 'io.projectreactor:reactor-core:3.5.4'
implementation 'org.slf4j:slf4j-api:2.0.6'
implementation 'org.slf4j:slf4j-simple:2.0.6'
}

Core Couchbase Concepts

Key Components

  • Cluster: Represents connection to Couchbase cluster
  • Bucket: Container for documents (similar to database)
  • Collection: Logical grouping of documents within a bucket
  • Scope: Namespace for collections
  • Document: Basic unit of data storage (JSON)

Basic SDK Integration

Connection Management

import com.couchbase.client.java.*;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.core.env.TimeoutConfig;
import com.couchbase.client.core.error.CouchbaseException;
import java.time.Duration;
public class CouchbaseConnectionManager {
private Cluster cluster;
private Bucket bucket;
private ClusterEnvironment environment;
public void connect() {
try {
// Configure cluster environment
this.environment = ClusterEnvironment.builder()
.timeoutConfig(TimeoutConfig
.kvTimeout(Duration.ofSeconds(10))
.queryTimeout(Duration.ofSeconds(120))
.connectTimeout(Duration.ofSeconds(30)))
.build();
// Connect to cluster
this.cluster = Cluster.connect(
"couchbase://localhost", 
ClusterOptions.clusterOptions("username", "password")
.environment(environment)
);
// Open bucket
this.bucket = cluster.bucket("travel-sample");
// Wait for bucket to be ready
bucket.waitUntilReady(Duration.ofSeconds(30));
System.out.println("Successfully connected to Couchbase cluster");
} catch (CouchbaseException e) {
System.err.println("Failed to connect to Couchbase: " + e.getMessage());
throw e;
}
}
public Collection getCollection(String scopeName, String collectionName) {
Scope scope = bucket.scope(scopeName);
return scope.collection(collectionName);
}
public Cluster getCluster() {
return cluster;
}
public Bucket getBucket() {
return bucket;
}
public void disconnect() {
if (cluster != null) {
cluster.disconnect();
}
if (environment != null) {
environment.shutdown();
}
}
// Singleton instance
private static CouchbaseConnectionManager instance;
public static synchronized CouchbaseConnectionManager getInstance() {
if (instance == null) {
instance = new CouchbaseConnectionManager();
instance.connect();
}
return instance;
}
// Configuration class
public static class CouchbaseConfig {
public static final String CONNECTION_STRING = "couchbase://localhost";
public static final String USERNAME = "Administrator";
public static final String PASSWORD = "password";
public static final String BUCKET_NAME = "travel-sample";
public static final String SCOPE_NAME = "_default";
public static final String COLLECTION_NAME = "_default";
}
}

Document Model Classes

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import java.util.List;
// User document model
@JsonInclude(JsonInclude.Include.NON_NULL)
public class User {
private String id;
private String username;
private String email;
private String firstName;
private String lastName;
private int age;
private List<String> roles;
private Address address;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
// Constructors
public User() {}
public User(String username, String email, String firstName, String lastName) {
this.username = username;
this.email = email;
this.firstName = firstName;
this.lastName = lastName;
this.createdAt = LocalDateTime.now();
}
// Getters and Setters
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
public List<String> getRoles() { return roles; }
public void setRoles(List<String> roles) { this.roles = roles; }
public Address getAddress() { return address; }
public void setAddress(Address address) { this.address = address; }
public LocalDateTime getCreatedAt() { return createdAt; }
public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
public LocalDateTime getUpdatedAt() { return updatedAt; }
public void setUpdatedAt(LocalDateTime updatedAt) { this.updatedAt = updatedAt; }
@Override
public String toString() {
return String.format("User{id='%s', username='%s', email='%s', name='%s %s'}", 
id, username, email, firstName, lastName);
}
}
// Address embedded document
@JsonInclude(JsonInclude.Include.NON_NULL)
class Address {
private String street;
private String city;
private String state;
private String zipCode;
private String country;
// Constructors
public Address() {}
public Address(String street, String city, String state, String zipCode, String country) {
this.street = street;
this.city = city;
this.state = state;
this.zipCode = zipCode;
this.country = country;
}
// Getters and Setters
public String getStreet() { return street; }
public void setStreet(String street) { this.street = street; }
public String getCity() { return city; }
public void setCity(String city) { this.city = city; }
public String getState() { return state; }
public void setState(String state) { this.state = state; }
public String getZipCode() { return zipCode; }
public void setZipCode(String zipCode) { this.zipCode = zipCode; }
public String getCountry() { return country; }
public void setCountry(String country) { this.country = country; }
@Override
public String toString() {
return String.format("%s, %s, %s %s, %s", street, city, state, zipCode, country);
}
}
// Product document model
@JsonInclude(JsonInclude.Include.NON_NULL)
class Product {
private String id;
private String name;
private String description;
private double price;
private String category;
private int stockQuantity;
private List<String> tags;
private boolean active;
private LocalDateTime createdDate;
// Constructors, getters, setters
public Product() {}
public Product(String name, String description, double price, String category) {
this.name = name;
this.description = description;
this.price = price;
this.category = category;
this.active = true;
this.createdDate = LocalDateTime.now();
}
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
public String getCategory() { return category; }
public void setCategory(String category) { this.category = category; }
public int getStockQuantity() { return stockQuantity; }
public void setStockQuantity(int stockQuantity) { this.stockQuantity = stockQuantity; }
public List<String> getTags() { return tags; }
public void setTags(List<String> tags) { this.tags = tags; }
public boolean isActive() { return active; }
public void setActive(boolean active) { this.active = active; }
public LocalDateTime getCreatedDate() { return createdDate; }
public void setCreatedDate(LocalDateTime createdDate) { this.createdDate = createdDate; }
}

CRUD Operations

Basic Document Operations

import com.couchbase.client.java.*;
import com.couchbase.client.java.kv.*;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.core.error.*;
import java.time.Duration;
import java.util.Optional;
public class CouchbaseCRUDOperations {
private final Collection collection;
public CouchbaseCRUDOperations(Collection collection) {
this.collection = collection;
}
// Create/Insert document
public MutationResult insertDocument(String documentId, Object document) {
try {
return collection.insert(documentId, document);
} catch (DocumentExistsException e) {
System.err.println("Document already exists with ID: " + documentId);
throw e;
} catch (CouchbaseException e) {
System.err.println("Failed to insert document: " + e.getMessage());
throw e;
}
}
// Upsert document (insert or update)
public MutationResult upsertDocument(String documentId, Object document) {
try {
return collection.upsert(documentId, document);
} catch (CouchbaseException e) {
System.err.println("Failed to upsert document: " + e.getMessage());
throw e;
}
}
// Get document by ID
public <T> Optional<T> getDocument(String documentId, Class<T> targetClass) {
try {
GetResult result = collection.get(documentId);
return Optional.of(result.contentAs(targetClass));
} catch (DocumentNotFoundException e) {
System.out.println("Document not found with ID: " + documentId);
return Optional.empty();
} catch (CouchbaseException e) {
System.err.println("Failed to get document: " + e.getMessage());
throw e;
}
}
// Get document with expiry
public <T> Optional<T> getDocumentWithExpiry(String documentId, Class<T> targetClass) {
try {
GetResult result = collection.get(documentId, GetOptions.getOptions());
T content = result.contentAs(targetClass);
System.out.println("Document expires in: " + result.expiry());
return Optional.of(content);
} catch (DocumentNotFoundException e) {
return Optional.empty();
}
}
// Replace document
public MutationResult replaceDocument(String documentId, Object document) {
try {
// Get current document to get CAS value
GetResult existing = collection.get(documentId);
return collection.replace(documentId, document, 
ReplaceOptions.replaceOptions().cas(existing.cas()));
} catch (DocumentNotFoundException e) {
System.err.println("Document not found for replacement: " + documentId);
throw e;
} catch (CasMismatchException e) {
System.err.println("CAS mismatch - document was modified by another operation");
throw e;
}
}
// Remove document
public void removeDocument(String documentId) {
try {
collection.remove(documentId);
System.out.println("Document removed: " + documentId);
} catch (DocumentNotFoundException e) {
System.err.println("Document not found for removal: " + documentId);
} catch (CouchbaseException e) {
System.err.println("Failed to remove document: " + e.getMessage());
throw e;
}
}
// Check document existence
public boolean documentExists(String documentId) {
try {
collection.exists(documentId);
return true;
} catch (CouchbaseException e) {
return false;
}
}
// Insert with expiry
public MutationResult insertWithExpiry(String documentId, Object document, Duration expiry) {
try {
return collection.insert(documentId, document, 
InsertOptions.insertOptions().expiry(expiry));
} catch (DocumentExistsException e) {
System.err.println("Document already exists: " + documentId);
throw e;
}
}
// Touch document (extend expiry)
public MutationResult touchDocument(String documentId, Duration newExpiry) {
try {
return collection.touch(documentId, newExpiry);
} catch (DocumentNotFoundException e) {
System.err.println("Document not found for touch: " + documentId);
throw e;
}
}
// Example usage
public static void main(String[] args) {
try {
CouchbaseConnectionManager connectionManager = CouchbaseConnectionManager.getInstance();
Collection collection = connectionManager.getCollection(
CouchbaseConnectionManager.CouchbaseConfig.SCOPE_NAME,
CouchbaseConnectionManager.CouchbaseConfig.COLLECTION_NAME
);
CouchbaseCRUDOperations crud = new CouchbaseCRUDOperations(collection);
// Create a user
User user = new User("john_doe", "[email protected]", "John", "Doe");
user.setAge(30);
user.setRoles(List.of("USER", "ADMIN"));
user.setAddress(new Address("123 Main St", "New York", "NY", "10001", "USA"));
// Insert user
String userId = "user::" + user.getUsername();
MutationResult insertResult = crud.insertDocument(userId, user);
System.out.println("User inserted with CAS: " + insertResult.cas());
// Retrieve user
Optional<User> retrievedUser = crud.getDocument(userId, User.class);
retrievedUser.ifPresent(u -> System.out.println("Retrieved: " + u));
// Update user
user.setAge(31);
user.setUpdatedAt(java.time.LocalDateTime.now());
MutationResult replaceResult = crud.replaceDocument(userId, user);
System.out.println("User updated with CAS: " + replaceResult.cas());
// Check existence
System.out.println("Document exists: " + crud.documentExists(userId));
// Clean up
crud.removeDocument(userId);
} finally {
CouchbaseConnectionManager.getInstance().disconnect();
}
}
}

Query Operations

N1QL Query Examples

import com.couchbase.client.java.*;
import com.couchbase.client.java.query.*;
import com.couchbase.client.java.json.JsonObject;
import java.util.List;
import java.util.Map;
public class CouchbaseQueryOperations {
private final Cluster cluster;
public CouchbaseQueryOperations(Cluster cluster) {
this.cluster = cluster;
}
// Simple parameterized query
public List<JsonObject> findUsersByAge(int minAge, int maxAge) {
String query = "SELECT u.* FROM `travel-sample`.inventory.user u " +
"WHERE u.age BETWEEN $minAge AND $maxAge";
QueryResult result = cluster.query(query, 
QueryOptions.queryOptions()
.parameters(JsonObject.create()
.put("minAge", minAge)
.put("maxAge", maxAge))
);
return result.rowsAs(JsonObject.class);
}
// Query with custom object mapping
public List<User> findUsersByCity(String city) {
String query = "SELECT u.* FROM `travel-sample`.inventory.user u " +
"WHERE u.address.city = $city";
QueryResult result = cluster.query(query,
QueryOptions.queryOptions()
.parameters(JsonObject.create().put("city", city))
);
return result.rowsAs(User.class);
}
// Complex query with joins
public List<JsonObject> findUserOrders(String username) {
String query = "SELECT u.username, u.email, o.orderId, o.totalAmount, o.orderDate " +
"FROM `travel-sample`.inventory.user u " +
"JOIN `travel-sample`.inventory.orders o ON u.id = o.userId " +
"WHERE u.username = $username " +
"ORDER BY o.orderDate DESC";
QueryResult result = cluster.query(query,
QueryOptions.queryOptions()
.parameters(JsonObject.create().put("username", username))
);
return result.rowsAs(JsonObject.class);
}
// Aggregation query
public Map<String, Object> getUserStats() {
String query = "SELECT COUNT(*) as totalUsers, " +
"AVG(age) as averageAge, " +
"MIN(age) as minAge, " +
"MAX(age) as maxAge " +
"FROM `travel-sample`.inventory.user " +
"WHERE age IS NOT NULL";
QueryResult result = cluster.query(query);
return result.rowsAs(JsonObject.class).get(0).toMap();
}
// Paginated query
public QueryResult findUsersPaginated(int limit, int offset) {
String query = "SELECT u.* FROM `travel-sample`.inventory.user u " +
"ORDER BY u.createdAt DESC " +
"LIMIT $limit OFFSET $offset";
return cluster.query(query,
QueryOptions.queryOptions()
.parameters(JsonObject.create()
.put("limit", limit)
.put("offset", offset))
);
}
// Query with indexing hint
public List<User> findUsersByRoleWithIndex(String role) {
String query = "SELECT u.* FROM `travel-sample`.inventory.user u " +
"USE INDEX (idx_user_roles) " +
"WHERE ANY role IN u.roles SATISFIES role = $role END";
QueryResult result = cluster.query(query,
QueryOptions.queryOptions()
.parameters(JsonObject.create().put("role", role))
);
return result.rowsAs(User.class);
}
// Explain query plan
public JsonObject explainQuery(String query, JsonObject parameters) {
QueryResult result = cluster.query("EXPLAIN " + query,
QueryOptions.queryOptions().parameters(parameters)
);
return result.rowsAs(JsonObject.class).get(0);
}
// Batch query execution
public void executeBatchQueries(List<String> queries) {
for (String query : queries) {
try {
QueryResult result = cluster.query(query);
System.out.println("Query executed successfully: " + query);
System.out.println("Rows affected: " + result.rowsAs(JsonObject.class).size());
} catch (Exception e) {
System.err.println("Failed to execute query: " + query + " - " + e.getMessage());
}
}
}
}

Advanced Features

Reactive Programming Support

import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.ReactiveCluster;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
public class CouchbaseReactiveOperations {
private final ReactiveCollection reactiveCollection;
private final ReactiveCluster reactiveCluster;
public CouchbaseReactiveOperations(Collection collection, Cluster cluster) {
this.reactiveCollection = collection.reactive();
this.reactiveCluster = cluster.reactive();
}
// Reactive insert
public Mono<String> insertUserReactive(User user) {
String documentId = "user::" + user.getUsername();
return reactiveCollection.insert(documentId, user)
.map(result -> {
System.out.println("Inserted document with CAS: " + result.cas());
return documentId;
})
.timeout(Duration.ofSeconds(5))
.onErrorResume(error -> {
System.err.println("Failed to insert user: " + error.getMessage());
return Mono.error(error);
});
}
// Reactive get with retry
public Mono<User> getUserReactive(String documentId) {
return reactiveCollection.get(documentId)
.map(getResult -> getResult.contentAs(User.class))
.retry(3) // Retry up to 3 times on failure
.timeout(Duration.ofSeconds(3))
.onErrorReturn(User.class, null); // Return null on error
}
// Bulk operations
public Flux<MutationResult> bulkInsertUsers(List<User> users) {
return Flux.fromIterable(users)
.flatMap(user -> {
String documentId = "user::" + user.getUsername();
return reactiveCollection.insert(documentId, user);
})
.buffer(10) // Process in batches of 10
.flatMap(Flux::fromIterable);
}
// Reactive query
public Flux<User> findActiveUsersReactive() {
String query = "SELECT u.* FROM `travel-sample`.inventory.user u " +
"WHERE u.active = true";
return reactiveCluster.query(query)
.flatMapMany(queryResult -> queryResult.rowsAs(User.class));
}
// Combine multiple reactive operations
public Mono<User> createUserWithValidation(User user) {
String documentId = "user::" + user.getUsername();
// Check if user exists first
return reactiveCollection.exists(documentId)
.flatMap(existsResult -> {
if (existsResult.exists()) {
return Mono.error(new RuntimeException("User already exists"));
}
return reactiveCollection.insert(documentId, user);
})
.then(getUserReactive(documentId)) // Then get the created user
.doOnSuccess(createdUser -> 
System.out.println("User created successfully: " + createdUser.getUsername()))
.doOnError(error -> 
System.err.println("Failed to create user: " + error.getMessage()));
}
// Backpressure-aware batch processing
public Flux<MutationResult> processUsersWithBackpressure(List<User> users) {
return Flux.fromIterable(users)
.delayElements(Duration.ofMillis(100)) // Control rate
.concatMap(user -> { // Process sequentially
String documentId = "user::" + user.getUsername();
return reactiveCollection.upsert(documentId, user);
}, 5); // Concurrency level of 5
}
}

Subdocument Operations

import com.couchbase.client.java.*;
import com.couchbase.client.java.kv.*;
import com.couchbase.client.java.json.JsonObject;
import java.util.Arrays;
import java.util.List;
public class SubdocumentOperations {
private final Collection collection;
public SubdocumentOperations(Collection collection) {
this.collection = collection;
}
// Get specific fields from document
public LookupInResult getPartialDocument(String documentId, List<String> fieldPaths) {
List<LookupInSpec> specs = fieldPaths.stream()
.map(LookupInSpec::get)
.toList();
return collection.lookupIn(documentId, specs);
}
// Update specific fields
public MutationResult updatePartialDocument(String documentId, Map<String, Object> fieldUpdates) {
List<MutateInSpec> specs = fieldUpdates.entrySet().stream()
.map(entry -> MutateInSpec.upsert(entry.getKey(), entry.getValue()))
.toList();
return collection.mutateIn(documentId, specs);
}
// Array operations
public MutationResult addToArray(String documentId, String arrayPath, Object newElement) {
return collection.mutateIn(documentId,
Arrays.asList(MutateInSpec.arrayAppend(arrayPath, Arrays.asList(newElement)))
);
}
// Counter operations
public MutationResult incrementCounter(String documentId, String counterPath, long delta) {
return collection.mutateIn(documentId,
Arrays.asList(MutateInSpec.increment(counterPath, delta))
);
}
// Complex subdocument operation
public MutationResult updateUserProfile(String userId, String newEmail, String newCity) {
return collection.mutateIn(userId, Arrays.asList(
MutateInSpec.upsert("email", newEmail),
MutateInSpec.upsert("address.city", newCity),
MutateInSpec.upsert("updatedAt", java.time.LocalDateTime.now().toString())
));
}
// Check document size before full retrieval
public long getDocumentSize(String documentId) {
LookupInResult result = collection.lookupIn(documentId,
Arrays.asList(LookupInSpec.count(""))
);
return result.contentAs(0, Long.class);
}
}

Index Management

import com.couchbase.client.java.manager.query.*;
import com.couchbase.client.java.query.QueryIndex;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class IndexManager {
private final QueryIndexManager indexManager;
public IndexManager(Cluster cluster) {
this.indexManager = cluster.queryIndexes();
}
// Create primary index
public void createPrimaryIndex(String bucketName) {
try {
indexManager.createPrimaryIndex(bucketName,
CreatePrimaryQueryIndexOptions.createPrimaryQueryIndexOptions()
.ignoreIfExists(true)
);
System.out.println("Primary index created on bucket: " + bucketName);
} catch (Exception e) {
System.err.println("Failed to create primary index: " + e.getMessage());
}
}
// Create secondary index
public void createSecondaryIndex(String bucketName, String indexName, List<String> fields) {
try {
indexManager.createIndex(bucketName, indexName, fields,
CreateQueryIndexOptions.createQueryIndexOptions()
.ignoreIfExists(true)
);
System.out.println("Secondary index created: " + indexName);
} catch (Exception e) {
System.err.println("Failed to create secondary index: " + e.getMessage());
}
}
// List all indexes
public List<QueryIndex> getAllIndexes(String bucketName) {
try {
return indexManager.getAllIndexes(bucketName);
} catch (Exception e) {
System.err.println("Failed to get indexes: " + e.getMessage());
return List.of();
}
}
// Drop index
public void dropIndex(String bucketName, String indexName) {
try {
indexManager.dropIndex(bucketName, indexName);
System.out.println("Index dropped: " + indexName);
} catch (Exception e) {
System.err.println("Failed to drop index: " + e.getMessage());
}
}
// Build deferred indexes
public void buildDeferredIndexes(String bucketName) {
try {
indexManager.buildDeferredIndexes(bucketName);
System.out.println("Deferred indexes built for bucket: " + bucketName);
} catch (Exception e) {
System.err.println("Failed to build deferred indexes: " + e.getMessage());
}
}
// Watch indexes (wait for building completion)
public void watchIndexes(String bucketName, List<String> indexNames, Duration timeout) {
try {
indexManager.watchIndexes(bucketName, indexNames, timeout);
System.out.println("Indexes are ready: " + indexNames);
} catch (TimeoutException e) {
System.err.println("Timeout waiting for indexes to build: " + e.getMessage());
} catch (Exception e) {
System.err.println("Failed to watch indexes: " + e.getMessage());
}
}
}

Error Handling and Retry Mechanisms

import com.couchbase.client.core.error.*;
import com.couchbase.client.java.kv.MutationResult;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class ErrorHandlingAndRetry {
// Retry with exponential backoff
public static <T> T retryWithBackoff(Supplier<T> operation, int maxRetries) {
int attempt = 0;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (TemporaryFailureException | TimeoutException e) {
attempt++;
if (attempt == maxRetries) {
throw new RuntimeException("Operation failed after " + maxRetries + " attempts", e);
}
// Exponential backoff
long delayMs = (long) Math.pow(2, attempt) * 1000;
System.out.println("Retry attempt " + attempt + " after " + delayMs + "ms");
try {
Thread.sleep(delayMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Operation interrupted", ie);
}
} catch (DocumentExistsException e) {
System.err.println("Document already exists: " + e.getMessage());
throw e;
} catch (DocumentNotFoundException e) {
System.err.println("Document not found: " + e.getMessage());
throw e;
} catch (CouchbaseException e) {
System.err.println("Couchbase operation failed: " + e.getMessage());
throw e;
}
}
throw new RuntimeException("Unexpected error in retry logic");
}
// Async retry mechanism
public static <T> CompletableFuture<T> retryAsync(Supplier<CompletableFuture<T>> operation, 
int maxRetries) {
CompletableFuture<T> result = new CompletableFuture<>();
retryAsyncInternal(operation, maxRetries, 0, result);
return result;
}
private static <T> void retryAsyncInternal(Supplier<CompletableFuture<T>> operation,
int maxRetries, int attempt,
CompletableFuture<T> result) {
operation.get().whenComplete((response, error) -> {
if (error == null) {
result.complete(response);
} else if (attempt < maxRetries && isRetryableError(error)) {
// Schedule retry with delay
long delayMs = (long) Math.pow(2, attempt) * 1000;
CompletableFuture.delayedExecutor(delayMs, java.util.concurrent.TimeUnit.MILLISECONDS)
.execute(() -> retryAsyncInternal(operation, maxRetries, attempt + 1, result));
} else {
result.completeExceptionally(error);
}
});
}
private static boolean isRetryableError(Throwable error) {
return error instanceof TemporaryFailureException ||
error instanceof TimeoutException ||
error instanceof ServiceNotAvailableException;
}
// Circuit breaker pattern
public static class CircuitBreaker {
private final int failureThreshold;
private final Duration timeout;
private int failureCount = 0;
private long lastFailureTime = 0;
private State state = State.CLOSED;
enum State { CLOSED, OPEN, HALF_OPEN }
public CircuitBreaker(int failureThreshold, Duration timeout) {
this.failureThreshold = failureThreshold;
this.timeout = timeout;
}
public <T> T execute(Supplier<T> operation) {
if (state == State.OPEN) {
if (System.currentTimeMillis() - lastFailureTime > timeout.toMillis()) {
state = State.HALF_OPEN;
} else {
throw new RuntimeException("Circuit breaker is OPEN");
}
}
try {
T result = operation.get();
if (state == State.HALF_OPEN) {
state = State.CLOSED;
failureCount = 0;
}
return result;
} catch (Exception e) {
handleFailure();
throw e;
}
}
private void handleFailure() {
failureCount++;
lastFailureTime = System.currentTimeMillis();
if (failureCount >= failureThreshold) {
state = State.OPEN;
}
}
}
}

Spring Boot Integration

Spring Configuration

import org.springframework.context.annotation.Configuration;
import org.springframework.data.couchbase.config.AbstractCouchbaseConfiguration;
import org.springframework.data.couchbase.repository.config.EnableCouchbaseRepositories;
import java.util.List;
@Configuration
@EnableCouchbaseRepositories(basePackages = "com.example.repository")
public class CouchbaseConfig extends AbstractCouchbaseConfiguration {
@Override
public String getConnectionString() {
return "couchbase://127.0.0.1";
}
@Override
public String getUserName() {
return "Administrator";
}
@Override
public String getPassword() {
return "password";
}
@Override
public String getBucketName() {
return "travel-sample";
}
@Override
protected boolean autoIndexCreation() {
return true;
}
}

Spring Data Repository

import org.springframework.data.couchbase.repository.CouchbaseRepository;
import org.springframework.data.couchbase.repository.Query;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Optional;
@Repository
public interface UserRepository extends CouchbaseRepository<User, String> {
// Derived query methods
Optional<User> findByUsername(String username);
List<User> findByEmail(String email);
List<User> findByAgeBetween(int minAge, int maxAge);
List<User> findByRolesContaining(String role);
List<User> findByAddressCity(String city);
// Custom N1QL query
@Query("#{#n1ql.selectEntity} WHERE #{#n1ql.filter} AND ANY role IN roles SATISFIES role = $1 END")
List<User> findUsersWithRole(String role);
@Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} AND age > $1")
long countUsersOlderThan(int age);
// Projection query
@Query("SELECT username, email, address.city FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} AND active = true")
List<UserProjection> findActiveUsersProjection();
// Update query
@Query("UPDATE #{#n1ql.bucket} SET active = false WHERE #{#n1ql.filter} AND lastLogin < $1")
void deactivateInactiveUsers(String cutoffDate);
}
// Projection interface
interface UserProjection {
String getUsername();
String getEmail();
String getCity();
}

Best Practices and Performance Optimization

import com.couchbase.client.java.*;
import com.couchbase.client.java.kv.*;
public class CouchbaseBestPractices {
// Connection pooling
public static class ConnectionPool {
private static final int MAX_CONNECTIONS = 10;
private static final List<Cluster> connections = new java.util.ArrayList<>();
public static synchronized Cluster getConnection() {
if (connections.size() < MAX_CONNECTIONS) {
Cluster cluster = Cluster.connect("couchbase://localhost", 
"username", "password");
connections.add(cluster);
return cluster;
}
// Implement connection pooling logic
return connections.get(0); // Simplified
}
}
// Batch operations for better performance
public static class BatchOperations {
private final Collection collection;
public BatchOperations(Collection collection) {
this.collection = collection;
}
public void bulkInsert(List<Pair<String, Object>> documents) {
// Use reactive API for better batch processing
collection.reactive()
.flux()
.fromIterable(documents)
.flatMap(pair -> collection.reactive().upsert(pair.getKey(), pair.getValue()))
.buffer(100) // Batch size
.blockLast(); // Wait for completion
}
}
// Document design best practices
public static class DocumentDesign {
// Use meaningful document IDs
public static String createDocumentId(String type, String identifier) {
return String.format("%s::%s", type, identifier);
}
// Avoid large documents
public static boolean isDocumentSizeReasonable(Object document) {
// Check document size logic
return true;
}
// Use appropriate data types
public static class OptimizedUser {
private String id;
private String username;
private String email;
private int age; // Use primitive types when possible
private List<String> roles; // Use arrays for multiple values
private Map<String, Object> metadata; // Flexible attributes
// Getters and setters
}
}
// Monitoring and metrics
public static class Monitoring {
public static void logOperationMetrics(String operation, long startTime) {
long duration = System.currentTimeMillis() - startTime;
System.out.printf("Operation %s took %d ms%n", operation, duration);
// Log to monitoring system
if (duration > 1000) { // Threshold for slow operations
System.err.println("SLOW OPERATION: " + operation);
}
}
public static void monitorClusterHealth(Cluster cluster) {
// Implement cluster health checks
try {
cluster.ping();
System.out.println("Cluster is healthy");
} catch (Exception e) {
System.err.println("Cluster health check failed: " + e.getMessage());
}
}
}
}
// Utility class
class Pair<K, V> {
private final K key;
private final V value;
public Pair(K key, V value) {
this.key = key;
this.value = value;
}
public K getKey() { return key; }
public V getValue() { return value; }
}

Conclusion

Couchbase Java SDK provides a powerful and flexible way to interact with Couchbase Server. Key best practices include:

  1. Connection Management: Use connection pooling and proper resource cleanup
  2. Document Design: Design documents for optimal read/write patterns
  3. Query Optimization: Use appropriate indexes and optimize N1QL queries
  4. Error Handling: Implement robust error handling and retry mechanisms
  5. Performance: Use reactive programming for better resource utilization
  6. Monitoring: Implement comprehensive monitoring and logging

The SDK's support for both synchronous and asynchronous operations, along with Spring Data integration, makes it suitable for a wide range of enterprise applications.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper