Traditional data lakes have excelled at handling massive volumes of append-only data but struggled with updates and deletes—operations critical for modern use cases like GDPR compliance, change data capture (CDC), and real-time analytics. Apache Hudi (Hadoop Upserts Deletes and Incrementals) solves this problem by bringing database-like upsert capabilities to data lakes while maintaining scalability and efficiency.
What is Apache Hudi?
Apache Hudi is an open-source data management framework that enables transactional capabilities (ACID compliance) on data lakes. It provides:
- Upsert Support: Insert new records or update existing ones
- Incremental Processing: Process only changed data
- ACID Transactions: Ensure data consistency
- Schema Evolution: Handle evolving data schemas
- Multiple Query Engines: Integration with Spark, Flink, Presto, Trino, and Hive
Core Hudi Concepts
- Table Types:
- Copy on Write (CoW): Updates create new file versions (better for read-heavy workloads)
- Merge on Read (MoR): Updates go to delta logs merged on read (better for write-heavy workloads)
- Query Types (a read sketch follows this list):
- Snapshot Queries: Latest committed data
- Incremental Queries: Only changes since last commit
- Read Optimized Queries: Latest committed data without merge (MoR only)
- Timeline: Hudi maintains a timeline of all table operations for transactional consistency.
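To make the query types concrete, here is a minimal read sketch in Java. It assumes an existing SparkSession and a Hudi table at tablePath (both placeholders); the option keys are the standard hoodie.datasource.query.type settings.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HudiQueryTypesSketch {
    // Hedged sketch: how the three query types are typically selected via Spark read options.
    // "spark" and "tablePath" are assumed to exist; tablePath points at a Hudi table's base path.
    public static void readExamples(SparkSession spark, String tablePath) {
        // Snapshot query (the default): latest committed view of the table
        Dataset<Row> snapshot = spark.read()
                .format("hudi")
                .option("hoodie.datasource.query.type", "snapshot")
                .load(tablePath);

        // Incremental query: only records changed since the given commit instant
        Dataset<Row> incremental = spark.read()
                .format("hudi")
                .option("hoodie.datasource.query.type", "incremental")
                .option("hoodie.datasource.read.begin.instanttime", "20240101000000000")
                .load(tablePath);

        // Read-optimized query (MoR tables only): base files without merging delta logs
        Dataset<Row> readOptimized = spark.read()
                .format("hudi")
                .option("hoodie.datasource.query.type", "read_optimized")
                .load(tablePath);

        snapshot.show();
        incremental.show();
        readOptimized.show();
    }
}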
Hudi Architecture for Upserts
[Data Source]  →  [Ingestion Job]  →  [Hudi Table]       →  [Query Engine]
  Kafka             Spark/Flink        Parquet +              Spark/Presto/
  JDBC              Java App           .hoodie files          Hive/Trino
Hudi stores base data in Parquet files and, for MoR tables, updates in Avro log files, with transaction metadata kept in the .hoodie directory.
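For orientation, here is a rough sketch of what a non-partitioned Copy-on-Write table directory can look like on disk. File names are illustrative only; exact naming varies by Hudi version and configuration.
/tmp/hudi_user_profiles/user_profiles/
├── .hoodie/                                        (timeline and table metadata)
│   ├── hoodie.properties
│   ├── 20240115103000000.commit
│   └── 20240115114500000.commit
└── <fileId>_<writeToken>_<instantTime>.parquet     (base data file)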
Hands-On Tutorial: Implementing Hudi Upserts with Java and Spark
Let's build a complete example showing how to perform upserts on a data lake using Hudi, Java, and Apache Spark.
Step 1: Project Setup
Maven Dependencies (pom.xml):
<properties>
<spark.version>3.4.0</spark.version>
<hudi.version>0.13.1</hudi.version>
</properties>
<dependencies>
<!-- Apache Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Apache Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3.4-bundle_2.12</artifactId>
<version>${hudi.version}</version>
</dependency>
<!-- AWS SDK for S3 (if using cloud storage) -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.20.0</version>
</dependency>
</dependencies>
Step 2: Data Model
Let's model a user profile table that needs frequent updates:
public class UserProfile {
private String userId;
private String firstName;
private String lastName;
private String email;
private String lastLogin;
private int loginCount;
private String updatedAt;
// Constructors
public UserProfile() {}
public UserProfile(String userId, String firstName, String lastName,
String email, String lastLogin, int loginCount, String updatedAt) {
this.userId = userId;
this.firstName = firstName;
this.lastName = lastName;
this.email = email;
this.lastLogin = lastLogin;
this.loginCount = loginCount;
this.updatedAt = updatedAt;
}
// Getters and Setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getFirstName() { return firstName; }
public void setFirstName(String firstName) { this.firstName = firstName; }
public String getLastName() { return lastName; }
public void setLastName(String lastName) { this.lastName = lastName; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getLastLogin() { return lastLogin; }
public void setLastLogin(String lastLogin) { this.lastLogin = lastLogin; }
public int getLoginCount() { return loginCount; }
public void setLoginCount(int loginCount) { this.loginCount = loginCount; }
public String getUpdatedAt() { return updatedAt; }
public void setUpdatedAt(String updatedAt) { this.updatedAt = updatedAt; }
}
Step 3: Hudi Table Manager
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.HashMap;
import java.util.Map;
public class HudiTableManager {
private final SparkSession sparkSession;
private final String basePath;
private final String tableName;
public HudiTableManager(String basePath, String tableName) {
this.basePath = basePath;
this.tableName = tableName;
this.sparkSession = SparkSession.builder()
.appName("HudiUpsertExample")
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.getOrCreate();
}
/**
* Create initial Hudi table with some sample data
*/
public void createInitialTable() {
// Sample initial data
String[][] initialData = {
{"user1", "John", "Doe", "[email protected]", "2024-01-15 10:30:00", "5", "2024-01-15 10:30:00"},
{"user2", "Jane", "Smith", "[email protected]", "2024-01-15 11:45:00", "3", "2024-01-15 11:45:00"},
{"user3", "Bob", "Johnson", "[email protected]", "2024-01-15 09:15:00", "8", "2024-01-15 09:15:00"}
};
// Create DataFrame from sample data
Dataset<Row> df = sparkSession.createDataFrame(
java.util.Arrays.asList(
new UserProfile(initialData[0][0], initialData[0][1], initialData[0][2],
initialData[0][3], initialData[0][4], Integer.parseInt(initialData[0][5]), initialData[0][6]),
new UserProfile(initialData[1][0], initialData[1][1], initialData[1][2],
initialData[1][3], initialData[1][4], Integer.parseInt(initialData[1][5]), initialData[1][6]),
new UserProfile(initialData[2][0], initialData[2][1], initialData[2][2],
initialData[2][3], initialData[2][4], Integer.parseInt(initialData[2][5]), initialData[2][6])
),
UserProfile.class
);
// Write initial data as Hudi table
writeToHudiTable(df, "insert");
System.out.println("Initial Hudi table created successfully!");
}
/**
* Perform upsert operations on Hudi table
*/
public void upsertData(Dataset<Row> upsertDf) {
writeToHudiTable(upsertDf, "upsert");
System.out.println("Upsert operation completed successfully!");
}
/**
* Perform delete operations on Hudi table
*/
public void deleteData(Dataset<Row> deleteDf) {
Map<String, String> options = getBaseHudiOptions();
options.put(DataSourceWriteOptions.OPERATION().key(), "delete");
deleteDf.write()
.format("org.apache.hudi")
.options(options)
.mode(SaveMode.Append)
.save(basePath + "/" + tableName);
System.out.println("Delete operation completed successfully!");
}
/**
* Common method to write data to Hudi table
*/
private void writeToHudiTable(Dataset<Row> df, String operation) {
Map<String, String> options = getBaseHudiOptions();
options.put(DataSourceWriteOptions.OPERATION().key(), operation);
df.write()
.format("org.apache.hudi")
.options(options)
.mode(SaveMode.Append)
.save(basePath + "/" + tableName);
}
/**
* Base Hudi configuration options
*/
private Map<String, String> getBaseHudiOptions() {
Map<String, String> options = new HashMap<>();
// Table configuration
options.put(DataSourceWriteOptions.TABLE_TYPE().key(), "COPY_ON_WRITE");
options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "userId");
options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "updatedAt");
options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "");
// Hudi configuration
options.put(HoodieWriteConfig.TBL_NAME.key(), tableName);
options.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
"org.apache.hudi.keygen.NonpartitionedKeyGenerator");
// Async services (optional)
options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
options.put("hoodie.cleaner.commits.retained", "2");
options.put("hoodie.keep.max.commits", "3");
options.put("hoodie.keep.min.commits", "2");
return options;
}
/**
* Query the Hudi table
*/
public void queryTable() {
Dataset<Row> hudiDf = sparkSession.read()
.format("org.apache.hudi")
.load(basePath + "/" + tableName);
System.out.println("=== Current Hudi Table Contents ===");
hudiDf.show();
// Show commit metadata from Hudi's bookkeeping columns
System.out.println("=== Hudi Timeline ===");
hudiDf.select("_hoodie_commit_time", "_hoodie_record_key").show();
}
/**
* Read incremental changes
*/
public void readIncrementalChanges(String startTime) {
sparkSession.read()
.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), startTime)
.load(basePath + "/" + tableName)
.show();
}
/**
* Expose the SparkSession so callers can build DataFrames against it
*/
public SparkSession getSparkSession() {
return sparkSession;
}
public void close() {
if (sparkSession != null) {
sparkSession.close();
}
}
}
Step 4: Main Application Class
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
public class HudiUpsertApplication {
private static final DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
String basePath = "/tmp/hudi_user_profiles"; // Use S3 path in production: s3a://bucket/path
String tableName = "user_profiles";
HudiTableManager hudiManager = new HudiTableManager(basePath, tableName);
try {
// Step 1: Create initial table
System.out.println("Step 1: Creating initial Hudi table...");
hudiManager.createInitialTable();
hudiManager.queryTable();
// Step 2: Perform upserts - update existing users and add new ones
System.out.println("\nStep 2: Performing upsert operations...");
List<UserProfile> upsertData = Arrays.asList(
// Update existing user
new UserProfile("user1", "John", "Doe-Updated", "[email protected]",
getCurrentTimestamp(), 6, getCurrentTimestamp()),
// Insert new user
new UserProfile("user4", "Alice", "Brown", "[email protected]",
getCurrentTimestamp(), 1, getCurrentTimestamp())
);
Dataset<Row> upsertDf = hudiManager.getSparkSession().createDataFrame(upsertData, UserProfile.class);
hudiManager.upsertData(upsertDf);
hudiManager.queryTable();
// Step 3: Perform delete operation
System.out.println("\nStep 3: Performing delete operation...");
List<UserProfile> deleteData = Arrays.asList(
new UserProfile("user2", null, null, null, null, 0, getCurrentTimestamp())
);
Dataset<Row> deleteDf = hudiManager.getSparkSession().createDataFrame(deleteData, UserProfile.class);
hudiManager.deleteData(deleteDf);
hudiManager.queryTable();
// Step 4: Demonstrate incremental querying
System.out.println("\nStep 4: Reading incremental changes...");
// In real scenario, you would store the commit time from previous operations
hudiManager.readIncrementalChanges("20240101000000000"); // Example start time
} finally {
hudiManager.close();
}
}
private static String getCurrentTimestamp() {
return LocalDateTime.now().format(formatter);
}
}
Step 5: Production Configuration
For production environments, create a configuration class:
public class HudiConfig {
public static Map<String, String> getProductionConfig() {
Map<String, String> config = new HashMap<>();
// Table configuration
config.put("hoodie.datasource.write.table.type", "COPY_ON_WRITE");
// Hive sync configuration
config.put("hoodie.datasource.hive_sync.database", "production_db");
config.put("hoodie.datasource.hive_sync.table", "user_profiles");
config.put("hoodie.datasource.hive_sync.enable", "true");
// Performance tuning
config.put("hoodie.parquet.max.file.size", "134217728"); // 128MB
config.put("hoodie.parquet.block.size", "134217728"); // 128MB
config.put("hoodie.parquet.page.size", "1048576"); // 1MB
// Compression
config.put("hoodie.parquet.compression.codec", "gzip");
// Cleaner configuration
config.put("hoodie.cleaner.fileversions.retained", "3");
config.put("hoodie.cleaner.commits.retained", "10");
return config;
}
public static Map<String, String> getAWSS3Config(String bucket, String region) {
Map<String, String> config = new HashMap<>();
// Placeholder credentials -- prefer IAM roles or a credentials provider in production
config.put("fs.s3a.access.key", "your-access-key");
config.put("fs.s3a.secret.key", "your-secret-key");
config.put("fs.s3a.endpoint", "s3." + region + ".amazonaws.com");
config.put("fs.s3a.region", region);
config.put("hoodie.base.path", "s3a://" + bucket + "/hudi-tables/");
return config;
}
}
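HudiConfig only builds option maps; as a hedged usage sketch, the S3 entries would typically be pushed into the Hadoop configuration of the active SparkSession before any Hudi reads or writes. The bucket and region values below are placeholders.
import java.util.Map;
import org.apache.spark.sql.SparkSession;

public class HudiConfigUsageSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("HudiConfigUsage")
                .master("local[*]")
                .getOrCreate();

        // Apply the S3 filesystem settings to the Hadoop configuration used by Spark.
        // "my-bucket" and "us-east-1" are placeholder values.
        Map<String, String> s3Config = HudiConfig.getAWSS3Config("my-bucket", "us-east-1");
        s3Config.forEach((key, value) ->
                spark.sparkContext().hadoopConfiguration().set(key, value));

        spark.close();
    }
}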
Running the Application
- Ensure Spark is installed, or run in local mode (a cluster spark-submit sketch follows this list)
- Compile and run the application:
mvn clean compile exec:java -Dexec.mainClass="HudiUpsertApplication"
- Observe the output showing:
- Initial table creation
- Upsert operations (updates and inserts)
- Delete operations
- Table state after each operation
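To run against a cluster rather than local mode, a spark-submit invocation could look roughly like the following; the master URL, bundle version, and JAR path are placeholders for your own build:
spark-submit \
  --class HudiUpsertApplication \
  --master yarn \
  --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.13.1 \
  target/hudi-upsert-example-1.0.jar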
Production Best Practices
1. Table Type Selection
// For read-heavy workloads
options.put(DataSourceWriteOptions.TABLE_TYPE().key(), "COPY_ON_WRITE");

// For write-heavy workloads with frequent updates
options.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
2. Partitioning Strategy
// Partition by date for time-series data
options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "createdDate");
options.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
"org.apache.hudi.keygen.SimpleKeyGenerator");

// Multi-level partitioning
options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "region,country");
3. Optimized Upserts with Indexing
// Enable Bloom filter indexing for faster upserts
options.put("hoodie.bloom.index.update.partition.path", "true");
options.put("hoodie.bloom.index.bucketized.checking", "true");
options.put("hoodie.bloom.index.prune.by.ranges", "true");
4. Monitoring and Metrics
// Enable metrics collection
options.put("hoodie.metrics.on", "true");
options.put("hoodie.metrics.reporter.type", "GRAPHITE");
options.put("hoodie.metrics.graphite.host", "localhost");
options.put("hoodie.metrics.graphite.port", "2003");
Use Cases for Hudi Upserts
- CDC Pipelines: Replicate database changes to data lakes
- GDPR Compliance: Handle right-to-be-forgotten requests
- Real-time Analytics: Maintain up-to-date dimension tables
- Data Deduplication: Remove duplicates during ingestion (see the sketch after this list)
- Machine Learning: Update feature stores with latest data
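As a concrete illustration of the deduplication point above, here is a hedged configuration sketch using the same write options as earlier in this tutorial: the precombine field decides which record survives when a single batch contains several versions of the same key.
// Deduplication sketch: when several incoming records share the same "userId",
// the one with the greatest "updatedAt" value wins; the rest are dropped before writing.
options.put("hoodie.datasource.write.recordkey.field", "userId");
options.put("hoodie.datasource.write.precombine.field", "updatedAt");
options.put("hoodie.combine.before.upsert", "true");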
Conclusion
Apache Hudi revolutionizes data lake management by bringing database-like upsert capabilities to massive-scale data storage. With Java and Spark integration, you can:
- Handle real-time updates and deletes efficiently
- Maintain ACID compliance in data lake environments
- Enable incremental processing pipelines
- Scale to petabytes while supporting frequent updates
- Integrate with modern query engines and cloud platforms
By implementing Hudi with proper configuration and best practices, you can build robust, scalable data lakes that support both batch and real-time use cases while maintaining data quality and consistency.