Overview
Apache Beam is a unified programming model for batch and streaming data processing. It provides a portable API for building data pipelines that run unchanged on multiple execution engines (runners) such as Apache Flink, Apache Spark, and Google Cloud Dataflow.
Architecture
Beam Programming Model
- Pipeline: The entire data processing workflow
- PCollection: Distributed dataset (bounded/unbounded)
- PTransform: Processing operations (Map, Filter, GroupBy, etc.)
- Runner: Execution engine (Direct, Flink, Spark, Dataflow)
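A minimal sketch tying these four concepts together (the class name and inline data are illustrative, not part of the pipelines that follow):
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class HelloBeam {
  public static void main(String[] args) {
    // Runner selection comes from PipelineOptions (DirectRunner by default)
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);                // Pipeline: the whole workflow

    PCollection<String> greetings = pipeline                     // PCollection: distributed dataset
        .apply(Create.of("hello", "beam"))                       // PTransform: produce elements
        .apply(MapElements.into(TypeDescriptors.strings())       // PTransform: element-wise map
            .via(word -> word.toUpperCase()));

    pipeline.run().waitUntilFinish();                            // Runner executes the graph
  }
}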
Dependencies
<dependencies>
    <!-- Beam SDK -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.50.0</version>
    </dependency>

    <!-- Runners -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.50.0</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-flink-1.16</artifactId>
        <version>2.50.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.50.0</version>
    </dependency>

    <!-- I/O Connectors -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
        <version>2.50.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-kafka</artifactId>
        <version>2.50.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>2.50.0</version>
    </dependency>

    <!-- Extensions -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-extensions-json-jackson</artifactId>
        <version>2.50.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-extensions-sql</artifactId>
        <version>2.50.0</version>
    </dependency>

    <!-- Testing (the runtime-scoped direct runner above is also available to tests) -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-test-utils</artifactId>
        <version>2.50.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Core Implementation
1. Basic Pipeline Setup
public class BeamPipelineBuilder {
public Pipeline createPipeline(PipelineOptions options) {
return Pipeline.create(options);
}
public PipelineOptions createDirectRunnerOptions() {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
options.setJobName("beam-pipeline-" + System.currentTimeMillis());
return options;
}
public PipelineOptions createFlinkRunnerOptions() {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setParallelism(4);
options.setFlinkMaster("[auto]");
return options;
}
public PipelineOptions createDataflowRunnerOptions() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-gcp-project");
options.setRegion("us-central1");
options.setStagingLocation("gs://my-bucket/staging");
options.setTempLocation("gs://my-bucket/temp");
options.setNumWorkers(2);
options.setMaxNumWorkers(10);
options.setWorkerMachineType("n1-standard-2");
return options;
}
}
// Basic ETL Pipeline (input and output locations are supplied via pipeline options)
public class BasicETLPipeline {
  public interface ETLOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("input.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Prefix for the output files")
    @Default.String("output")
    String getOutput();
    void setOutput(String value);
  }

  public static void main(String[] args) {
    ETLOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ETLOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    // Read from source
    PCollection<String> lines = pipeline
        .apply("ReadLines", TextIO.read().from(options.getInputFile()));

    // Transform data
    PCollection<String> processed = lines
        .apply("FilterEmptyLines", Filter.by(line -> !line.trim().isEmpty()))
        .apply("ToUpperCase", MapElements.into(TypeDescriptors.strings())
            .via(String::toUpperCase));

    // Write to sink (a single shard keeps the output file name predictable)
    processed.apply("WriteOutput",
        TextIO.write().to(options.getOutput()).withSuffix(".txt").withNumShards(1));

    // Execute pipeline
    pipeline.run().waitUntilFinish();
  }
}
2. Advanced Data Processing Patterns
public class AdvancedBeamPatterns {
// Windowed Processing for Streaming
public static class StreamingWordCount {
public void processStreamingData(Pipeline pipeline) {
PCollection<String> messages = pipeline
.apply("ReadFromKafka", KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("word-topic")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
.apply("ExtractValues", Values.create());
// Apply windowing
PCollection<String> windowedMessages = messages
.apply("WindowMessages", Window.<String>into(
FixedWindows.of(Duration.standardMinutes(1))));
// Count words per window
PCollection<KV<String, Long>> wordCounts = windowedMessages
.apply("ExtractWords", FlatMapElements
.into(TypeDescriptors.strings())
.via(line -> Arrays.asList(line.split("\\W+"))))
.apply("CountWords", Count.perElement());
// Format results
PCollection<String> results = wordCounts
.apply("FormatResults", MapElements
.into(TypeDescriptors.strings())
.via(kv -> kv.getKey() + ": " + kv.getValue()));
results.apply("WriteToFile",
TextIO.write().to("wordcounts").withWindowedWrites().withNumShards(1));
}
}
// Batch Processing with GroupByKey
public static class SalesAnalysis {
public void analyzeSales(Pipeline pipeline, String inputFile) {
PCollection<SaleRecord> sales = pipeline
.apply("ReadSalesData", TextIO.read().from(inputFile))
.apply("ParseSales", ParDo.of(new ParseSaleRecord()));
// Group by product category
PCollection<KV<String, Iterable<SaleRecord>>> salesByCategory = sales
.apply("KeyByCategory", WithKeys.of(SaleRecord::getCategory))
.apply("GroupByCategory", GroupByKey.create());
// Calculate total sales per category
PCollection<KV<String, Double>> categoryTotals = salesByCategory
.apply("CalculateTotals", ParDo.of(new CalculateCategoryTotal()));
// Find top categories
PCollection<List<KV<String, Double>>> topCategories = categoryTotals
    .apply("TopN", Top.of(10, new ValueComparator()).withoutDefaults());
// TextIO can only write strings, so format the top-N list as CSV lines first
topCategories
    .apply("FormatTopCategories", FlatMapElements
        .into(TypeDescriptors.strings())
        .via(top -> top.stream()
            .map(kv -> kv.getKey() + "," + kv.getValue())
            .collect(Collectors.toList())))
    .apply("WriteResults",
        TextIO.write().to("sales-analysis").withSuffix(".csv"));
}
private static class ParseSaleRecord extends DoFn<String, SaleRecord> {
@ProcessElement
public void processElement(ProcessContext c) {
String[] parts = c.element().split(",");
if (parts.length >= 4) {
SaleRecord record = new SaleRecord(
parts[0], // productId
parts[1], // category
Double.parseDouble(parts[2]), // amount
Instant.parse(parts[3]) // timestamp
);
c.output(record);
}
}
}
private static class CalculateCategoryTotal
extends DoFn<KV<String, Iterable<SaleRecord>>, KV<String, Double>> {
@ProcessElement
public void processElement(ProcessContext c) {
String category = c.element().getKey();
double total = 0.0;
for (SaleRecord sale : c.element().getValue()) {
total += sale.getAmount();
}
c.output(KV.of(category, total));
}
}
private static class ValueComparator
implements SerializableComparator<KV<String, Double>> {
@Override
public int compare(KV<String, Double> o1, KV<String, Double> o2) {
return Double.compare(o2.getValue(), o1.getValue()); // Descending
}
}
}
}
3. Custom Transforms and Composite Patterns
public class CustomBeamTransforms {
// Composite Transform: Data Validation and Enrichment
public static class ValidateAndEnrich
extends PTransform<PCollection<String>, PCollection<EnrichedRecord>> {
@Override
public PCollection<EnrichedRecord> expand(PCollection<String> input) {
return input
.apply("ParseJSON", ParDo.of(new ParseJson()))
.apply("ValidateData", ParDo.of(new ValidateRecord()))
.apply("EnrichData", ParDo.of(new EnrichRecord()))
.apply("FilterValid", Filter.by(EnrichedRecord::isValid));
}
private static class ParseJson extends DoFn<String, RawRecord> {
  // Reuse one ObjectMapper per DoFn instance rather than creating one per element
  private transient ObjectMapper mapper;

  @Setup
  public void setup() {
    mapper = new ObjectMapper();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      RawRecord record = mapper.readValue(c.element(), RawRecord.class);
      c.output(record);
    } catch (Exception e) {
      // Log error but don't fail the pipeline
      System.err.println("Failed to parse JSON: " + e.getMessage());
    }
  }
}
private static class ValidateRecord extends DoFn<RawRecord, RawRecord> {
@ProcessElement
public void processElement(ProcessContext c) {
RawRecord record = c.element();
if (isValidRecord(record)) {
c.output(record);
}
}
private boolean isValidRecord(RawRecord record) {
return record.getId() != null &&
record.getTimestamp() != null &&
record.getValue() >= 0;
}
}
private static class EnrichRecord extends DoFn<RawRecord, EnrichedRecord> {
@ProcessElement
public void processElement(ProcessContext c) {
RawRecord raw = c.element();
EnrichedRecord enriched = new EnrichedRecord(raw);
enriched.setCategory(determineCategory(raw.getValue()));
enriched.setRegion(lookupRegion(raw.getId()));
enriched.setValid(true);
c.output(enriched);
}
private String determineCategory(double value) {
if (value < 100) return "LOW";
if (value < 1000) return "MEDIUM";
return "HIGH";
}
private String lookupRegion(String id) {
// Mock region lookup
return id.startsWith("US") ? "NORTH_AMERICA" : "OTHER";
}
}
}
// Stateful Processing with State and Timer
public static class SessionAnalysis
extends PTransform<PCollection<KV<String, UserEvent>>, PCollection<KV<String, Session>>> {
private final Duration sessionGap;
public SessionAnalysis(Duration sessionGap) {
this.sessionGap = sessionGap;
}
@Override
public PCollection<KV<String, Session>> expand(
PCollection<KV<String, UserEvent>> events) {
return events
.apply("WindowInto Sessions", Window.<KV<String, UserEvent>>into(
Sessions.withGapDuration(sessionGap)))
.apply("GroupByUser", GroupByKey.create())
.apply("CreateSessions", ParDo.of(new CreateSessionsFn()));
}
private static class CreateSessionsFn
extends DoFn<KV<String, Iterable<UserEvent>>, KV<String, Session>> {
@ProcessElement
public void processElement(ProcessContext c) {
String userId = c.element().getKey();
List<UserEvent> events = new ArrayList<>();
c.element().getValue().forEach(events::add);
if (!events.isEmpty()) {
events.sort(Comparator.comparing(UserEvent::getTimestamp));
Session session = new Session(userId, events);
c.output(KV.of(userId, session));
}
}
}
}
// Side Input Pattern for Data Enrichment
public static class EnrichWithReferenceData {
public void enrichWithSideInput(Pipeline pipeline) {
// Main data
PCollection<Transaction> transactions = pipeline
.apply("ReadTransactions",
AvroIO.read(Transaction.class).from("transactions/*.avro"));
// Reference data (small dataset) materialized as a map-valued side input
PCollectionView<Map<String, CustomerInfo>> customerView = pipeline
    .apply("ReadCustomerInfo",
        TextIO.read().from("customers.csv"))
    .apply("ParseCustomerInfo", ParDo.of(new ParseCustomerInfo()))
    .apply("AsView", View.asMap());
// Enrich transactions with customer info
PCollection<EnrichedTransaction> enriched = transactions
    .apply("EnrichTransactions", ParDo
        .of(new EnrichWithCustomerInfo(customerView))
        .withSideInputs(customerView));
enriched.apply("WriteEnriched",
AvroIO.write(EnrichedTransaction.class).to("enriched-transactions"));
}
private static class EnrichWithCustomerInfo
extends DoFn<Transaction, EnrichedTransaction> {
private final PCollectionView<Map<String, CustomerInfo>> customerView;
public EnrichWithCustomerInfo(
PCollectionView<Map<String, CustomerInfo>> customerView) {
this.customerView = customerView;
}
@ProcessElement
public void processElement(ProcessContext c) {
Transaction transaction = c.element();
Map<String, CustomerInfo> customerMap = c.sideInput(customerView);
CustomerInfo customer = customerMap.get(transaction.getCustomerId());
EnrichedTransaction enriched = new EnrichedTransaction(transaction, customer);
c.output(enriched);
}
}
}
}
Real-World Use Cases
1. Real-time Fraud Detection
public class FraudDetectionPipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
// Read transactions from Kafka
PCollection<Transaction> transactions = pipeline
.apply("ReadTransactions", KafkaIO.<String, Transaction>read()
.withBootstrapServers("kafka-broker:9092")
.withTopic("transactions")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(TransactionDeserializer.class)
.withoutMetadata())
.apply("ExtractTransaction", Values.create())
.apply("WindowTransactions", Window.<Transaction>into(
FixedWindows.of(Duration.standardMinutes(1))));
// Detect suspicious patterns
PCollection<FraudAlert> alerts = transactions
.apply("KeyByAccount", WithKeys.of(Transaction::getAccountId))
.apply("GroupByAccount", GroupByKey.create())
.apply("AnalyzePatterns", ParDo.of(new FraudAnalysisFn()));
// Write alerts to multiple sinks
alerts.apply("WriteToKafka", KafkaIO.<Void, FraudAlert>write()
    .withBootstrapServers("kafka-broker:9092")
    .withTopic("fraud-alerts")
    .withValueSerializer(FraudAlertSerializer.class)
    .values()); // write values only; the alerts carry no Kafka key
alerts.apply("WriteToBigQuery", BigQueryIO.<FraudAlert>write()
    .to("project:dataset.fraud_alerts")
    .withFormatFunction(FraudAlert::toTableRow)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
pipeline.run();
}
private static class FraudAnalysisFn
extends DoFn<KV<String, Iterable<Transaction>>, FraudAlert> {
@ProcessElement
public void processElement(ProcessContext c) {
String accountId = c.element().getKey();
List<Transaction> transactions = new ArrayList<>();
c.element().getValue().forEach(transactions::add);
FraudAnalysis analysis = analyzeTransactions(transactions);
if (analysis.isSuspicious()) {
FraudAlert alert = new FraudAlert(accountId, analysis, Instant.now());
c.output(alert);
}
}
private FraudAnalysis analyzeTransactions(List<Transaction> transactions) {
FraudAnalysis analysis = new FraudAnalysis();
// Check for high frequency
if (transactions.size() > 10) {
analysis.addSuspicion("HIGH_FREQUENCY",
"More than 10 transactions in window");
}
// Check for large amounts
double totalAmount = transactions.stream()
.mapToDouble(Transaction::getAmount)
.sum();
if (totalAmount > 10000) {
analysis.addSuspicion("LARGE_AMOUNT",
"Total amount exceeds $10,000");
}
// Check for geographic anomalies
if (hasGeographicAnomaly(transactions)) {
analysis.addSuspicion("GEOGRAPHIC_ANOMALY",
"Transactions from multiple distant locations");
}
return analysis;
}
private boolean hasGeographicAnomaly(List<Transaction> transactions) {
Set<String> locations = transactions.stream()
.map(Transaction::getLocation)
.collect(Collectors.toSet());
return locations.size() > 3; // Simplified logic
}
}
}
2. Data Warehouse ETL Pipeline
public class DataWarehouseETL {
public void buildETLPipeline(Pipeline pipeline) {
// Extract from multiple sources
PCollection<Customer> customers = pipeline
.apply("ReadCustomers", JdbcIO.<Customer>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/source_db"))
.withQuery("SELECT * FROM customers")
.withRowMapper(new CustomerRowMapper()))
.apply("WindowCustomers", Window.<Customer>into(
FixedWindows.of(Duration.standardHours(1))));
PCollection<Order> orders = pipeline
.apply("ReadOrders", PubsubIO.readStrings()
.fromTopic("projects/project-id/topics/orders"))
.apply("ParseOrders", ParDo.of(new ParseOrderFn()))
.apply("WindowOrders", Window.<Order>into(
FixedWindows.of(Duration.standardHours(1))));
// Transform and join data
PCollection<KV<String, Customer>> customerById = customers
    .apply("KeyCustomersById", WithKeys.of(Customer::getId)
        .withKeyType(TypeDescriptors.strings()));
PCollection<KV<String, Order>> orderByCustomerId = orders
    .apply("KeyOrdersByCustomerId", WithKeys.of(Order::getCustomerId)
        .withKeyType(TypeDescriptors.strings()));
PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
.of("customers", customerById)
.and("orders", orderByCustomerId)
.apply("JoinData", CoGroupByKey.create());
// Create fact records
PCollection<SalesFact> facts = joined
.apply("CreateFacts", ParDo.of(new CreateSalesFactFn()));
// Load to data warehouse
facts.apply("WriteToBigQuery", BigQueryIO.write()
.to("project:warehouse.sales_facts")
.withFormatFunction(SalesFact::toTableRow)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
// Also write to Cloud Storage for backup
facts.apply("WriteToGCS", TextIO.write()
.to("gs://warehouse-backup/sales-facts")
.withWindowedWrites()
.withNumShards(4));
}
private static class CreateSalesFactFn
extends DoFn<KV<String, CoGbkResult>, SalesFact> {
@ProcessElement
public void processElement(ProcessContext c) {
String customerId = c.element().getKey();
CoGbkResult result = c.element().getValue();
Customer customer = result.getOnly("customers");
Iterable<Order> orders = result.getAll("orders");
for (Order order : orders) {
SalesFact fact = new SalesFact();
fact.setCustomerId(customerId);
fact.setOrderId(order.getId());
fact.setAmount(order.getAmount());
fact.setCustomerSegment(customer.getSegment());
fact.setOrderDate(order.getOrderDate());
fact.setProcessingTime(Instant.now());
c.output(fact);
}
}
}
}
Testing Beam Pipelines
1. Unit Testing with TestPipeline
public class BeamPipelineTest {
@Rule
public final transient TestPipeline testPipeline = TestPipeline.create();
@Test
public void testWordCount() {
// Create test input
PCollection<String> input = testPipeline
.apply(Create.of(
"hello world",
"hello beam",
"beam world"
));
// Apply transformations
PCollection<KV<String, Long>> output = input
.apply(FlatMapElements.into(TypeDescriptors.strings())
.via(line -> Arrays.asList(line.split(" "))))
.apply(Count.perElement());
// Verify output
PAssert.that(output)
.containsInAnyOrder(
KV.of("hello", 2L),
KV.of("world", 2L),
KV.of("beam", 2L)
);
testPipeline.run();
}
@Test
public void testWindowedProcessing() {
Instant baseTime = new Instant(0);
PCollection<KV<String, Integer>> input = testPipeline
.apply(Create.timestamped(
TimestampedValue.of(KV.of("key1", 1), baseTime),
TimestampedValue.of(KV.of("key1", 2), baseTime.plus(Duration.standardSeconds(10))),
TimestampedValue.of(KV.of("key2", 3), baseTime.plus(Duration.standardSeconds(20)))
))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))));
PCollection<KV<String, Integer>> sums = input
.apply(Combine.perKey(Sum.ofIntegers()));
PAssert.that(sums)
.containsInAnyOrder(
KV.of("key1", 3),
KV.of("key2", 3)
);
testPipeline.run();
}
@Test
public void testCustomTransform() {
PCollection<String> input = testPipeline
.apply(Create.of(
"{\"id\": \"1\", \"value\": 100}",
"{\"id\": \"2\", \"value\": 200}",
"invalid-json"
));
PCollection<EnrichedRecord> output = input
.apply(new ValidateAndEnrich());
PAssert.that(output)
.satisfies(records -> {
int count = 0;
for (EnrichedRecord record : records) {
assertTrue(record.isValid());
assertNotNull(record.getCategory());
count++;
}
assertEquals(2, count); // Only valid records
return null;
});
testPipeline.run();
}
}
2. Integration Testing
public class BeamIntegrationTest {
@Test
public void testEndToEndPipeline() throws Exception {
// Setup test data
String inputFile = "test-input.txt";
String outputDir = "test-output";
Files.write(Paths.get(inputFile),
Arrays.asList("hello", "world", "beam", "processing"),
StandardCharsets.UTF_8);
// Run pipeline
BasicETLPipeline.main(new String[]{
"--inputFile=" + inputFile,
"--output=" + outputDir,
"--runner=DirectRunner"
});
// Verify results (element order within the output file is not guaranteed)
List<String> outputLines = Files.readAllLines(
    Paths.get(outputDir + "-00000-of-00001.txt"),
    StandardCharsets.UTF_8);
assertEquals(
    new HashSet<>(Arrays.asList("HELLO", "WORLD", "BEAM", "PROCESSING")),
    new HashSet<>(outputLines));
// Cleanup
Files.deleteIfExists(Paths.get(inputFile));
deleteDirectory(Paths.get(outputDir));
}
private void deleteDirectory(Path path) throws IOException {
if (Files.exists(path)) {
Files.walk(path)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
}
}
Configuration and Deployment
1. Pipeline Configuration
@Configuration
public class BeamPipelineConfig {
@Value("${beam.runner:direct}")
private String runner;
@Value("${beam.project:}")
private String project;
@Value("${beam.region:us-central1}")
private String region;
@Bean
public PipelineOptions pipelineOptions() {
PipelineOptions options;
switch (runner.toLowerCase()) {
case "dataflow":
DataflowPipelineOptions dataflowOptions =
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject(project);
dataflowOptions.setRegion(region);
dataflowOptions.setStagingLocation("gs://" + project + "-beam/staging");
dataflowOptions.setTempLocation("gs://" + project + "-beam/temp");
dataflowOptions.setNumWorkers(2);
dataflowOptions.setMaxNumWorkers(10);
options = dataflowOptions;
break;
case "flink":
FlinkPipelineOptions flinkOptions =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
flinkOptions.setRunner(FlinkRunner.class);
flinkOptions.setParallelism(4);
options = flinkOptions;
break;
default:
options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
}
return options;
}
}
2. Spring Boot Integration
@SpringBootApplication
public class BeamApplication {
@Autowired
private PipelineOptions pipelineOptions;
@Bean
public Pipeline dataProcessingPipeline() {
Pipeline pipeline = Pipeline.create(pipelineOptions);
// Build your pipeline here
pipeline
.apply("ReadInput", TextIO.read().from("input/*.txt"))
.apply("ProcessData", ParDo.of(new DataProcessor()))
.apply("WriteOutput", TextIO.write().to("output/result"));
return pipeline;
}
@EventListener(ApplicationReadyEvent.class)
public void runPipeline() {
Pipeline pipeline = dataProcessingPipeline();
PipelineResult result = pipeline.run();
if (pipelineOptions.getRunner() == DirectRunner.class) {
result.waitUntilFinish();
}
}
public static void main(String[] args) {
SpringApplication.run(BeamApplication.class, args);
}
}
Best Practices
- Choose Appropriate Runner: Select a runner based on data volume, latency requirements, and existing infrastructure
- Optimize Sharding: Tune the number of output shards to balance file size against write parallelism
- Handle Failures Gracefully: Route unparseable or invalid elements to a dead-letter output instead of throwing from a DoFn (see the sketch after this list)
- Monitor Pipeline Metrics: Use the built-in Metrics API (counters, distributions, gauges) to track pipeline health
- Test Thoroughly: Use TestPipeline and PAssert for unit testing transforms
- Use Composite Transforms: Group related steps into PTransforms for modularity and reusability
- Optimize Memory Usage: Keep side inputs small and bound per-key state
- Profile Performance: Use runner-specific profiling and monitoring tools
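As an illustration of the failure-handling and metrics bullets, here is a small, self-contained sketch (class, tag, and counter names are illustrative and not part of the pipelines above) that counts parse failures with the Metrics API and routes bad input to a dead-letter output via withOutputTags:
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ParseWithDeadLetter {
  // Tags distinguishing successfully parsed records from rejected raw input
  public static final TupleTag<Integer> PARSED = new TupleTag<Integer>() {};
  public static final TupleTag<String> FAILED = new TupleTag<String>() {};

  static class ParseFn extends DoFn<String, Integer> {
    // Counter is reported through the runner's metrics system
    private final Counter parseErrors = Metrics.counter(ParseFn.class, "parse-errors");

    @ProcessElement
    public void processElement(ProcessContext c) {
      try {
        c.output(Integer.parseInt(c.element().trim()));   // main output
      } catch (NumberFormatException e) {
        parseErrors.inc();                                 // visible in the runner's metrics UI
        c.output(FAILED, c.element());                     // dead-letter output, pipeline keeps running
      }
    }
  }

  public static PCollectionTuple parse(PCollection<String> input) {
    return input.apply("ParseWithDeadLetter",
        ParDo.of(new ParseFn()).withOutputTags(PARSED, TupleTagList.of(FAILED)));
  }
}
The caller can then retrieve the two branches with tuple.get(PARSED) and tuple.get(FAILED), writing the failed elements to a separate sink for later inspection.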
This implementation provides a comprehensive foundation for building data processing pipelines with Apache Beam Java SDK, covering batch and streaming processing, custom transforms, testing, and deployment strategies.