Introduction
Apache Iceberg is an open table format for huge analytic datasets that brings reliability and simplicity to data lakes. It provides ACID transactions, schema evolution, hidden partitioning, and time travel capabilities.
Architecture Overview
Iceberg Table Structure
public class IcebergArchitecture {
/**
* Iceberg Table Components:
* 1. Metadata Layer (Metadata files, Manifest lists, Manifests)
* 2. Data Layer (Data files - Parquet, ORC, Avro)
* 3. Catalog Layer (Hive, Glue, JDBC, Nessie)
*/
private Catalog catalog; // org.apache.iceberg.catalog.Catalog
private TableOperations operations;
private Table table;
public void understandStructure() {
// Metadata files track table state
// Manifest lists point to manifest files
// Manifest files track data files
// Data files contain actual data
}
}
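The same layering can be walked with the Java API. A minimal sketch, assuming a table that has already been loaded from a catalog; it prints the manifests referenced by the current snapshot and the data files a full scan would read:

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class MetadataTreeWalker {
    // Walks the metadata tree: snapshot -> manifest list -> manifests -> data files
    public void walk(Table table) throws java.io.IOException {
        Snapshot current = table.currentSnapshot();
        if (current == null) {
            return; // empty table, no snapshots yet
        }
        for (ManifestFile manifest : current.allManifests(table.io())) {
            System.out.println("manifest: " + manifest.path());
        }
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                System.out.println("data file: " + task.file().path());
            }
        }
    }
}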
Core Dependencies
Maven Configuration
<properties>
<iceberg.version>1.3.0</iceberg.version>
<parquet.version>1.13.1</parquet.version>
</properties>
<dependencies>
<!-- Iceberg Core -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Iceberg API -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Common utilities -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Parquet Format -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- AWS Support -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Spark Integration -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.4_2.12</artifactId>
<version>${iceberg.version}</version>
</dependency>
</dependencies>
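To sanity-check the dependency setup without any external services, a filesystem-backed HadoopCatalog is enough. A small smoke test; the warehouse path and namespace are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class DependencyCheck {
    public static void main(String[] args) {
        // Local filesystem warehouse; no Hive metastore, Glue, or S3 needed
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/iceberg-warehouse");
        catalog.createNamespace(Namespace.of("db"));
        System.out.println(catalog.listTables(Namespace.of("db")));
    }
}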
Table Creation and Management
Schema Definition
public class IcebergSchemaManager {
public Schema createUserSchema() {
return new Schema(
Types.NestedField.required(1, "user_id", Types.LongType.get()),
Types.NestedField.required(2, "name", Types.StringType.get()),
Types.NestedField.optional(3, "email", Types.StringType.get()),
Types.NestedField.required(4, "age", Types.IntegerType.get()),
Types.NestedField.optional(5, "address", Types.StringType.get()),
Types.NestedField.required(6, "created_at", Types.TimestampType.withZone()),
Types.NestedField.required(7, "updated_at", Types.TimestampType.withZone()),
Types.NestedField.optional(8, "metadata", Types.StringType.get())
);
}
public Schema createSalesSchema() {
return new Schema(
Types.NestedField.required(1, "sale_id", Types.LongType.get()),
Types.NestedField.required(2, "user_id", Types.LongType.get()),
Types.NestedField.required(3, "product_id", Types.LongType.get()),
Types.NestedField.required(4, "quantity", Types.IntegerType.get()),
Types.NestedField.required(5, "amount", Types.DoubleType.get()),
Types.NestedField.required(6, "sale_date", Types.DateType.get()),
Types.NestedField.required(7, "sale_timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(8, "region", Types.StringType.get()),
Types.NestedField.optional(9, "category", Types.StringType.get())
);
}
public PartitionSpec createPartitionSpec(Schema schema) {
return PartitionSpec.builderFor(schema)
.identity("sale_date") // Daily partitioning
.bucket("region", 16) // Bucket by region
.build();
}
public SortOrder createSortOrder(Schema schema) {
return SortOrder.builderFor(schema)
.asc("sale_date")
.asc("region")
.desc("amount")
.build();
}
}
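Hidden partitioning is driven by transforms on source columns rather than by extra physical columns. A sketch of the other built-in transforms applied to the sales schema above; the specific choices are illustrative:

public PartitionSpec createTimeBasedSpec(Schema schema) {
    // day()/month()/year()/hour(), bucket(), and truncate() are the built-in transforms
    return PartitionSpec.builderFor(schema)
        .day("sale_timestamp")      // hidden daily partitioning derived from the timestamp
        .bucket("user_id", 32)      // hash bucketing for a high-cardinality key
        .truncate("category", 4)    // prefix truncation for string columns
        .build();
}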
Table Creation Service
@Service
public class IcebergTableService {
private final Catalog catalog;
private final FileIO fileIO;
public IcebergTableService(Catalog catalog, FileIO fileIO) {
this.catalog = catalog;
this.fileIO = fileIO;
}
public Table createTable(String tableName, Schema schema,
PartitionSpec partitionSpec,
Map<String, String> properties) {
TableIdentifier tableIdentifier = TableIdentifier.of("database", tableName);
// Build table properties
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put("format-version", "2");
tableProperties.put("write.parquet.compression-codec", "zstd");
tableProperties.put("write.metadata.delete-after-commit.enabled", "true");
tableProperties.putAll(properties);
return catalog.buildTable(tableIdentifier, schema)
.withPartitionSpec(partitionSpec)
.withProperties(tableProperties)
.create();
}
public Table createTimeSeriesTable(String tableName) {
IcebergSchemaManager schemaManager = new IcebergSchemaManager();
Schema schema = schemaManager.createSalesSchema();
PartitionSpec partitionSpec = schemaManager.createPartitionSpec(schema);
Map<String, String> properties = Map.of(
"write.target-file-size-bytes", "536870912", // 512MB
"write.parquet.row-group-size-bytes", "134217728", // 128MB
"write.parquet.page-size-bytes", "1048576", // 1MB
"write.parquet.dict-size-bytes", "2097152" // 2MB
);
return createTable(tableName, schema, partitionSpec, properties);
}
public void alterTableAddColumn(String tableName, String columnName,
Type type, String doc) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSchema()
.addColumn(columnName, type, doc)
.commit();
}
public void alterTableDropColumn(String tableName, String columnName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSchema()
.deleteColumn(columnName)
.commit();
}
public void renameColumn(String tableName, String oldName, String newName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSchema()
.renameColumn(oldName, newName)
.commit();
}
}
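A short usage sketch for the service above; the table name and the added column are illustrative:

// Assumes IcebergTableService has been wired with a catalog and FileIO
Table sales = tableService.createTimeSeriesTable("sales");
System.out.println("Created table with schema: " + sales.schema());

// addColumn(...) always adds the new column as optional, so existing data files remain valid
tableService.alterTableAddColumn("sales", "discount", Types.DoubleType.get(), "Discount applied to the sale");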
Catalog Configuration
Various Catalog Implementations
@Configuration
public class IcebergCatalogConfig {
@Bean
public Catalog hiveCatalog(Configuration hadoopConfig) {
HiveCatalog catalog = new HiveCatalog();
catalog.setConf(hadoopConfig);
catalog.initialize("hive_catalog", Map.of(
"uri", "thrift://hive-metastore:9083",
"warehouse", "s3a://my-bucket/warehouse/"));
return catalog;
}
@Bean
public Catalog hadoopCatalog(Configuration hadoopConfig) {
return new HadoopCatalog(hadoopConfig, "s3a://my-bucket/warehouse/");
}
@Bean
public Catalog restCatalog() {
RESTCatalog catalog = new RESTCatalog();
catalog.initialize("rest_catalog", Map.of(
"uri", "http://iceberg-rest:8181"));
return catalog;
}
@Bean
public Catalog glueCatalog() {
GlueCatalog catalog = new GlueCatalog();
catalog.initialize("glue_catalog", Map.of(
"warehouse", "s3://my-bucket/warehouse/"));
return catalog;
}
@Bean
public Catalog jdbcCatalog(DataSource dataSource) {
JdbcCatalog catalog = new JdbcCatalog();
catalog.initialize("jdbc_catalog",
Map.of(
"uri", "jdbc:postgresql://localhost:5432/iceberg_catalog",
"warehouse", "s3a://my-bucket/warehouse/"
));
return catalog;
}
@Bean
public FileIO s3FileIO() {
S3FileIO fileIO = new S3FileIO();
fileIO.initialize(Map.of());
return fileIO;
}
@Bean
public Configuration hadoopConfig() {
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", "your-access-key");
conf.set("fs.s3a.secret.key", "your-secret-key");
conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
return conf;
}
}
// Custom catalog configuration
@Component
public class CatalogManager {
private final Map<String, Catalog> catalogs = new ConcurrentHashMap<>();
public void registerCatalog(String name, Catalog catalog) {
catalogs.put(name, catalog);
}
public Table getTable(String catalogName, String database, String tableName) {
Catalog catalog = catalogs.get(catalogName);
if (catalog == null) {
throw new IllegalArgumentException("Catalog not found: " + catalogName);
}
return catalog.loadTable(TableIdentifier.of(database, tableName));
}
public List<TableIdentifier> listTables(String catalogName, String database) {
Catalog catalog = catalogs.get(catalogName);
if (catalog == null) {
throw new IllegalArgumentException("Catalog not found: " + catalogName);
}
return catalog.listTables(Namespace.of(database));
}
}
Data Writing Operations
Batch Data Writing
@Service
public class IcebergDataWriter {
private final Catalog catalog;
private final FileIO fileIO;
public IcebergDataWriter(Catalog catalog, FileIO fileIO) {
this.catalog = catalog;
this.fileIO = fileIO;
}
public void writeBatchData(String tableName, List<Record> records) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// NOTE: this writes a standalone Parquet file; it still has to be committed to the table
// (e.g. via table.newAppend(), as insertData below does) before readers can see it
try (FileAppender<Record> appender = Parquet.write(fileIO.newOutputFile("path/to/data.parquet"))
.schema(table.schema())
.createWriterFunc(GenericParquetWriter::buildWriter)
.build()) {
for (Record record : records) {
appender.add(record);
}
} catch (IOException e) {
throw new RuntimeException("Failed to write data", e);
}
}
public void insertData(String tableName, List<GenericRecord> records) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
try {
// Create a transaction
Transaction transaction = table.newTransaction();
// Create appender for data files
List<DataFile> dataFiles = new ArrayList<>();
for (GenericRecord record : records) {
DataFile dataFile = writeRecordToFile(table, record);
dataFiles.add(dataFile);
}
// Append data files to table
AppendFiles append = transaction.newAppend();
dataFiles.forEach(append::appendFile); // AppendFiles registers one DataFile at a time
append.commit();
// Commit transaction
transaction.commitTransaction();
} catch (Exception e) {
throw new RuntimeException("Failed to insert data", e);
}
}
private DataFile writeRecordToFile(Table table, GenericRecord record) {
// Implementation to write record to data file
// Returns DataFile metadata
return null; // Simplified
}
public void upsertData(String tableName, List<GenericRecord> records,
List<String> equalityFields) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
try {
// Row-level delta: add position-delete files for the rows being replaced plus new data files
// for the updated rows; writePositionDeletes/writeDataFiles are placeholder helpers, analogous to writeRecordToFile
table.newRowDelta()
.addDeletes(writePositionDeletes(table, records))
.addRows(writeDataFiles(table, records))
.commit();
} catch (Exception e) {
throw new RuntimeException("Failed to upsert data", e);
}
}
public void mergeInto(String tableName, List<GenericRecord> newData,
String mergeCondition) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// MERGE is not part of the core Java API; it is typically executed through an engine
// such as Spark or Flink (see the Spark SQL MERGE example in the Spark integration section below)
}
}
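The writeRecordToFile/writeBatchToFile placeholders above can be filled in with Iceberg's generic Parquet writer. A minimal sketch for an unpartitioned table (a partitioned table would also need withPartition on the DataFiles builder); file naming comes from the table's location provider and the class name is illustrative:

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

public class GenericParquetFileWriter {
    // Writes a batch of generic records to a new Parquet file and returns the DataFile
    // metadata, which can then be committed with table.newAppend()
    public DataFile writeBatch(Table table, List<GenericRecord> records) throws IOException {
        String location = table.locationProvider()
            .newDataLocation(UUID.randomUUID() + ".parquet");
        OutputFile outputFile = table.io().newOutputFile(location);
        FileAppender<GenericRecord> appender = Parquet.write(outputFile)
            .schema(table.schema())
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build();
        try (FileAppender<GenericRecord> closing = appender) {
            closing.addAll(records);
        }
        // Metrics (record count, column bounds, etc.) are available once the appender is closed
        return DataFiles.builder(table.spec())
            .withInputFile(table.io().newInputFile(location))
            .withMetrics(appender.metrics())
            .withFormat(FileFormat.PARQUET)
            .build();
    }
}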
Streaming Data Ingestion
@Component
public class IcebergStreamWriter {
private final Catalog catalog;
private final Map<String, Table> tableCache = new ConcurrentHashMap<>();
private final ExecutorService writeExecutor;
public IcebergStreamWriter(Catalog catalog) {
this.catalog = catalog;
this.writeExecutor = Executors.newFixedThreadPool(4);
}
public void streamInsert(String tableName, GenericRecord record) {
Table table = getTable(tableName);
writeExecutor.submit(() -> {
try {
// Use row-level writer for streaming
try (RowDataWriter writer = new RowDataWriter(table)) {
writer.write(record);
writer.commit();
}
} catch (Exception e) {
System.err.println("Failed to write record: " + e.getMessage());
// Implement retry logic
}
});
}
public void batchStreamInsert(String tableName, List<GenericRecord> records) {
if (records.isEmpty()) return;
Table table = getTable(tableName);
writeExecutor.submit(() -> {
try {
// Batch write for better performance
List<DataFile> dataFiles = new ArrayList<>();
int batchSize = 1000;
for (int i = 0; i < records.size(); i += batchSize) {
List<GenericRecord> batch = records.subList(i,
Math.min(i + batchSize, records.size()));
DataFile dataFile = writeBatchToFile(table, batch);
dataFiles.add(dataFile);
}
// Commit all data files
AppendFiles append = table.newAppend();
dataFiles.forEach(append::appendFile);
append.commit();
} catch (Exception e) {
System.err.println("Failed to write batch: " + e.getMessage());
// Implement retry and dead letter queue
}
});
}
private Table getTable(String tableName) {
return tableCache.computeIfAbsent(tableName, name ->
catalog.loadTable(TableIdentifier.of("database", name)));
}
private DataFile writeBatchToFile(Table table, List<GenericRecord> records) {
// Implementation to write batch to data file
return null; // Simplified
}
@PreDestroy
public void shutdown() {
writeExecutor.shutdown();
try {
if (!writeExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
writeExecutor.shutdownNow();
}
} catch (InterruptedException e) {
writeExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// Row-level writer for streaming
class RowDataWriter implements AutoCloseable {
private final Table table;
private final List<DataFile> dataFiles = new ArrayList<>();
public RowDataWriter(Table table) {
this.table = table;
}
public void write(GenericRecord record) {
// Accumulate records and create data files in batches
}
public void commit() {
if (!dataFiles.isEmpty()) {
AppendFiles append = table.newAppend();
dataFiles.forEach(append::appendFile);
append.commit();
dataFiles.clear();
}
}
@Override
public void close() {
commit();
}
}
Data Reading Operations
Table Scanning and Querying
@Service
public class IcebergDataReader {
private final Catalog catalog;
public IcebergDataReader(Catalog catalog) {
this.catalog = catalog;
}
public List<GenericRecord> scanTable(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
List<GenericRecord> results = new ArrayList<>();
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
for (FileScanTask task : tasks) {
try (CloseableIterator<GenericRecord> records =
openFile(task.file(), table.schema())) {
while (records.hasNext()) {
results.add(records.next());
}
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to scan table", e);
}
return results;
}
public List<GenericRecord> queryWithFilters(String tableName,
Expression filter) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
List<GenericRecord> results = new ArrayList<>();
try (CloseableIterable<FileScanTask> tasks =
table.newScan().filter(filter).planFiles()) {
for (FileScanTask task : tasks) {
try (CloseableIterator<GenericRecord> records =
openFile(task.file(), table.schema())) {
while (records.hasNext()) {
results.add(records.next());
}
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to query table", e);
}
return results;
}
public List<GenericRecord> timeTravelQuery(String tableName,
long timestampMillis) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Scan the table state as of a specific point in time
TableScan scan = table.newScan().asOfTime(timestampMillis);
return scanWith(scan, table.schema());
}
public List<GenericRecord> snapshotQuery(String tableName, long snapshotId) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Scan a specific snapshot by id
TableScan scan = table.newScan().useSnapshot(snapshotId);
return scanWith(scan, table.schema());
}
private List<GenericRecord> scanWith(TableScan scan, Schema schema) {
List<GenericRecord> results = new ArrayList<>();
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
for (FileScanTask task : tasks) {
try (CloseableIterator<GenericRecord> records = openFile(task.file(), schema)) {
while (records.hasNext()) {
results.add(records.next());
}
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to scan table", e);
}
return results;
}
private CloseableIterator<GenericRecord> openFile(DataFile file, Schema schema) {
// Implementation to open and read a data file (see the IcebergGenerics sketch after this class for a simpler approach)
return null; // Simplified
}
public TableStatistics getTableStatistics(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
Map<String, String> summary = table.currentSnapshot().summary();
return TableStatistics.builder()
.rowCount(Long.parseLong(summary.getOrDefault("total-records", "0")))
.fileCount(Long.parseLong(summary.getOrDefault("total-data-files", "0")))
.sizeInBytes(Long.parseLong(summary.getOrDefault("total-files-size", "0")))
.build();
}
}
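For simple cases the openFile placeholder can be avoided entirely: the iceberg-data module (an additional dependency to the ones listed earlier) ships a generics-based reader that plans and opens files internally. A sketch with an illustrative filter built from Expressions:

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class GenericsReadExample {
    public void printEmeaSales(Catalog catalog) throws java.io.IOException {
        Table table = catalog.loadTable(TableIdentifier.of("database", "sales"));
        Expression filter = Expressions.and(
            Expressions.equal("region", "EMEA"),
            Expressions.greaterThanOrEqual("amount", 100.0));
        try (CloseableIterable<Record> rows = IcebergGenerics.read(table)
                .where(filter)          // pushed down for file and row-group pruning
                .select("sale_id", "amount", "region")
                .build()) {
            for (Record row : rows) {
                System.out.println(row.getField("sale_id") + " -> " + row.getField("amount"));
            }
        }
    }
}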
Advanced Query Patterns
@Component
public class AdvancedIcebergQueries {
private final Catalog catalog;
public AdvancedIcebergQueries(Catalog catalog) {
this.catalog = catalog;
}
public List<GenericRecord> incrementalQuery(String tableName,
long startSnapshotId,
long endSnapshotId) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Get data added between two snapshots
try (CloseableIterable<FileScanTask> tasks =
table.newScan()
.appendsBetween(startSnapshotId, endSnapshotId)
.planFiles()) {
return readTasks(tasks, table.schema());
} catch (IOException e) {
throw new RuntimeException("Failed to execute incremental query", e);
}
}
public Map<String, Object> getTableMetadata(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
Snapshot currentSnapshot = table.currentSnapshot();
Map<String, Object> metadata = new HashMap<>();
metadata.put("tableName", tableName);
metadata.put("schema", table.schema().toString());
metadata.put("partitionSpec", table.spec().toString());
metadata.put("currentSnapshotId", currentSnapshot.snapshotId());
metadata.put("snapshotTimestamp", currentSnapshot.timestampMillis());
metadata.put("totalDataFiles", currentSnapshot.summary().get("total-data-files"));
metadata.put("totalRecords", currentSnapshot.summary().get("total-records"));
metadata.put("totalDataSize", currentSnapshot.summary().get("total-data-size"));
return metadata;
}
public List<Map<String, Object>> listSnapshots(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
return StreamSupport.stream(table.snapshots().spliterator(), false)
.map(snapshot -> {
Map<String, Object> snapshotInfo = new HashMap<>();
snapshotInfo.put("snapshotId", snapshot.snapshotId());
snapshotInfo.put("timestamp", snapshot.timestampMillis());
snapshotInfo.put("operation", snapshot.operation());
snapshotInfo.put("summary", snapshot.summary());
return snapshotInfo;
})
.collect(Collectors.toList());
}
public void rollbackToSnapshot(String tableName, long snapshotId) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.manageSnapshots()
.rollbackTo(snapshotId)
.commit();
}
public void expireSnapshots(String tableName, long olderThanTimestamp) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.expireSnapshots()
.expireOlderThan(olderThanTimestamp)
.commit();
}
private List<GenericRecord> readTasks(CloseableIterable<FileScanTask> tasks, Schema schema) {
List<GenericRecord> results = new ArrayList<>();
try {
for (FileScanTask task : tasks) {
try (CloseableIterator<GenericRecord> records = openFile(task.file(), schema)) {
while (records.hasNext()) {
results.add(records.next());
}
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to read tasks", e);
}
return results;
}
private CloseableIterator<GenericRecord> openFile(DataFile file, Schema schema) {
// Same file-opening logic as in IcebergDataReader (simplified)
return null;
}
}
Schema Evolution
Schema Management Service
@Service
public class SchemaEvolutionService {
private final Catalog catalog;
public SchemaEvolutionService(Catalog catalog) {
this.catalog = catalog;
}
public void evolveSchemaAddColumn(String tableName, String columnName,
Type type, String doc) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSchema()
.addColumn(columnName, type, doc)
.commit();
}
public void evolveSchemaRenameColumn(String tableName, String oldName,
String newName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSchema()
.renameColumn(oldName, newName)
.commit();
}
public void evolveSchemaUpdateColumn(String tableName, String columnName,
Type.PrimitiveType newType) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Only safe type promotions are allowed (e.g. int -> long, float -> double)
table.updateSchema()
.updateColumn(columnName, newType)
.commit();
}
public void evolveSchemaDeleteColumn(String tableName, String columnName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSchema()
.deleteColumn(columnName)
.commit();
}
public void evolveSchemaReorderColumns(String tableName, List<String> columnOrder) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
UpdateSchema updateSchema = table.updateSchema();
// Iterate in reverse so the first entry of columnOrder ends up first in the schema
for (int i = columnOrder.size() - 1; i >= 0; i--) {
updateSchema.moveFirst(columnOrder.get(i));
}
updateSchema.commit();
}
public Schema getCurrentSchema(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
return table.schema();
}
public List<Schema> getSchemaHistory(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Iceberg keeps every schema version in the table metadata; Table.schemas() exposes them keyed by schema id
return new ArrayList<>(table.schemas().values());
}
}
Partition Management
Partition Operations
@Service
public class PartitionManagementService {
private final Catalog catalog;
public PartitionManagementService(Catalog catalog) {
this.catalog = catalog;
}
public void addPartitionField(String tableName, Term transform) {
// transform is built with org.apache.iceberg.expressions.Expressions,
// e.g. Expressions.day("sale_timestamp") or Expressions.bucket("user_id", 32)
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSpec()
.addField(transform)
.commit();
}
public void removePartitionField(String tableName, String partitionField) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
table.updateSpec()
.removeField(partitionField)
.commit();
}
public List<PartitionField> getPartitionFields(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
return table.spec().fields();
}
public Map<String, String> getPartitionStats(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
Snapshot snapshot = table.currentSnapshot();
Map<String, String> stats = new HashMap<>();
if (snapshot != null) {
// changed-partition-count is tracked in the snapshot summary; per-partition detail requires
// reading manifests or querying the table's "partitions" metadata table
stats.put("changedPartitionCount", snapshot.summary().getOrDefault("changed-partition-count", "0"));
stats.put("manifestCount", String.valueOf(snapshot.allManifests(table.io()).size()));
}
return stats;
}
}
Maintenance Operations
Table Maintenance Service
@Service
public class TableMaintenanceService {
private final Catalog catalog;
private final FileIO fileIO;
public TableMaintenanceService(Catalog catalog, FileIO fileIO) {
this.catalog = catalog;
this.fileIO = fileIO;
}
public void compactTable(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Rewrite data files for better read performance: replace the selected small files with combined files.
// In practice this is usually done with the rewriteDataFiles action (see the SparkActions sketch after this class).
table.newRewrite()
.rewriteFiles(findSmallFiles(table), combineIntoLargeFiles(table))
.commit();
}
public void expireSnapshots(String tableName, int keepLast) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Keep only the last N snapshots
List<Long> snapshotIds = StreamSupport.stream(table.snapshots().spliterator(), false)
.map(Snapshot::snapshotId)
.sorted(Comparator.reverseOrder())
.limit(keepLast)
.collect(Collectors.toList());
long expireBefore = snapshotIds.isEmpty() ?
System.currentTimeMillis() :
getSnapshotTimestamp(table, snapshotIds.get(snapshotIds.size() - 1));
table.expireSnapshots()
.expireOlderThan(expireBefore)
.commit();
}
public void removeOrphanFiles(String tableName) {
// Orphan file cleanup is not exposed on the core Table API; it is performed through a compute
// engine's Actions API (see the SparkActions sketch after this class) or a catalog procedure
// such as Spark SQL's CALL system.remove_orphan_files
}
public void optimizeTable(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
// Full table optimization
compactTable(tableName);
expireSnapshots(tableName, 10);
removeOrphanFiles(tableName);
// Reload the latest table metadata after maintenance
table.refresh();
}
private Set<DataFile> findSmallFiles(Table table) {
// Implementation to find small files that should be compacted
return Collections.emptySet(); // Simplified
}
private Set<DataFile> combineIntoLargeFiles(Table table) {
// Implementation to create combined large files
return Collections.emptySet(); // Simplified
}
private long getSnapshotTimestamp(Table table, long snapshotId) {
return StreamSupport.stream(table.snapshots().spliterator(), false)
.filter(snapshot -> snapshot.snapshotId() == snapshotId)
.findFirst()
.map(Snapshot::timestampMillis)
.orElse(System.currentTimeMillis());
}
}
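In production, compaction, snapshot expiration, and orphan-file removal are usually delegated to Iceberg's Actions API rather than hand-rolled as above. A sketch using the Spark-based actions bundled in iceberg-spark-runtime; the SparkSession, thresholds, and option values are illustrative:

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class SparkMaintenanceJobs {
    public void runMaintenance(SparkSession spark, Table table) {
        SparkActions actions = SparkActions.get(spark);
        // Compact small files toward the configured target size
        actions.rewriteDataFiles(table)
            .option("target-file-size-bytes", "536870912")
            .execute();
        // Expire snapshots older than 7 days
        actions.expireSnapshots(table)
            .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
            .execute();
        // Delete files in the table location that no snapshot references anymore
        actions.deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
            .execute();
    }
}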
Integration with Spark
Spark Iceberg Integration
@Service
public class SparkIcebergService {
public void readWithSpark(String tableName) {
/*
// Spark code example
SparkSession spark = SparkSession.builder()
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hive")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate();
Dataset<Row> df = spark.read()
.format("iceberg")
.load("database." + tableName);
df.show();
*/
}
public void writeWithSpark(String tableName, Dataset<Row> dataset) {
/*
dataset.write()
.format("iceberg")
.mode("append")
.save("database." + tableName);
*/
}
public void sqlQueryWithSpark(String sql) {
/*
SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> result = spark.sql(sql);
result.show();
*/
}
}
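With the extensions configured as above, the MERGE path mentioned in IcebergDataWriter.mergeInto and SQL-level time travel are available directly through spark.sql. A sketch; the database, table, view, and snapshot values are illustrative:

import org.apache.spark.sql.SparkSession;

public class SparkSqlExamples {
    public void run(SparkSession spark) {
        // Upsert rows into the Iceberg table (requires the Iceberg Spark SQL extensions);
        // "updates" is assumed to be a registered temp view or another table
        spark.sql(
            "MERGE INTO db.sales t " +
            "USING updates s " +
            "ON t.sale_id = s.sale_id " +
            "WHEN MATCHED THEN UPDATE SET * " +
            "WHEN NOT MATCHED THEN INSERT *");

        // Time travel by snapshot id or by timestamp (Spark 3.3+ SQL syntax)
        spark.sql("SELECT * FROM db.sales VERSION AS OF 123456789").show();
        spark.sql("SELECT * FROM db.sales TIMESTAMP AS OF '2024-01-01 00:00:00'").show();
    }
}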
Monitoring and Metrics
Iceberg Metrics Collector
@Component
public class IcebergMetricsCollector {
private final Catalog catalog;
private final MeterRegistry meterRegistry;
public IcebergMetricsCollector(Catalog catalog, MeterRegistry meterRegistry) {
this.catalog = catalog;
this.meterRegistry = meterRegistry;
}
public void collectTableMetrics(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
Map<String, String> summary = snapshot.summary();
// Record metrics
Gauge.builder("iceberg.table.data_files")
.tag("table", tableName)
.register(meterRegistry, () ->
Long.parseLong(summary.getOrDefault("total-data-files", "0")));
Gauge.builder("iceberg.table.records")
.tag("table", tableName)
.register(meterRegistry, () ->
Long.parseLong(summary.getOrDefault("total-records", "0")));
Gauge.builder("iceberg.table.size_bytes")
.tag("table", tableName)
.register(meterRegistry, () ->
Long.parseLong(summary.getOrDefault("total-data-size", "0")));
}
}
public Map<String, Object> getDetailedTableMetrics(String tableName) {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
Snapshot snapshot = table.currentSnapshot();
Map<String, Object> metrics = new HashMap<>();
if (snapshot != null) {
metrics.putAll(snapshot.summary());
metrics.put("snapshotId", snapshot.snapshotId());
metrics.put("timestamp", snapshot.timestampMillis());
metrics.put("operation", snapshot.operation());
}
return metrics;
}
}
Error Handling and Resilience
Iceberg Exception Handling
@Service
public class IcebergOperationService {
private final Catalog catalog;
private final RetryTemplate retryTemplate;
public IcebergOperationService(Catalog catalog) {
this.catalog = catalog;
this.retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000L); // 1 second
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
}
public void safeTableOperation(String tableName, TableOperation operation) {
retryTemplate.execute(context -> {
try {
Table table = catalog.loadTable(TableIdentifier.of("database", tableName));
return operation.execute(table);
} catch (RuntimeException e) {
if (shouldRetry(e)) {
throw e; // Retryable: let the RetryTemplate re-invoke the operation
}
// Wrap non-retryable failures instead of silently swallowing them
throw new IcebergOperationException("Non-retryable failure on table " + tableName, e);
}
});
}
private boolean shouldRetry(Exception e) {
// Retry commit conflicts and commits whose outcome is unknown
return e instanceof CommitFailedException ||
e instanceof CommitStateUnknownException;
}
@FunctionalInterface
public interface TableOperation {
Object execute(Table table);
}
}
// Custom exceptions
class IcebergOperationException extends RuntimeException {
public IcebergOperationException(String message) {
super(message);
}
public IcebergOperationException(String message, Throwable cause) {
super(message, cause);
}
}
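A short usage sketch for the wrapper above; the injected service name and the property being set are illustrative:

// Commits that fail with a retryable exception are re-attempted up to the configured limit
operationService.safeTableOperation("sales", table -> {
    table.updateProperties()
        .set("commit.retry.num-retries", "10")
        .commit();
    return null;
});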
Testing
Iceberg Test Utilities
@SpringBootTest
@TestPropertySource(properties = {
"iceberg.catalog.type=memory",
"iceberg.warehouse.location=target/test-warehouse"
})
public class IcebergIntegrationTest {
@Autowired
private Catalog catalog;
@Autowired
private IcebergTableService tableService;
@Autowired
private IcebergDataWriter dataWriter;
@Test
void testTableCreation() {
String tableName = "test_users";
// Create table
Table table = tableService.createTimeSeriesTable(tableName);
assertNotNull(table);
// Verify table exists
Table loadedTable = catalog.loadTable(TableIdentifier.of("database", tableName));
assertNotNull(loadedTable);
assertTrue(loadedTable.name().endsWith(tableName)); // Table.name() is fully qualified (catalog.db.table)
}
@Test
void testDataInsertAndRead() {
String tableName = "test_sales";
Table table = tableService.createTimeSeriesTable(tableName);
// Create test data
List<GenericRecord> records = createTestRecords();
// Insert data
dataWriter.insertData(tableName, records);
// Verify data can be read
IcebergDataReader reader = new IcebergDataReader(catalog);
List<GenericRecord> readRecords = reader.scanTable(tableName);
assertEquals(records.size(), readRecords.size());
}
@Test
void testTimeTravel() {
String tableName = "test_time_travel";
Table table = tableService.createTimeSeriesTable(tableName);
// Insert initial data
List<GenericRecord> initialData = createInitialRecords();
dataWriter.insertData(tableName, initialData);
table.refresh(); // pick up the snapshot created by the insert
long snapshot1 = table.currentSnapshot().snapshotId();
// Insert more data
List<GenericRecord> additionalData = createAdditionalRecords();
dataWriter.insertData(tableName, additionalData);
// Time travel to first snapshot
IcebergDataReader reader = new IcebergDataReader(catalog);
List<GenericRecord> timeTravelData = reader.snapshotQuery(tableName, snapshot1);
assertEquals(initialData.size(), timeTravelData.size());
}
private List<GenericRecord> createTestRecords() {
// Implementation to create test GenericRecord objects
return Collections.emptyList(); // Simplified
}
private List<GenericRecord> createInitialRecords() {
return Collections.emptyList(); // Simplified
}
private List<GenericRecord> createAdditionalRecords() {
return Collections.emptyList(); // Simplified
}
}
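A sketch of what the record factories inside the test could return, using Iceberg's GenericRecord with the sales schema passed in (the stubs above would take the table schema as a parameter, with java.time and org.apache.iceberg.data imports added); the values are illustrative. Note that date and timestamptz columns expect java.time values (LocalDate and OffsetDateTime):

private List<GenericRecord> createTestRecords(Schema schema) {
    GenericRecord template = GenericRecord.create(schema);
    List<GenericRecord> records = new ArrayList<>();
    for (long i = 1; i <= 100; i++) {
        GenericRecord record = template.copy();
        record.setField("sale_id", i);
        record.setField("user_id", i % 10);
        record.setField("product_id", i % 5);
        record.setField("quantity", 1);
        record.setField("amount", 19.99);
        record.setField("sale_date", LocalDate.now());
        record.setField("sale_timestamp", OffsetDateTime.now());
        record.setField("region", "EMEA");
        record.setField("category", "books");
        records.add(record);
    }
    return records;
}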
Conclusion
The patterns in this guide cover:
- Table Management - Creation, schema evolution, partitioning
- Data Operations - Read, write, update, delete with ACID guarantees
- Time Travel - Historical querying and snapshot management
- Catalog Integration - Multiple catalog implementations
- Maintenance Operations - Compaction, snapshot expiration, orphan file cleanup
- Monitoring - Comprehensive metrics and monitoring
- Error Handling - Resilient operations with retry mechanisms
Iceberg brings relational database-like capabilities to data lakes, enabling reliable, performant, and manageable large-scale data processing in Java applications.