Apache Cassandra with the DataStax Java Driver: A Comprehensive Guide
Apache Cassandra is a highly scalable, distributed NoSQL database designed to handle large amounts of data across many commodity servers. The DataStax Java Driver is the official client for connecting to Cassandra databases from Java applications. This comprehensive guide covers Cassandra driver usage, from basic operations to advanced features.
Cassandra Architecture Overview
Key Concepts:
- Cluster: Group of nodes storing your data
- Keyspace: Namespace for tables (similar to a database or schema in an RDBMS)
- Table: Collection of rows with defined schema
- Partition Key: Determines data distribution across nodes
- Clustering Columns: Determine sort order within partitions
Data Model Characteristics:
- Denormalized: Optimized for read performance
- Partition-Oriented: Data organized by partition key
- Eventually Consistent: Tunable consistency levels
Project Setup and Dependencies
Maven Dependencies
<properties>
<cassandra-driver.version>4.15.0</cassandra-driver.version>
<dropwizard-metrics.version>4.2.10</dropwizard-metrics.version>
</properties>
<dependencies>
<!-- DataStax Java Driver -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${cassandra-driver.version}</version>
</dependency>
<!-- Query Builder -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>${cassandra-driver.version}</version>
</dependency>
<!-- Mapper (Optional) -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-mapper-runtime</artifactId>
<version>${cassandra-driver.version}</version>
</dependency>
<!-- Metrics (Optional) -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-metrics-micrometer</artifactId>
<version>${cassandra-driver.version}</version>
</dependency>
<!-- Micrometer (for metrics) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
Gradle Dependencies
dependencies {
implementation 'com.datastax.oss:java-driver-core:4.15.0'
implementation 'com.datastax.oss:java-driver-query-builder:4.15.0'
implementation 'com.datastax.oss:java-driver-mapper-runtime:4.15.0'
implementation 'com.datastax.oss:java-driver-metrics-micrometer:4.15.0'
implementation 'io.micrometer:micrometer-core:1.10.0'
}
Basic Cassandra Driver Usage
1. Session Management and Connection Pool
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
public class CassandraSessionManager {

    // Most recently created session, tracked so closeSession()/isHealthy()
    // act on it. (Previously this field was never assigned, so both methods
    // were effectively no-ops / always false.)
    private CqlSession session;

    /**
     * Creates a session against a single contact point.
     *
     * @param contactPoint host name or IP of a Cassandra node
     * @param port         CQL native transport port (typically 9042)
     * @param datacenter   local datacenter name for load balancing
     * @param keyspace     default keyspace for the session
     * @return the newly created session (also tracked by this manager)
     */
    public CqlSession createSimpleSession(String contactPoint, int port,
                                          String datacenter, String keyspace) {
        session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress(contactPoint, port))
                .withLocalDatacenter(datacenter)
                .withKeyspace(keyspace)
                .build();
        return session;
    }

    /**
     * Creates a session with explicit pool sizes, request timeout, and a
     * warning when USE switches the keyspace, all supplied programmatically.
     */
    public CqlSession createConfiguredSession(String contactPoint, int port,
                                              String datacenter, String keyspace) {
        DriverConfigLoader configLoader = DriverConfigLoader.programmaticBuilder()
                .withString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, datacenter)
                .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 3)
                .withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, 1)
                .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(10))
                .withBoolean(DefaultDriverOption.REQUEST_WARN_IF_SET_KEYSPACE, true)
                .build();
        session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress(contactPoint, port))
                .withConfigLoader(configLoader)
                .withKeyspace(keyspace)
                .build();
        return session;
    }

    /**
     * Creates a session seeded with several contact points for better
     * availability during the initial connection.
     */
    public CqlSession createMultiNodeSession(List<String> contactPoints, int port,
                                             String datacenter, String keyspace) {
        CqlSessionBuilder builder = CqlSession.builder()
                .withLocalDatacenter(datacenter)
                .withKeyspace(keyspace);
        for (String contactPoint : contactPoints) {
            builder.addContactPoint(new InetSocketAddress(contactPoint, port));
        }
        session = builder.build();
        return session;
    }

    /** Closes the tracked session if it is open; safe to call repeatedly. */
    public void closeSession() {
        if (session != null && !session.isClosed()) {
            session.close();
        }
    }

    /**
     * Lightweight liveness probe: runs a trivial query against system.local.
     *
     * @return true if the tracked session is open and the query returns a row
     */
    public boolean isHealthy() {
        if (session == null || session.isClosed()) {
            return false;
        }
        try {
            ResultSet result = session.execute("SELECT release_version FROM system.local");
            return result.one() != null;
        } catch (Exception e) {
            return false;
        }
    }
}
2. Basic CRUD Operations
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
import java.util.UUID;
public class CassandraCRUDOperations {

    private final CqlSession session;

    // Prepared-statement cache keyed by CQL text. Preparing a statement is a
    // network round-trip and the driver warns when the same query text is
    // re-prepared; prepare once and reuse the cached statement thereafter.
    private final java.util.concurrent.ConcurrentMap<String, PreparedStatement> preparedCache =
            new java.util.concurrent.ConcurrentHashMap<>();

    public CassandraCRUDOperations(CqlSession session) {
        this.session = session;
    }

    /** Prepares {@code cql} on first use, returning the cached statement after. */
    private PreparedStatement prepared(String cql) {
        return preparedCache.computeIfAbsent(cql, session::prepare);
    }

    /**
     * Creates a keyspace with SimpleStrategy replication (suitable for
     * development; production clusters normally use NetworkTopologyStrategy).
     */
    public void createKeyspace(String keyspaceName, int replicationFactor) {
        CreateKeyspace createKeyspace = SchemaBuilder.createKeyspace(keyspaceName)
                .ifNotExists()
                .withSimpleStrategy(replicationFactor);
        session.execute(createKeyspace.build());
        System.out.println("Keyspace created: " + keyspaceName);
    }

    /** Creates the 'users' table, partitioned by user_id. */
    public void createUserTable() {
        CreateTable createTable = SchemaBuilder.createTable("users")
                .ifNotExists()
                .withPartitionKey("user_id", DataTypes.UUID)
                .withColumn("email", DataTypes.TEXT)
                .withColumn("first_name", DataTypes.TEXT)
                .withColumn("last_name", DataTypes.TEXT)
                .withColumn("age", DataTypes.INT)
                .withColumn("created_at", DataTypes.TIMESTAMP)
                .withColumn("is_active", DataTypes.BOOLEAN);
        session.execute(createTable.build());
        System.out.println("Table 'users' created");
    }

    /** Inserts a user row; created_at is set server-side via toTimestamp(now()). */
    public void insertUser(UUID userId, String email, String firstName,
                           String lastName, int age, boolean isActive) {
        String query = "INSERT INTO users (user_id, email, first_name, last_name, age, created_at, is_active) " +
                "VALUES (?, ?, ?, ?, ?, toTimestamp(now()), ?)";
        BoundStatement bound = prepared(query).bind(userId, email, firstName, lastName, age, isActive);
        session.execute(bound);
        System.out.println("User inserted: " + email);
    }

    /**
     * Fetches a user by partition key.
     *
     * @return the mapped user, or null when no row exists
     */
    public User getUserById(UUID userId) {
        String query = "SELECT * FROM users WHERE user_id = ?";
        ResultSet resultSet = session.execute(prepared(query).bind(userId));
        Row row = resultSet.one();
        if (row != null) {
            return new User(
                    row.getUuid("user_id"),
                    row.getString("email"),
                    row.getString("first_name"),
                    row.getString("last_name"),
                    row.getInt("age"),
                    row.getInstant("created_at"),
                    row.getBoolean("is_active")
            );
        }
        return null;
    }

    /** Updates a user's email address. */
    public void updateUserEmail(UUID userId, String newEmail) {
        String query = "UPDATE users SET email = ? WHERE user_id = ?";
        session.execute(prepared(query).bind(newEmail, userId));
        System.out.println("User email updated: " + userId);
    }

    /** Deletes the row for the given user id. */
    public void deleteUser(UUID userId) {
        String query = "DELETE FROM users WHERE user_id = ?";
        session.execute(prepared(query).bind(userId));
        System.out.println("User deleted: " + userId);
    }

    /**
     * Inserts several users in one logged batch.
     *
     * <p>NOTE(review): these rows land on different partitions, so a LOGGED
     * batch adds coordinator overhead; LOGGED is kept here for atomicity, but
     * consider UNLOGGED or individual async inserts for throughput.
     */
    public void batchInsertUsers(List<User> users) {
        BatchStatementBuilder batchBuilder = BatchStatement.builder(DefaultBatchType.LOGGED);
        String insertQuery = "INSERT INTO users (user_id, email, first_name, last_name, age, created_at, is_active) " +
                "VALUES (?, ?, ?, ?, ?, toTimestamp(now()), ?)";
        PreparedStatement stmt = prepared(insertQuery);
        for (User user : users) {
            batchBuilder.addStatement(stmt.bind(
                    user.getUserId(), user.getEmail(), user.getFirstName(),
                    user.getLastName(), user.getAge(), user.isActive()
            ));
        }
        session.execute(batchBuilder.build());
        System.out.println("Batch insert completed for " + users.size() + " users");
    }
}
// User entity class
/** Plain data holder mirroring one row of the 'users' table. */
class User {

    private UUID userId;
    private String email;
    private String firstName;
    private String lastName;
    private int age;
    private Instant createdAt;
    private boolean isActive;

    /** No-args constructor for frameworks and manual population. */
    public User() {}

    /** Builds a fully populated user row. */
    public User(UUID userId, String email, String firstName, String lastName,
                int age, Instant createdAt, boolean isActive) {
        this.userId = userId;
        this.email = email;
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.createdAt = createdAt;
        this.isActive = isActive;
    }

    // --- accessors ---

    public UUID getUserId() { return userId; }
    public String getEmail() { return email; }
    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
    public int getAge() { return age; }
    public Instant getCreatedAt() { return createdAt; }
    public boolean isActive() { return isActive; }

    // --- mutators ---

    public void setUserId(UUID userId) { this.userId = userId; }
    public void setEmail(String email) { this.email = email; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public void setAge(int age) { this.age = age; }
    public void setCreatedAt(Instant createdAt) { this.createdAt = createdAt; }
    public void setActive(boolean active) { this.isActive = active; }
}
Advanced Query Operations
1. Pagination and Result Processing
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.paging.OffsetPager;
import com.datastax.oss.driver.api.core.paging.OffsetPager.Page;
import java.util.List;
import java.util.stream.Collectors;
public class PaginationOperations {

    private final CqlSession session;

    public PaginationOperations(CqlSession session) {
        this.session = session;
    }

    /** Fetches at most {@code limit} users in a single request. */
    public List<User> getUsersWithLimit(int limit) {
        String query = "SELECT * FROM users LIMIT ?";
        PreparedStatement prepared = session.prepare(query);
        BoundStatement bound = prepared.bind(limit);
        ResultSet resultSet = session.execute(bound);
        return resultSet.all().stream()
                .map(this::mapRowToUser)
                .collect(Collectors.toList());
    }

    /**
     * Offset-based pagination via the driver's OffsetPager helper.
     *
     * <p>Fix: OffsetPager.getPage expects the 1-based target page NUMBER;
     * the previous code passed a raw row offset (pageSize * (pageNumber - 1)),
     * which landed on the wrong page for every page after the first.
     */
    public List<User> getUsersWithPaging(int pageSize, int pageNumber) {
        String query = "SELECT * FROM users";
        Statement<?> statement = SimpleStatement.newInstance(query)
                .setPageSize(pageSize);
        ResultSet resultSet = session.execute(statement);
        OffsetPager pager = new OffsetPager(pageSize);
        Page<Row> page = pager.getPage(resultSet, pageNumber); // 1-based page index
        return page.getElements().stream()
                .map(this::mapRowToUser)
                .collect(Collectors.toList());
    }

    /**
     * Token-based pagination suitable for large tables.
     *
     * <p>Fixes: only the current page is consumed (iterating past
     * getAvailableWithoutFetching() would trigger a synchronous fetch of the
     * next page), and the resume token uses getSafePagingState(), whose
     * toString()/fromString() round-trip is guaranteed — the raw ByteBuffer
     * from getPagingState() is not string-serializable.
     */
    public PagedResult<User> getUsersWithTokenPaging(int pageSize, String pagingState) {
        String query = "SELECT * FROM users";
        SimpleStatement statement = SimpleStatement.newInstance(query)
                .setPageSize(pageSize);
        if (pagingState != null) {
            statement = statement.setPagingState(PagingState.fromString(pagingState));
        }
        ResultSet resultSet = session.execute(statement);
        int remainingInPage = resultSet.getAvailableWithoutFetching();
        List<User> users = new java.util.ArrayList<>(Math.max(remainingInPage, 0));
        java.util.Iterator<Row> rows = resultSet.iterator();
        while (remainingInPage-- > 0 && rows.hasNext()) {
            users.add(mapRowToUser(rows.next()));
        }
        PagingState nextState = resultSet.getExecutionInfo().getSafePagingState();
        return new PagedResult<>(users, nextState != null ? nextState.toString() : null);
    }

    /**
     * Async variant of token-based pagination.
     *
     * <p>Fixes: AsyncResultSet.currentPage() is an Iterable (not a
     * Collection), so it is bridged through StreamSupport; executeAsync
     * returns a CompletionStage, so toCompletableFuture() is required to
     * match the declared return type.
     */
    public CompletableFuture<PagedResult<User>> getUsersAsync(int pageSize, String pagingState) {
        String query = "SELECT * FROM users";
        SimpleStatement statement = SimpleStatement.newInstance(query)
                .setPageSize(pageSize);
        if (pagingState != null) {
            statement = statement.setPagingState(PagingState.fromString(pagingState));
        }
        return session.executeAsync(statement)
                .thenApply(asyncResultSet -> {
                    List<User> users = java.util.stream.StreamSupport
                            .stream(asyncResultSet.currentPage().spliterator(), false)
                            .map(this::mapRowToUser)
                            .collect(Collectors.toList());
                    PagingState nextState = asyncResultSet.getExecutionInfo().getSafePagingState();
                    return new PagedResult<>(users, nextState != null ? nextState.toString() : null);
                })
                .toCompletableFuture();
    }

    /** Maps one result row onto the User value object. */
    private User mapRowToUser(Row row) {
        return new User(
                row.getUuid("user_id"),
                row.getString("email"),
                row.getString("first_name"),
                row.getString("last_name"),
                row.getInt("age"),
                row.getInstant("created_at"),
                row.getBoolean("is_active")
        );
    }
}
// Paged result wrapper
/** Immutable wrapper pairing one page of results with its resume token. */
class PagedResult<T> {

    private final List<T> items;
    private final String nextPageState;

    /**
     * @param items         elements of the current page
     * @param nextPageState opaque paging token for the following page, or
     *                      null when this is the final page
     */
    public PagedResult(List<T> items, String nextPageState) {
        this.items = items;
        this.nextPageState = nextPageState;
    }

    /** Elements of this page. */
    public List<T> getItems() {
        return items;
    }

    /** Token to resume from, or null on the last page. */
    public String getNextPageState() {
        return nextPageState;
    }

    /** True while a resume token exists. */
    public boolean hasMorePages() {
        return getNextPageState() != null;
    }
}
2. Advanced Data Types and Collections
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.data.UdtValue;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
import com.datastax.oss.driver.api.querybuilder.schema.CreateType;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class AdvancedDataTypesOperations {

    private final CqlSession session;

    public AdvancedDataTypesOperations(CqlSession session) {
        this.session = session;
    }

    /** Creates the 'address' user-defined type in the current keyspace. */
    public void createAddressType() {
        CreateType createType = SchemaBuilder.createType("address")
                .ifNotExists()
                .withField("street", DataTypes.TEXT)
                .withField("city", DataTypes.TEXT)
                .withField("state", DataTypes.TEXT)
                .withField("zip_code", DataTypes.TEXT)
                .withField("country", DataTypes.TEXT);
        session.execute(createType.build());
        System.out.println("UDT 'address' created");
    }

    /**
     * Creates a table mixing collections, a set of frozen UDTs and a tuple.
     *
     * <p>Fix: DataTypes has no frozen(String) method; in the query builder a
     * UDT column type is referenced with SchemaBuilder.udt(name, frozen).
     */
    public void createUserProfileTable() {
        CreateTable createTable = SchemaBuilder.createTable("user_profiles")
                .ifNotExists()
                .withPartitionKey("user_id", DataTypes.UUID)
                .withColumn("email", DataTypes.TEXT)
                .withColumn("addresses", DataTypes.setOf(SchemaBuilder.udt("address", true)))
                .withColumn("phone_numbers", DataTypes.listOf(DataTypes.TEXT))
                .withColumn("preferences", DataTypes.mapOf(DataTypes.TEXT, DataTypes.TEXT))
                .withColumn("coordinates", DataTypes.tupleOf(DataTypes.DOUBLE, DataTypes.DOUBLE));
        session.execute(createTable.build());
        System.out.println("Table 'user_profiles' created");
    }

    /** Inserts a full profile row, binding collection and UDT values directly. */
    public void insertUserProfile(UUID userId, String email,
                                  Set<UdtValue> addresses,
                                  List<String> phoneNumbers,
                                  Map<String, String> preferences,
                                  TupleValue coordinates) {
        String query = "INSERT INTO user_profiles (user_id, email, addresses, phone_numbers, preferences, coordinates) " +
                "VALUES (?, ?, ?, ?, ?, ?)";
        PreparedStatement prepared = session.prepare(query);
        BoundStatement bound = prepared.bind(userId, email, addresses, phoneNumbers, preferences, coordinates);
        session.execute(bound);
        System.out.println("User profile inserted: " + email);
    }

    /**
     * Reads one profile row, decoding collections and UDTs.
     *
     * @return the profile, or null when no row exists
     */
    public UserProfile getUserProfile(UUID userId) {
        String query = "SELECT * FROM user_profiles WHERE user_id = ?";
        PreparedStatement prepared = session.prepare(query);
        BoundStatement bound = prepared.bind(userId);
        ResultSet resultSet = session.execute(bound);
        Row row = resultSet.one();
        if (row != null) {
            UserProfile profile = new UserProfile();
            profile.setUserId(row.getUuid("user_id"));
            profile.setEmail(row.getString("email"));
            profile.setAddresses(row.getSet("addresses", UdtValue.class));
            profile.setPhoneNumbers(row.getList("phone_numbers", String.class));
            profile.setPreferences(row.getMap("preferences", String.class, String.class));
            profile.setCoordinates(row.getTupleValue("coordinates"));
            return profile;
        }
        return null;
    }

    /** Appends one address to the set column ('set + ?' is CQL set addition). */
    public void addUserAddress(UUID userId, UdtValue newAddress) {
        String query = "UPDATE user_profiles SET addresses = addresses + ? WHERE user_id = ?";
        PreparedStatement prepared = session.prepare(query);
        BoundStatement bound = prepared.bind(Set.of(newAddress), userId);
        session.execute(bound);
        System.out.println("Address added for user: " + userId);
    }

    /**
     * Builds an 'address' UDT value from the session keyspace's metadata.
     *
     * <p>Fix: the unchecked Optional.get() on getKeyspace() is replaced with
     * a flatMap chain, so a session without a default keyspace fails with the
     * same IllegalStateException instead of NoSuchElementException.
     */
    public UdtValue createAddress(String street, String city, String state, String zipCode, String country) {
        return session.getKeyspace()
                .flatMap(ksName -> session.getMetadata().getKeyspace(ksName))
                .flatMap(ks -> ks.getUserDefinedType("address"))
                .map(udt -> udt.newValue()
                        .setString("street", street)
                        .setString("city", city)
                        .setString("state", state)
                        .setString("zip_code", zipCode)
                        .setString("country", country))
                .orElseThrow(() -> new IllegalStateException("UDT 'address' not found"));
    }

    /**
     * Builds a (latitude, longitude) tuple value.
     *
     * <p>Fix: KeyspaceMetadata has no getTupleType lookup — tuple types are
     * anonymous, so the type is built directly with DataTypes.tupleOf.
     */
    public TupleValue createCoordinates(double latitude, double longitude) {
        return DataTypes.tupleOf(DataTypes.DOUBLE, DataTypes.DOUBLE)
                .newValue(latitude, longitude);
    }
}
// User profile with advanced types
/** Data holder for one 'user_profiles' row, including collection and UDT columns. */
class UserProfile {

    private UUID userId;
    private String email;
    private Set<UdtValue> addresses;
    private List<String> phoneNumbers;
    private Map<String, String> preferences;
    private TupleValue coordinates;

    // --- accessors ---

    public UUID getUserId() { return userId; }
    public String getEmail() { return email; }
    public Set<UdtValue> getAddresses() { return addresses; }
    public List<String> getPhoneNumbers() { return phoneNumbers; }
    public Map<String, String> getPreferences() { return preferences; }
    public TupleValue getCoordinates() { return coordinates; }

    // --- mutators ---

    public void setUserId(UUID userId) { this.userId = userId; }
    public void setEmail(String email) { this.email = email; }
    public void setAddresses(Set<UdtValue> addresses) { this.addresses = addresses; }
    public void setPhoneNumbers(List<String> phoneNumbers) { this.phoneNumbers = phoneNumbers; }
    public void setPreferences(Map<String, String> preferences) { this.preferences = preferences; }
    public void setCoordinates(TupleValue coordinates) { this.coordinates = coordinates; }
}
DataStax Mapper (Object Mapping)
1. Entity Mapping with Annotations
import com.datastax.oss.driver.api.mapper.annotations.*;
import java.time.Instant;
import java.util.Set;
import java.util.UUID;
/**
 * Mapper entity for the 'users' table. Column names are bound explicitly via
 * @CqlName; the mapper otherwise relies on JavaBean getter/setter naming, so
 * accessor names must stay in sync with the fields.
 */
@Entity
@CqlName("users")
public class UserEntity {
// Partition key: determines which node stores the row.
@PartitionKey
@CqlName("user_id")
private UUID userId;
@CqlName("email")
private String email;
@CqlName("first_name")
private String firstName;
@CqlName("last_name")
private String lastName;
// Boxed types so unset columns can round-trip as null.
@CqlName("age")
private Integer age;
@CqlName("created_at")
private Instant createdAt;
@CqlName("is_active")
private Boolean active;
// Default constructor (required by the mapper)
public UserEntity() {}
// All-args constructor for convenient manual construction
public UserEntity(UUID userId, String email, String firstName, String lastName,
Integer age, Instant createdAt, Boolean active) {
this.userId = userId;
this.email = email;
this.firstName = firstName;
this.lastName = lastName;
this.age = age;
this.createdAt = createdAt;
this.active = active;
}
// Getters and setters (bean-style, used by the mapper for (de)serialization)
public UUID getUserId() { return userId; }
public void setUserId(UUID userId) { this.userId = userId; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public Integer getAge() { return age; }
public void setAge(Integer age) { this.age = age; }
public Instant getCreatedAt() { return createdAt; }
public void setCreatedAt(Instant createdAt) { this.createdAt = createdAt; }
public Boolean getActive() { return active; }
public void setActive(Boolean active) { this.active = active; }
}
2. DAO Interface
import com.datastax.oss.driver.api.mapper.annotations.*;
import java.util.UUID;
/**
 * Mapper DAO for UserEntity. The mapper decides sync vs. async execution from
 * the return type: a void method runs synchronously, so the previous
 * {@code void saveAsync(...)} / {@code void deleteByIdAsync(...)} were just
 * second blocking variants. They now return CompletionStage, which is what
 * makes the generated implementation non-blocking.
 */
@Dao
public interface UserDao {

    // ---- CREATE ----

    /** Synchronous insert. */
    @Insert
    void save(UserEntity user);

    /** Asynchronous insert; completes when the write is acknowledged. */
    @Insert
    java.util.concurrent.CompletionStage<Void> saveAsync(UserEntity user);

    // ---- READ ----

    /** Lookup by full primary key. */
    @Select
    UserEntity findById(UUID userId);

    // NOTE(review): 'users' is partitioned by user_id; predicates on email or
    // is_active need a secondary index or ALLOW FILTERING — confirm schema.
    @Query("SELECT * FROM ${qualifiedTableId} WHERE email = :email")
    UserEntity findByEmail(String email);

    @Query("SELECT * FROM ${qualifiedTableId} WHERE is_active = true")
    PagingIterable<UserEntity> findActiveUsers();

    /** Full scan with server-side filtering — expensive on large tables. */
    @Query("SELECT * FROM ${qualifiedTableId} WHERE age > :minAge ALLOW FILTERING")
    PagingIterable<UserEntity> findUsersOlderThan(int minAge);

    // ---- UPDATE ----

    /** Rewrites all mapped columns of the given entity. */
    @Update
    void update(UserEntity user);

    @Query("UPDATE ${qualifiedTableId} SET email = :newEmail WHERE user_id = :userId")
    void updateEmail(UUID userId, String newEmail);

    // ---- DELETE ----

    /** Deletes by the entity's primary key. */
    @Delete
    void delete(UserEntity user);

    /** Deletes by raw key without needing a full entity instance. */
    @Delete(entityClass = UserEntity.class)
    void deleteById(UUID userId);

    /** Asynchronous delete; completes when the tombstone write is acknowledged. */
    @Query("DELETE FROM ${qualifiedTableId} WHERE user_id = :userId")
    java.util.concurrent.CompletionStage<Void> deleteByIdAsync(UUID userId);
}
3. Mapper Configuration and Usage
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.mapper.MapperBuilder;
import com.datastax.oss.driver.api.mapper.annotations.DaoFactory;
import com.datastax.oss.driver.api.mapper.annotations.Mapper;
/** Entry point of the generated mapper; produces DAO instances. */
@Mapper
public interface UserMapper {

    /** Returns a DAO bound to the session's default keyspace. */
    @DaoFactory
    UserDao userDao();

    /** Convenience factory wrapping the annotation-processor-generated builder. */
    static UserMapper builder(CqlSession session) {
        return new UserMapperBuilder(session).build();
    }
}
// Usage example
/** Demonstrates the full CRUD lifecycle through the mapper-generated DAO. */
public class MapperExample {

    private final UserDao userDao;

    /** Builds the mapper facade once and keeps the DAO it produces. */
    public MapperExample(CqlSession session) {
        UserMapper userMapper = UserMapper.builder(session);
        this.userDao = userMapper.userDao();
    }

    /** Walks one entity through create, read, update, query and delete. */
    public void demonstrateMapperOperations() {
        // Create user
        UserEntity entity = new UserEntity(
                UUID.randomUUID(),
                "[email protected]",
                "John", "Doe", 30,
                Instant.now(), true
        );
        userDao.save(entity);
        // Read user
        UserEntity fetched = userDao.findById(entity.getUserId());
        System.out.println("Found user: " + fetched.getEmail());
        // Update user
        entity.setEmail("[email protected]");
        userDao.update(entity);
        // Find by email
        UserEntity emailMatch = userDao.findByEmail("[email protected]");
        System.out.println("User by email: " + emailMatch.getFirstName());
        // Find active users
        PagingIterable<UserEntity> actives = userDao.findActiveUsers();
        actives.forEach(active ->
                System.out.println("Active user: " + active.getEmail())
        );
        // Delete user
        userDao.deleteById(entity.getUserId());
    }
}
Performance Optimization and Monitoring
1. Connection Pool Configuration
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import java.time.Duration;
public class PerformanceConfiguration {

    /**
     * Builds a programmatic driver configuration tuned for throughput.
     *
     * <p>Fixes vs. the previous version: built-in policy classes are
     * referenced by their simple-name strings (ExponentialReconnectionPolicy
     * and DefaultRetryPolicy live in the driver's internal packages, not
     * api.core.*, so the old Class literals did not resolve), and the
     * metrics "enabled" options take LISTS of metric names, not booleans.
     */
    public static DriverConfigLoader createOptimizedConfig() {
        return DriverConfigLoader.programmaticBuilder()
                // Connection pool settings
                .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 4)
                .withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, 2)
                .withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofSeconds(10))
                // Request settings
                .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(10))
                .withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 5000)
                .withString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_QUORUM")
                .withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE, true)
                // Reconnection policy (built-in, resolved by simple name)
                .withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, "ExponentialReconnectionPolicy")
                .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofSeconds(1))
                .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofSeconds(60))
                // Retry policy (built-in, resolved by simple name)
                .withString(DefaultDriverOption.RETRY_POLICY_CLASS, "DefaultRetryPolicy")
                // Metrics: enable specific session/node metrics by name
                .withString(DefaultDriverOption.METRICS_FACTORY_CLASS,
                        "MicrometerMetricsFactory")
                .withStringList(DefaultDriverOption.METRICS_SESSION_ENABLED,
                        java.util.Arrays.asList("connected-nodes", "cql-requests"))
                .withStringList(DefaultDriverOption.METRICS_NODE_ENABLED,
                        java.util.Arrays.asList("pool.open-connections"))
                .build();
    }
}
2. Metrics and Monitoring
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
/**
 * Exposes DataStax driver metrics and mirrors a few of them into a
 * Micrometer registry.
 *
 * NOTE(review): several calls below do not match the published driver API.
 * session.getMetrics() exposes Dropwizard metric objects, which have no
 * addListener(...) method, and a Dropwizard Counter reports via getCount(),
 * not getValue(). Confirm against the driver and metrics-library versions in
 * use — this class may not compile as written.
 */
public class CassandraMetrics {
private final CqlSession session;
// Standalone registry; production code would normally inject the
// application's shared MeterRegistry instead.
private final MeterRegistry meterRegistry;
public CassandraMetrics(CqlSession session) {
this.session = session;
this.meterRegistry = new SimpleMeterRegistry();
setupMetrics();
}
// Wires driver session/node metrics into Micrometer gauges and counters.
// NOTE(review): addListener is not part of the Dropwizard Gauge/Counter
// API — verify this against the intended metrics facade.
private void setupMetrics() {
// Session metrics
session.getMetrics().ifPresent(metrics -> {
metrics.getSessionMetric(DefaultSessionMetric.CONNECTED_NODES)
.ifPresent(gauge -> gauge.addListener((id, value) ->
meterRegistry.gauge("cassandra.connected_nodes", value)));
metrics.getSessionMetric(DefaultSessionMetric.REQUEST_TIMEOUTS)
.ifPresent(counter -> counter.addListener((id, value) ->
meterRegistry.counter("cassandra.request_timeouts").increment(value)));
});
// Node metrics
session.getMetrics().ifPresent(metrics -> {
metrics.getNodeMetric(DefaultNodeMetric.OPEN_CONNECTIONS)
.ifPresent(gauge -> gauge.addListener((id, value) ->
meterRegistry.gauge("cassandra.node.open_connections",
tags("node", id.toString()), value)));
});
}
// Dumps a few session-level metrics to stdout.
// NOTE(review): Dropwizard Counter exposes getCount(); getValue() exists
// on Gauge only — confirm the BYTES_SENT/BYTES_RECEIVED metric types.
public void printMetrics() {
System.out.println("=== Cassandra Metrics ===");
session.getMetrics().ifPresent(metrics -> {
metrics.getSessionMetric(DefaultSessionMetric.CONNECTED_NODES)
.ifPresent(gauge -> System.out.println("Connected nodes: " + gauge.getValue()));
metrics.getSessionMetric(DefaultSessionMetric.BYTES_SENT)
.ifPresent(counter -> System.out.println("Bytes sent: " + counter.getValue()));
metrics.getSessionMetric(DefaultSessionMetric.BYTES_RECEIVED)
.ifPresent(counter -> System.out.println("Bytes received: " + counter.getValue()));
});
}
// Builds Micrometer tags from alternating key/value pairs.
private io.micrometer.core.instrument.Tags tags(String... keyValues) {
return io.micrometer.core.instrument.Tags.of(keyValues);
}
}
Error Handling and Resilience
1. Comprehensive Error Handling
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.auth.AuthenticationException;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.servererrors.*;
import java.util.concurrent.CompletionException;
public class ErrorHandler {
public <T> T executeWithRetry(CassandraOperation<T> operation, int maxRetries) {
int attempts = 0;
while (attempts < maxRetries) {
try {
return operation.execute();
} catch (QueryValidationException e) {
// Don't retry validation errors
throw new CassandraOperationException("Query validation failed", e);
} catch (ReadTimeoutException | WriteTimeoutException e) {
// Retry timeout errors
attempts++;
if (attempts >= maxRetries) {
throw new CassandraOperationException("Operation timeout after " + maxRetries + " retries", e);
}
waitBeforeRetry(attempts);
} catch (UnavailableException e) {
// Retry unavailable errors
attempts++;
if (attempts >= maxRetries) {
throw new CassandraOperationException("Service unavailable after " + maxRetries + " retries", e);
}
waitBeforeRetry(attempts);
} catch (DriverTimeoutException e) {
// Retry driver timeout
attempts++;
if (attempts >= maxRetries) {
throw new CassandraOperationException("Driver timeout after " + maxRetries + " retries", e);
}
waitBeforeRetry(attempts);
} catch (AllNodesFailedException e) {
// All nodes failed - serious issue
throw new CassandraOperationException("All Cassandra nodes are unavailable", e);
} catch (AuthenticationException e) {
// Authentication failed - don't retry
throw new CassandraOperationException("Authentication failed", e);
} catch (Exception e) {
// Unexpected error
throw new CassandraOperationException("Unexpected error during Cassandra operation", e);
}
}
throw new CassandraOperationException("Max retries exceeded");
}
private void waitBeforeRetry(int attempt) {
try {
long delay = Math.min(1000 * (long) Math.pow(2, attempt), 30000); // Exponential backoff, max 30s
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CassandraOperationException("Operation interrupted during retry", e);
}
}
@FunctionalInterface
public interface CassandraOperation<T> {
T execute();
}
}
/** Unchecked wrapper for failures raised while talking to Cassandra. */
class CassandraOperationException extends RuntimeException {

    /** Failure with context message and underlying cause preserved. */
    public CassandraOperationException(String message, Throwable cause) {
        super(message, cause);
    }

    /** Failure with a context message only. */
    public CassandraOperationException(String message) {
        super(message);
    }
}
Best Practices
- Connection Management: Use connection pooling and proper session lifecycle
- Prepared Statements: Always use prepared statements for repeated queries
- Batch Operations: Use batches judiciously (avoid large batches)
- Pagination: Implement proper pagination for large result sets
- Error Handling: Implement comprehensive error handling and retry logic
- Monitoring: Track metrics for performance and troubleshooting
- Data Modeling: Design tables based on query patterns
- Consistency Levels: Choose appropriate consistency levels for your use case
Conclusion
The DataStax Java Driver provides a powerful, feature-rich interface for working with Cassandra databases in Java applications. By leveraging prepared statements, object mapping, connection pooling, and comprehensive monitoring, you can build high-performance, scalable applications that effectively utilize Cassandra's distributed architecture. The driver's async capabilities, pagination support, and robust error handling make it suitable for production applications requiring high availability and performance.