Unlocking Real-Time Data Lakes: Apache Hudi for Upserts in Java

Traditional data lakes have excelled at handling massive volumes of append-only data but have struggled with updates and deletes—operations critical for modern use cases like GDPR compliance, change data capture (CDC), and real-time analytics. Apache Hudi (Hadoop Upserts Deletes and Incrementals) solves this problem by bringing database-like upsert capabilities to data lakes while maintaining scalability and efficiency.


What is Apache Hudi?

Apache Hudi is an open-source data management framework that enables transactional capabilities (ACID compliance) on data lakes. It provides:

  • Upsert Support: Insert new records or update existing ones
  • Incremental Processing: Process only changed data
  • ACID Transactions: Ensure data consistency
  • Schema Evolution: Handle evolving data schemas
  • Multiple Query Engines: Integration with Spark, Flink, Presto, Trino, and Hive
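Incremental processing hinges on the `_hoodie_commit_time` metadata column that Hudi attaches to every record: a consumer remembers the last instant it processed and fetches only records committed after it. The filtering rule reduces to a string comparison on instant times, sketched here in plain Java (the two-element record shape is illustrative, not a Hudi type):

```java
import java.util.List;
import java.util.stream.Collectors;

public class IncrementalFilterSketch {
    /** Keep only records committed strictly after the consumer's checkpoint instant. */
    public static List<String[]> changedSince(List<String[]> records, String checkpoint) {
        return records.stream()                        // record = {commitTime, key}
            .filter(r -> r[0].compareTo(checkpoint) > 0) // instant times sort lexicographically
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[]{"20240101000000000", "user1"},
            new String[]{"20240115103000000", "user2"}
        );
        // Only user2 was committed after the checkpoint
        System.out.println(changedSince(records, "20240110000000000").size()); // 1
    }
}
```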

Core Hudi Concepts

  1. Table Types:
  • Copy on Write (CoW): Updates create new file versions (better for read-heavy workloads)
  • Merge on Read (MoR): Updates go to delta logs merged on read (better for write-heavy workloads)
  2. Query Types:
  • Snapshot Queries: Latest committed data
  • Incremental Queries: Only changes since last commit
  • Read Optimized Queries: Latest committed data without merge (MoR only)
  3. Timeline: Hudi maintains a timeline of all table operations for transactional consistency.
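The precombine field (configured later in this tutorial as `updatedAt`) drives Hudi's merge semantics: when two records share the same record key, the one with the larger precombine value wins. That rule can be sketched in plain Java, independent of any Hudi API (the class and record layout here are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

public class PrecombineSketch {
    /**
     * Keep, per key, the record whose precombine timestamp is largest.
     * Lexicographic comparison works for ISO-style timestamp strings.
     */
    public static Map<String, String[]> merge(String[][] incoming) {
        Map<String, String[]> latest = new HashMap<>();
        for (String[] record : incoming) {            // record = {key, payload, precombineTs}
            latest.merge(record[0], record,
                (oldRec, newRec) -> newRec[2].compareTo(oldRec[2]) >= 0 ? newRec : oldRec);
        }
        return latest;
    }

    public static void main(String[] args) {
        String[][] batch = {
            {"user1", "John v1", "2024-01-15 10:30:00"},
            {"user1", "John v2", "2024-01-16 09:00:00"}, // later precombine value wins
            {"user2", "Jane v1", "2024-01-15 11:45:00"}
        };
        System.out.println(merge(batch).get("user1")[1]); // John v2
    }
}
```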

Hudi Architecture for Upserts

[Data Source] → [Ingestion Job] → [Hudi Table]   → [Query Engine]
      |               |                 |                |
    Kafka        Spark/Flink       Parquet +       Spark/Presto/
    JDBC         Java App          .hoodie files   Hive/Trino

Hudi stores base data in Parquet files and, for MoR tables, captures updates in Avro log files, with transaction metadata tracked in the .hoodie directory.
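Every action on the timeline is identified by an instant time, a timestamp-formatted string such as `20240115103000123`. Recent Hudi releases use millisecond granularity (`yyyyMMddHHmmssSSS`); older releases used second granularity, so treat the layout below as an assumption. A small sketch of turning an instant time into a readable date (using a `DateTimeFormatterBuilder` to avoid the known JDK adjacent-parsing pitfall with `SSS`):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class InstantTimeSketch {
    // Assumed layout: yyyyMMddHHmmss plus three millisecond digits.
    private static final DateTimeFormatter INSTANT_FMT = new DateTimeFormatterBuilder()
        .appendPattern("yyyyMMddHHmmss")
        .appendValue(ChronoField.MILLI_OF_SECOND, 3)
        .toFormatter();

    public static LocalDateTime parse(String instantTime) {
        return LocalDateTime.parse(instantTime, INSTANT_FMT);
    }

    public static void main(String[] args) {
        System.out.println(parse("20240115103000123")); // 2024-01-15T10:30:00.123
    }
}
```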


Hands-On Tutorial: Implementing Hudi Upserts with Java and Spark

Let's build a complete example showing how to perform upserts on a data lake using Hudi, Java, and Apache Spark.

Step 1: Project Setup

Maven Dependencies (pom.xml):

<properties>
    <spark.version>3.4.0</spark.version>
    <hudi.version>0.13.1</hudi.version>
</properties>

<dependencies>
    <!-- Apache Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Apache Hudi -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark3.4-bundle_2.12</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <!-- AWS SDK for S3 (if using cloud storage) -->
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.20.0</version>
    </dependency>
</dependencies>

Step 2: Data Model

Let's model a user profile table that needs frequent updates:

public class UserProfile {

    private String userId;
    private String firstName;
    private String lastName;
    private String email;
    private String lastLogin;
    private int loginCount;
    private String updatedAt;

    // Constructors
    public UserProfile() {}

    public UserProfile(String userId, String firstName, String lastName,
                       String email, String lastLogin, int loginCount, String updatedAt) {
        this.userId = userId;
        this.firstName = firstName;
        this.lastName = lastName;
        this.email = email;
        this.lastLogin = lastLogin;
        this.loginCount = loginCount;
        this.updatedAt = updatedAt;
    }

    // Getters and setters
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getLastLogin() { return lastLogin; }
    public void setLastLogin(String lastLogin) { this.lastLogin = lastLogin; }
    public int getLoginCount() { return loginCount; }
    public void setLoginCount(int loginCount) { this.loginCount = loginCount; }
    public String getUpdatedAt() { return updatedAt; }
    public void setUpdatedAt(String updatedAt) { this.updatedAt = updatedAt; }
}

Step 3: Hudi Table Manager

import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.HashMap;
import java.util.Map;

public class HudiTableManager {

    private final SparkSession sparkSession;
    private final String basePath;
    private final String tableName;

    public HudiTableManager(String basePath, String tableName) {
        this.basePath = basePath;
        this.tableName = tableName;
        this.sparkSession = SparkSession.builder()
            .appName("HudiUpsertExample")
            .master("local[*]")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
            .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
            .getOrCreate();
    }

    public SparkSession getSparkSession() {
        return sparkSession;
    }

    /**
     * Create the initial Hudi table with some sample data.
     */
    public void createInitialTable() {
        Dataset<Row> df = sparkSession.createDataFrame(
            java.util.Arrays.asList(
                new UserProfile("user1", "John", "Doe", "john.doe@example.com",
                    "2024-01-15 10:30:00", 5, "2024-01-15 10:30:00"),
                new UserProfile("user2", "Jane", "Smith", "jane.smith@example.com",
                    "2024-01-15 11:45:00", 3, "2024-01-15 11:45:00"),
                new UserProfile("user3", "Bob", "Johnson", "bob.johnson@example.com",
                    "2024-01-15 09:15:00", 8, "2024-01-15 09:15:00")
            ),
            UserProfile.class
        );
        writeToHudiTable(df, "insert");
        System.out.println("Initial Hudi table created successfully!");
    }

    /**
     * Perform upsert operations on the Hudi table.
     */
    public void upsertData(Dataset<Row> upsertDf) {
        writeToHudiTable(upsertDf, "upsert");
        System.out.println("Upsert operation completed successfully!");
    }

    /**
     * Perform delete operations on the Hudi table.
     */
    public void deleteData(Dataset<Row> deleteDf) {
        writeToHudiTable(deleteDf, "delete");
        System.out.println("Delete operation completed successfully!");
    }

    /**
     * Common method to write data to the Hudi table.
     */
    private void writeToHudiTable(Dataset<Row> df, String operation) {
        Map<String, String> options = getBaseHudiOptions();
        options.put(DataSourceWriteOptions.OPERATION().key(), operation);
        df.write()
            .format("org.apache.hudi")
            .options(options)
            .mode(SaveMode.Append)
            .save(basePath + "/" + tableName);
    }

    /**
     * Base Hudi configuration options.
     */
    private Map<String, String> getBaseHudiOptions() {
        Map<String, String> options = new HashMap<>();
        // Table configuration
        options.put(DataSourceWriteOptions.TABLE_TYPE().key(), "COPY_ON_WRITE");
        options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "userId");
        options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "updatedAt");
        options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "");
        options.put(HoodieWriteConfig.TBL_NAME.key(), tableName);
        options.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
            "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
        // Cleaning and archival (optional)
        options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
        options.put("hoodie.cleaner.commits.retained", "2");
        options.put("hoodie.keep.max.commits", "3");
        options.put("hoodie.keep.min.commits", "2");
        return options;
    }

    /**
     * Query the Hudi table (snapshot query).
     */
    public void queryTable() {
        Dataset<Row> hudiDf = sparkSession.read()
            .format("org.apache.hudi")
            .load(basePath + "/" + tableName);
        System.out.println("=== Current Hudi Table Contents ===");
        hudiDf.show();
        // Show commit metadata via the _hoodie_* columns Hudi adds to every record
        hudiDf.createOrReplaceTempView("hudi_table");
        Dataset<Row> timelineDf = sparkSession.sql(
            "SELECT DISTINCT _hoodie_commit_time, _hoodie_record_key FROM hudi_table"
        );
        System.out.println("=== Hudi Commit Metadata ===");
        timelineDf.show();
    }

    /**
     * Read incremental changes since the given instant time.
     */
    public void readIncrementalChanges(String startTime) {
        sparkSession.read()
            .format("org.apache.hudi")
            .option(DataSourceReadOptions.QUERY_TYPE().key(),
                DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
            .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), startTime)
            .load(basePath + "/" + tableName)
            .show();
    }

    public void close() {
        if (sparkSession != null) {
            sparkSession.close();
        }
    }
}

Step 4: Main Application Class

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

public class HudiUpsertApplication {

    private static final DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        String basePath = "/tmp/hudi_user_profiles"; // Use an S3 path in production: s3a://bucket/path
        String tableName = "user_profiles";
        HudiTableManager hudiManager = new HudiTableManager(basePath, tableName);
        try {
            // Step 1: Create the initial table
            System.out.println("Step 1: Creating initial Hudi table...");
            hudiManager.createInitialTable();
            hudiManager.queryTable();

            // Step 2: Perform upserts - update an existing user and add a new one
            System.out.println("\nStep 2: Performing upsert operations...");
            List<UserProfile> upsertData = Arrays.asList(
                // Update an existing user
                new UserProfile("user1", "John", "Doe-Updated", "john.doe@example.com",
                    getCurrentTimestamp(), 6, getCurrentTimestamp()),
                // Insert a new user
                new UserProfile("user4", "Alice", "Brown", "alice.brown@example.com",
                    getCurrentTimestamp(), 1, getCurrentTimestamp())
            );
            Dataset<Row> upsertDf = hudiManager.getSparkSession()
                .createDataFrame(upsertData, UserProfile.class);
            hudiManager.upsertData(upsertDf);
            hudiManager.queryTable();

            // Step 3: Perform a delete operation (only the record key is required)
            System.out.println("\nStep 3: Performing delete operation...");
            List<UserProfile> deleteData = Arrays.asList(
                new UserProfile("user2", null, null, null, null, 0, getCurrentTimestamp())
            );
            Dataset<Row> deleteDf = hudiManager.getSparkSession()
                .createDataFrame(deleteData, UserProfile.class);
            hudiManager.deleteData(deleteDf);
            hudiManager.queryTable();

            // Step 4: Demonstrate incremental querying
            System.out.println("\nStep 4: Reading incremental changes...");
            // In a real pipeline, store the commit time from the previous run as a checkpoint
            hudiManager.readIncrementalChanges("20240101000000000"); // Example start time
        } finally {
            hudiManager.close();
        }
    }

    private static String getCurrentTimestamp() {
        return LocalDateTime.now().format(formatter);
    }
}

Step 5: Production Configuration

For production environments, create a configuration class:

import java.util.HashMap;
import java.util.Map;

public class HudiConfig {

    public static Map<String, String> getProductionConfig() {
        Map<String, String> config = new HashMap<>();
        // Table type and Hive metastore sync
        config.put("hoodie.datasource.write.table.type", "COPY_ON_WRITE");
        config.put("hoodie.datasource.hive_sync.database", "production_db");
        config.put("hoodie.datasource.hive_sync.table", "user_profiles");
        config.put("hoodie.datasource.hive_sync.enable", "true");
        // Performance tuning
        config.put("hoodie.parquet.max.file.size", "134217728"); // 128 MB
        config.put("hoodie.parquet.block.size", "134217728"); // 128 MB
        config.put("hoodie.parquet.page.size", "1048576"); // 1 MB
        // Compression
        config.put("hoodie.parquet.compression.codec", "gzip");
        // Cleaner configuration
        config.put("hoodie.cleaner.fileversions.retained", "3");
        config.put("hoodie.cleaner.commits.retained", "10");
        return config;
    }

    public static Map<String, String> getAWSS3Config(String bucket, String region) {
        Map<String, String> config = new HashMap<>();
        // Prefer IAM roles or a credentials provider over hard-coded keys in production
        config.put("fs.s3a.access.key", "your-access-key");
        config.put("fs.s3a.secret.key", "your-secret-key");
        config.put("fs.s3a.endpoint", "s3." + region + ".amazonaws.com");
        config.put("fs.s3a.region", region);
        config.put("hoodie.base.path", "s3a://" + bucket + "/hudi-tables/");
        return config;
    }
}
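These maps are meant to be layered over the base write options at submission time, with environment-specific entries overriding defaults. A small sketch of that composition (plain `Map` merging; later layers win on key collisions):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigLayering {
    /** Combine option maps; entries in later maps override earlier ones. */
    @SafeVarargs
    public static Map<String, String> layered(Map<String, String>... layers) {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> layer : layers) {
            merged.putAll(layer); // later layers win on key collisions
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("hoodie.cleaner.commits.retained", "2");
        Map<String, String> prod = new HashMap<>();
        prod.put("hoodie.cleaner.commits.retained", "10"); // production override
        System.out.println(layered(base, prod).get("hoodie.cleaner.commits.retained")); // 10
    }
}
```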

Running the Application

  1. Ensure you have Spark installed, or run in local mode
  2. Compile and run the application:
   mvn clean compile exec:java -Dexec.mainClass="HudiUpsertApplication"
  3. Observe the output showing:
  • Initial table creation
  • Upsert operations (updates and inserts)
  • Delete operations
  • Table state after each operation

Production Best Practices

1. Table Type Selection

// For read-heavy workloads
options.put(DataSourceWriteOptions.TABLE_TYPE().key(), "COPY_ON_WRITE");
// For write-heavy workloads with frequent updates
options.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");

2. Partitioning Strategy

// Partition by date for time-series data
options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "createdDate");
options.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
    "org.apache.hudi.keygen.SimpleKeyGenerator");
// Multi-level partitioning (requires the ComplexKeyGenerator)
options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "region,country");
options.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
    "org.apache.hudi.keygen.ComplexKeyGenerator");
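Under multi-level partitioning, each record's partition path is built by joining the configured field values, e.g. `us-east/usa`, or `region=us-east/country=usa` when Hive-style partitioning is enabled. A plain-Java sketch of that path construction (illustrative only, not the actual Hudi key generator):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionPathSketch {
    /** Join partition field values with '/', optionally in Hive-style field=value form. */
    public static String partitionPath(Map<String, String> fields, boolean hiveStyle) {
        return fields.entrySet().stream()
            .map(e -> hiveStyle ? e.getKey() + "=" + e.getValue() : e.getValue())
            .collect(Collectors.joining("/"));
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>(); // preserves field order
        fields.put("region", "us-east");
        fields.put("country", "usa");
        System.out.println(partitionPath(fields, false)); // us-east/usa
        System.out.println(partitionPath(fields, true));  // region=us-east/country=usa
    }
}
```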

3. Optimized Upserts with Indexing

// Tune Bloom filter indexing for faster upserts
options.put("hoodie.bloom.index.update.partition.path", "true");
options.put("hoodie.bloom.index.bucketized.checking", "true");
options.put("hoodie.bloom.index.prune.by.ranges", "true");
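The Bloom index speeds up upserts because each base file carries a Bloom filter over its record keys: a membership check that can answer "definitely not in this file" without reading it, so only candidate files are opened during key lookup. A toy Bloom filter illustrating the mechanism (purely illustrative; Hudi ships its own implementation):

```java
import java.util.BitSet;

public class BloomSketch {
    private static final int SIZE = 1 << 16; // 65,536 bits
    private final BitSet bits = new BitSet(SIZE);

    /** Derive two hash positions from one key. */
    private int[] hashes(String key) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return new int[]{Math.floorMod(h1, SIZE), Math.floorMod(h2, SIZE)};
    }

    public void add(String key) {
        for (int h : hashes(key)) bits.set(h);
    }

    /** False means the key is definitely absent; true means "possibly present". */
    public boolean mightContain(String key) {
        for (int h : hashes(key)) if (!bits.get(h)) return false;
        return true;
    }

    public static void main(String[] args) {
        BloomSketch filter = new BloomSketch();
        filter.add("user1");
        System.out.println(filter.mightContain("user1")); // true
        // Absent keys usually return false, letting the writer skip this file entirely
    }
}
```

False positives only cost an unnecessary file scan; false negatives never happen, which is why the check is safe to use for pruning.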

4. Monitoring and Metrics

// Enable metrics collection
options.put("hoodie.metrics.on", "true");
options.put("hoodie.metrics.reporter.type", "GRAPHITE");
options.put("hoodie.metrics.graphite.host", "localhost");
options.put("hoodie.metrics.graphite.port", "2003");

Use Cases for Hudi Upserts

  1. CDC Pipelines: Replicate database changes to data lakes
  2. GDPR Compliance: Handle right-to-be-forgotten requests
  3. Real-time Analytics: Maintain up-to-date dimension tables
  4. Data Deduplication: Remove duplicates during ingestion
  5. Machine Learning: Update feature stores with latest data
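The CDC pattern (use case 1) reduces to replaying an ordered stream of change events against keyed state, which upserts and deletes express directly: inserts and updates both become upserts, deletes remove the key. A minimal sketch of that replay in plain Java (the three-element event shape is illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CdcReplaySketch {
    /** Apply ordered change events: op is "I" (insert), "U" (update) or "D" (delete). */
    public static Map<String, String> apply(List<String[]> events) { // event = {op, key, value}
        Map<String, String> state = new HashMap<>();
        for (String[] e : events) {
            if ("D".equals(e[0])) state.remove(e[1]);
            else state.put(e[1], e[2]); // insert and update are both upserts
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, String> state = apply(List.of(
            new String[]{"I", "user1", "John"},
            new String[]{"U", "user1", "John-Updated"},
            new String[]{"I", "user2", "Jane"},
            new String[]{"D", "user2", null}
        ));
        System.out.println(state); // {user1=John-Updated}
    }
}
```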

Conclusion

Apache Hudi revolutionizes data lake management by bringing database-like upsert capabilities to massive-scale data storage. With Java and Spark integration, you can:

  • Handle real-time updates and deletes efficiently
  • Maintain ACID compliance in data lake environments
  • Enable incremental processing pipelines
  • Scale to petabytes while supporting frequent updates
  • Integrate with modern query engines and cloud platforms

By implementing Hudi with proper configuration and best practices, you can build robust, scalable data lakes that support both batch and real-time use cases while maintaining data quality and consistency.
