The traditional data lake, built on cloud storage with formats like Parquet, solved the problem of storing massive volumes of structured and unstructured data. However, it introduced new challenges: unreliable data quality, difficult updates, and a lack of ACID transactions. Delta Lake, an open-source storage layer, solves these problems by bringing reliability and performance to data lakes.
This article explores how to implement and interact with a Delta Lake using Java, moving beyond the typical Python/Scala focus to demonstrate a robust JVM-based data architecture.
What is Delta Lake?
Delta Lake is an open-source storage framework that runs on top of your existing data lake (e.g., AWS S3, Azure Data Lake Storage, GCP Cloud Storage). It provides:
- ACID Transactions: Guarantees data integrity with serializable isolation levels.
- Schema Enforcement & Evolution: Ensures data quality and safely handles schema changes.
- Time Travel: Query historical versions of data for audits, reproducibility, or rollbacks.
- Unified Batch & Streaming: Serves as both a batch table and a streaming source/sink.
- Upserts, Deletes, and Merges: Supports DML operations traditionally difficult in data lakes.
Core Architecture: How Delta Lake Works
Delta Lake achieves this through a transaction log (Delta Log) that records all changes made to the table. Each transaction creates a new JSON file in the _delta_log directory. When you query a Delta table, the system reads the transaction log to determine which data files are part of the current, consistent view.
s3://my-bucket/my-delta-table/
├── part-00000-xxx.parquet
├── part-00001-xxx.parquet
├── part-00002-xxx.parquet
└── _delta_log/
    ├── 00000000000000000000.json
    ├── 00000000000000000001.json
    └── 00000000000000000002.json
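To make the replay idea concrete, here is a minimal sketch in plain Java, with no Delta dependency, of how a reader could derive the set of live data files from an ordered list of commits. The `LogReplaySketch` class, its `Action` type, and the `liveFiles` method are hypothetical names used only for illustration. The real log also carries metadata, protocol, and checkpoint entries, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of Delta log replay (not the Delta Standalone API).
class LogReplaySketch {

    /** One entry in a commit: either an "add" or a "remove" of a data file path. */
    static final class Action {
        final boolean add;
        final String path;

        Action(boolean add, String path) {
            this.add = add;
            this.path = path;
        }
    }

    /**
     * Replays commits 0..version (inclusive) in order and returns the data
     * files that make up the table at that version.
     */
    static Set<String> liveFiles(List<List<Action>> commits, int version) {
        Set<String> live = new LinkedHashSet<>();
        for (int v = 0; v <= version && v < commits.size(); v++) {
            for (Action action : commits.get(v)) {
                if (action.add) {
                    live.add(action.path);
                } else {
                    live.remove(action.path);
                }
            }
        }
        return live;
    }

    public static void main(String[] args) {
        List<List<Action>> commits = new ArrayList<>();
        // Version 0: two files appended.
        commits.add(Arrays.asList(
                new Action(true, "part-00000.parquet"),
                new Action(true, "part-00001.parquet")));
        // Version 1: part-00000 is rewritten as part-00002 (for example by an update).
        commits.add(Arrays.asList(
                new Action(false, "part-00000.parquet"),
                new Action(true, "part-00002.parquet")));

        System.out.println(liveFiles(commits, 0)); // [part-00000.parquet, part-00001.parquet]
        System.out.println(liveFiles(commits, 1)); // [part-00001.parquet, part-00002.parquet]
    }
}
```

Stopping the replay at an earlier version is also the essence of time travel: the removed file still exists in storage, so the older view can be reconstructed from the log alone.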
Java Implementation with Delta Standalone Library
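While Delta Lake is most often used with Apache Spark, the Delta Standalone library lets Java applications read Delta tables and commit changes to the transaction log without a Spark dependency. Writing the Parquet data files themselves is left to the application.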
1. Maven Dependencies
<dependencies>
    <!-- Delta Standalone Reader/Writer -->
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_2.12</artifactId>
        <version>0.6.0</version>
    </dependency>
    <!-- Parquet for data serialization -->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.12.3</version>
    </dependency>
    <!-- AWS S3 support (if using S3) -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-s3</artifactId>
        <version>1.12.261</version>
    </dependency>
    <!-- Hadoop AWS for S3A filesystem -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>
2. Configuration and Setup
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.*;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;
import org.apache.hadoop.conf.Configuration;
public class DeltaLakeService {
    private final Configuration hadoopConf;

    public DeltaLakeService() {
        this.hadoopConf = new Configuration();
        // Configure S3A access. The key values are placeholders: in a real
        // deployment, load credentials from the environment or a credentials
        // provider instead of hard-coding them.
        hadoopConf.set("fs.s3a.access.key", "your-access-key");
        hadoopConf.set("fs.s3a.secret.key", "your-secret-key");
        hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com");
        hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        // Optional: compression codec for Parquet writers that honor this setting.
        // Delta Lake itself does not require it.
        hadoopConf.set("parquet.compression", "SNAPPY");
    }

    public DeltaLog getDeltaLog(String tablePath) {
        return DeltaLog.forTable(hadoopConf, tablePath);
    }
}
3. Creating a Delta Table
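import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.types.LongType;
import io.delta.standalone.types.StringType;
import io.delta.standalone.types.StructField;
import io.delta.standalone.types.StructType;
import io.delta.standalone.types.TimestampType;
import org.apache.hadoop.conf.Configuration;
import java.util.Collections;

public class DeltaTableCreator {
    // Identifies this application in the table history; the value is arbitrary.
    private static final String ENGINE_INFO = "delta-java-example/1.0";
    private final Configuration hadoopConf;

    public DeltaTableCreator(Configuration hadoopConf) {
        this.hadoopConf = hadoopConf;
    }

    public void createDeltaTable(String tablePath, StructType schema) {
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        // A table with no commits yet reports version -1
        if (deltaLog.snapshot().getVersion() < 0) {
            OptimisticTransaction txn = deltaLog.startTransaction();
            // Define the schema
            txn.updateMetadata(
                Metadata.builder()
                    .schema(schema)
                    .build()
            );
            // Commit an initial, empty version; commit() takes the actions,
            // the operation, and an engine identifier
            txn.commit(Collections.<Action>emptyList(),
                new Operation(Operation.Name.CREATE_TABLE), ENGINE_INFO);
            System.out.println("Delta table created at: " + tablePath);
        }
    }

    // Example schema definition; the third StructField argument is "nullable"
    public StructType createUserSchema() {
        return new StructType(new StructField[]{
            new StructField("user_id", new LongType(), false),
            new StructField("username", new StringType(), false),
            new StructField("email", new StringType(), true),
            new StructField("created_at", new TimestampType(), false),
            new StructField("last_login", new TimestampType(), true)
        });
    }
}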
4. Writing Data to Delta Table
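import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class DeltaDataWriter {
    private final Configuration hadoopConf;

    public DeltaDataWriter(Configuration hadoopConf) {
        this.hadoopConf = hadoopConf;
    }

    public void appendData(String tablePath, List<User> users) throws IOException {
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        OptimisticTransaction txn = deltaLog.startTransaction();

        // Write the records to a new Parquet file inside the table directory
        Path dataPath = new Path(tablePath, "data-" + UUID.randomUUID() + ".parquet");
        FileSystem fs = dataPath.getFileSystem(hadoopConf);
        try (ParquetWriter<User> writer = createParquetWriter(fs, dataPath)) {
            for (User user : users) {
                writer.write(user);
            }
        }

        // Per-file statistics are stored in the log as a JSON string
        String stats = "{\"numRecords\":" + users.size() + "}";

        // Register the new file with the table
        List<Action> actions = new ArrayList<>();
        actions.add(new AddFile(
            dataPath.getName(),                  // path relative to the table root
            Collections.emptyMap(),              // partition values (unpartitioned table)
            fs.getFileStatus(dataPath).getLen(), // file size in bytes
            System.currentTimeMillis(),          // modification time
            true,                                // dataChange: this commit adds new data
            stats,
            null                                 // tags
        ));

        // Commit; the third argument identifies the writing engine in the table history
        txn.commit(actions, new Operation(Operation.Name.WRITE), "delta-java-example/1.0");
        System.out.println("Successfully appended " + users.size() + " records");
    }

    private ParquetWriter<User> createParquetWriter(FileSystem fs, Path path) throws IOException {
        // Implementation depends on your Parquet serialization strategy
        // This could use Avro, Protobuf, or simple Parquet writers
        throw new UnsupportedOperationException("Parquet writer is not implemented in this example");
    }
}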
5. Reading from Delta Table
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class DeltaDataReader {
    private final Configuration hadoopConf;

    public DeltaDataReader(Configuration hadoopConf) {
        this.hadoopConf = hadoopConf;
    }

    // Reads the latest version of the table
    public List<User> readDeltaTable(String tablePath) {
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        return readAll(deltaLog.snapshot());
    }

    // Reads the table as it was at the given version
    public List<User> timeTravelRead(String tablePath, long version) {
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        return readAll(deltaLog.getSnapshotForVersionAsOf(version));
    }

    private List<User> readAll(Snapshot snapshot) {
        List<User> results = new ArrayList<>();
        try (CloseableIterator<RowRecord> iter = snapshot.open()) {
            while (iter.hasNext()) {
                results.add(mapRowToUser(iter.next()));
            }
        } catch (IOException e) {
            // Closing the iterator can fail with an IOException
            throw new UncheckedIOException(e);
        }
        return results;
    }

    private User mapRowToUser(RowRecord record) {
        return new User(
            record.getLong("user_id"),
            record.getString("username"),
            record.isNullAt("email") ? null : record.getString("email"),
            record.getTimestamp("created_at"),
            record.isNullAt("last_login") ? null : record.getTimestamp("last_login")
        );
    }
}
6. Performing UPSERT Operations with Merge
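import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.Action;
import org.apache.hadoop.conf.Configuration;
import java.util.ArrayList;
import java.util.List;

public class DeltaMergeOperation {
    private final Configuration hadoopConf;

    public DeltaMergeOperation(Configuration hadoopConf) {
        this.hadoopConf = hadoopConf;
    }

    public void mergeUsers(String tablePath, List<User> newUsers) {
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        // This is a simplified conceptual example
        // Actual merge implementation requires more complex file management
        // A single transaction covers the whole batch so the merge is atomic
        OptimisticTransaction txn = deltaLog.startTransaction();
        List<Action> actions = new ArrayList<>();
        for (User newUser : newUsers) {
            // In practice, you would:
            // 1. Scan existing data to find matches
            // 2. Create new files with updated records
            // 3. Mark old files as removed
            // 4. Commit the transaction
            actions.addAll(prepareMergeActions(newUser, txn));
        }
        txn.commit(actions, new Operation(Operation.Name.MERGE), "delta-java-example/1.0");
    }

    private List<Action> prepareMergeActions(User user, OptimisticTransaction txn) {
        // Implementation would handle:
        // - Finding existing records to update
        // - Creating new AddFile actions
        // - Creating RemoveFile actions for old data
        return new ArrayList<>();
    }
}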
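The four steps listed in the comments above can be sketched at the record level. The following is a minimal copy-on-write planner, assuming in-memory maps stand in for Parquet files and a `Long` key identifies each row. `MergePlanSketch`, `Plan`, and `plan` are hypothetical names, not part of Delta Standalone.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical copy-on-write merge planner; maps stand in for Parquet files.
class MergePlanSketch {

    static final class Plan {
        /** Files holding at least one matched key; each would need a RemoveFile action. */
        final Set<String> filesToRemove = new LinkedHashSet<>();
        /** Rows for the new data file: rewritten rows plus fresh inserts. */
        final Map<Long, String> rowsToWrite = new LinkedHashMap<>();
    }

    /**
     * @param existingFiles file name -> (row key -> row value) for the current snapshot
     * @param incoming      row key -> new row value for the batch being merged
     */
    static Plan plan(Map<String, Map<Long, String>> existingFiles, Map<Long, String> incoming) {
        Plan plan = new Plan();
        Set<Long> matched = new LinkedHashSet<>();

        for (Map.Entry<String, Map<Long, String>> file : existingFiles.entrySet()) {
            // Step 1: scan the file for keys that the incoming batch updates.
            boolean touched = false;
            for (Long key : file.getValue().keySet()) {
                if (incoming.containsKey(key)) {
                    touched = true;
                    matched.add(key);
                }
            }
            if (touched) {
                // Steps 2 and 3: carry every row of a touched file forward,
                // replacing matched rows, and mark the old file for removal.
                plan.filesToRemove.add(file.getKey());
                for (Map.Entry<Long, String> row : file.getValue().entrySet()) {
                    plan.rowsToWrite.put(row.getKey(),
                            incoming.getOrDefault(row.getKey(), row.getValue()));
                }
            }
        }

        // Incoming keys that matched nothing are plain inserts.
        for (Map.Entry<Long, String> row : incoming.entrySet()) {
            if (!matched.contains(row.getKey())) {
                plan.rowsToWrite.put(row.getKey(), row.getValue());
            }
        }
        return plan;
    }
}
```

Step 4 would then be one commit containing an add action for the newly written file and a remove action for each entry in `filesToRemove`. Unmatched rows in a touched file are copied forward because a Parquet file cannot be edited in place.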
Advanced Features in Java
1. Schema Evolution
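import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.Action;
import io.delta.standalone.types.StructType;
import org.apache.hadoop.conf.Configuration;
import java.util.Collections;

public class SchemaManager {
    private final Configuration hadoopConf;

    public SchemaManager(Configuration hadoopConf) { this.hadoopConf = hadoopConf; }

    public void evolveSchema(String tablePath, StructType newSchema) {
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        OptimisticTransaction txn = deltaLog.startTransaction();
        // Copy the current metadata so the table id, partition columns, and
        // properties are preserved; only the schema changes
        txn.updateMetadata(txn.metadata().copyBuilder().schema(newSchema).build());
        txn.commit(Collections.<Action>emptyList(),
            new Operation(Operation.Name.UPDATE), "delta-java-example/1.0");
    }
}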
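Before committing a new schema it can help to verify that the change is purely additive, so that existing data files remain readable. The check below is a minimal sketch under an assumed definition of "safe": existing columns keep their name and type, nullability is never tightened, and any new column is nullable. `SchemaCheckSketch`, `Column`, and `isAdditive` are hypothetical names, and the maps stand in for a real `StructType`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical additive-change check; maps of column name -> Column stand in for a schema.
class SchemaCheckSketch {

    static final class Column {
        final String type;
        final boolean nullable;

        Column(String type, boolean nullable) {
            this.type = type;
            this.nullable = nullable;
        }
    }

    /** Returns true if 'proposed' only adds nullable columns to 'current'. */
    static boolean isAdditive(Map<String, Column> current, Map<String, Column> proposed) {
        for (Map.Entry<String, Column> e : current.entrySet()) {
            Column next = proposed.get(e.getKey());
            // Dropping a column or changing its type is not an additive change.
            if (next == null || !next.type.equals(e.getValue().type)) {
                return false;
            }
            // Tightening nullability would invalidate nulls already written.
            if (e.getValue().nullable && !next.nullable) {
                return false;
            }
        }
        for (Map.Entry<String, Column> e : proposed.entrySet()) {
            // Old files have no values for a new column, so it must accept null.
            if (!current.containsKey(e.getKey()) && !e.getValue().nullable) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Column> current = new LinkedHashMap<>();
        current.put("user_id", new Column("long", false));
        current.put("email", new Column("string", true));

        Map<String, Column> proposed = new LinkedHashMap<>(current);
        proposed.put("country", new Column("string", true));

        System.out.println(isAdditive(current, proposed)); // true
    }
}
```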
2. Table Maintenance
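public class TableMaintenance {
    public void runVacuum(String tablePath, long retentionHours) {
        // Delta Standalone's DeltaLog does not expose a vacuum method.
        // Removing unreferenced files older than the retention period has to be
        // done by another engine (for example Spark's VACUUM command) or by
        // custom code that compares the table directory against the log.
        throw new UnsupportedOperationException(
            "VACUUM is not available in Delta Standalone; run it from an engine that supports it");
    }

    public void compactTable(String tablePath) {
        // Implement file compaction to optimize small files
        // This improves query performance
    }
}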
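The `compactTable` method is left as a stub. A natural first step is deciding which small files to rewrite together. The sketch below shows one possible approach, a greedy grouping by size; `CompactionPlanSketch` and `plan` are hypothetical names, and the file sizes would in practice come from the `AddFile` entries of the current snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical compaction planner: groups small files into bins of at most targetBytes.
class CompactionPlanSketch {

    /**
     * @param fileSizes   file path -> size in bytes
     * @param targetBytes desired size of each compacted file
     * @return groups of files to rewrite together; files already at or above
     *         the target, and groups of a single file, are left alone
     */
    static List<List<String>> plan(Map<String, Long> fileSizes, long targetBytes) {
        List<Map.Entry<String, Long>> small = new ArrayList<>();
        for (Map.Entry<String, Long> e : fileSizes.entrySet()) {
            if (e.getValue() < targetBytes) {
                small.add(e);
            }
        }
        // Smallest first, so the tiniest files are merged most aggressively.
        small.sort(Map.Entry.comparingByValue());

        List<List<String>> bins = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;
        for (Map.Entry<String, Long> e : small) {
            // Close the current bin when the next file would push it past the target.
            if (!current.isEmpty() && currentBytes + e.getValue() > targetBytes) {
                if (current.size() > 1) {
                    bins.add(current);
                }
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(e.getKey());
            currentBytes += e.getValue();
        }
        // Rewriting a single file on its own gains nothing, so skip such bins.
        if (current.size() > 1) {
            bins.add(current);
        }
        return bins;
    }
}
```

Each returned group would then be rewritten as one Parquet file and committed as a single transaction that adds the new file and removes the originals, with `dataChange` set to false because the table's contents do not change.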
Best Practices for Java Implementation
- Connection Pooling: Reuse DeltaLog instances where possible.
- Error Handling: Implement retry logic for concurrent modifications.
- Monitoring: Track table versions, file counts, and transaction metrics.
- Partitioning: Use appropriate partitioning for large tables.
- File Sizing: Optimize Parquet file sizes (100MB-1GB range).
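For the error-handling point, a small retry helper is often enough. The sketch below assumes that a commit conflict surfaces as a `java.util.ConcurrentModificationException` or a subclass of it; check the exception hierarchy of the Delta Standalone version you use before relying on that. `CommitRetrySketch` and `withRetry` are hypothetical names.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.Callable;

// Hypothetical retry helper for optimistic commits with exponential backoff.
class CommitRetrySketch {

    /**
     * Runs 'attempt' until it succeeds or 'maxAttempts' is reached.
     * Only concurrent-modification failures are retried; anything else propagates.
     */
    static <T> T withRetry(Callable<T> attempt, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        long backoff = initialBackoffMillis;
        for (int i = 1; ; i++) {
            try {
                return attempt.call();
            } catch (ConcurrentModificationException e) {
                if (i >= maxAttempts) {
                    throw e; // give up and surface the last conflict
                }
                Thread.sleep(backoff);
                backoff *= 2; // wait longer before each further attempt
            }
        }
    }
}
```

The callable should start a fresh transaction on every attempt (calling `startTransaction()` inside it), because a transaction that lost a conflict was built against a snapshot that is now stale.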
Integration with Java Ecosystem
- Spring Boot: Create @Service components for Delta operations.
- Micrometer: Export Delta Lake metrics to your monitoring system.
- Quarkus/Spring Native: Build native images for serverless deployments.
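@Service
public class UserAnalyticsService {
    // Assumes DeltaDataReader is registered as a Spring bean
    @Autowired
    private DeltaDataReader deltaReader;

    @Value("${delta.table.users.path}")
    private String usersTablePath;

    // Assumes User exposes lastLogin as a LocalDateTime
    public List<User> getRecentUsers(LocalDateTime since) {
        // Loads the whole table and filters in memory
        return deltaReader.readDeltaTable(usersTablePath).stream()
            // last_login is nullable, so skip users who have never logged in
            .filter(user -> user.getLastLogin() != null && user.getLastLogin().isAfter(since))
            .collect(Collectors.toList());
    }
}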
Conclusion
Implementing Delta Lake with Java provides a robust foundation for building reliable, high-performance data lakes. The Delta Standalone library enables Java applications to leverage Delta Lake's powerful features without Spark dependencies, making it ideal for:
- Microservices that need reliable data storage
- ETL pipelines built on Java frameworks
- Data applications requiring ACID guarantees
- Stream processing with exactly-once semantics
While the Java API is less mature than the Scala/PySpark versions, it offers enterprise-grade reliability for JVM-based data architectures. By implementing Delta Lake with Java, you can build data lakes that are not just storage repositories but trustworthy, queryable, and manageable data assets.
The key to success lies in proper configuration, understanding the transaction model, and implementing robust error handling for concurrent operations. With these elements in place, Delta Lake in Java becomes a powerful tool for modern data engineering.