Dataflow Pipeline in Java: Complete Implementation Guide

Learn how to build robust, scalable data processing pipelines in Java with modern streaming and batch processing capabilities.

Table of Contents

  1. Pipeline Architecture
  2. Core Pipeline Components
  3. Stream Processing
  4. Batch Processing
  5. Windowed Operations
  6. State Management
  7. Fault Tolerance
  8. Monitoring & Metrics
  9. Spring Boot Integration

Pipeline Architecture

Dataflow Pipeline Patterns

┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Source    │───▶│   Transform │───▶│   Window    │───▶│    Sink     │
│             │    │             │    │    & Agg    │    │             │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │
       ▼                  ▼                  ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Filter    │    │   State     │    │   Kafka,    │
│   & Map     │    │ Management  │    │   DB, S3    │
└─────────────┘    └─────────────┘    └─────────────┘

Core Pipeline Components

1. Pipeline Foundation Classes

// Core pipeline interfaces
public interface Pipeline<T> {
String getName();
Pipeline<T> from(Source<T> source);
<R> Pipeline<R> transform(Transform<T, R> transform);
Pipeline<T> filter(Filter<T> filter);
Pipeline<T> window(WindowStrategy<T> windowStrategy);
void to(Sink<T> sink);
void start();
void stop();
PipelineStats getStats();
}
public interface Source<T> {
String getName();
void start(SourceContext<T> context);
void stop();
SourceStats getStats();
}
public interface SourceContext<T> {
void emit(T element);
void markProcessed(String elementId);
void markFailed(String elementId, Throwable error);
}
public interface Filter<T> {
boolean test(T element);
}
public interface WindowStrategy<T> {
String getName();
void process(T element);
}
// Shared by the windowing components later in this guide
public interface WindowProcessor<T> {
String getName();
void process(List<T> window);
}
public interface Transform<T, R> {
String getName();
R apply(T input);
void onError(T input, Throwable error);
}
public interface Sink<T> {
String getName();
void write(T element);
void writeBatch(List<T> elements);
void flush();
SinkStats getStats();
}
// Base pipeline implementation
@Slf4j
public class DataflowPipeline<T> implements Pipeline<T> {
private final String name;
private final ExecutorService executor;
private final PipelineMetrics metrics;
private Source<T> source;
private final List<Transform<?, ?>> transforms;
private Sink<T> sink;
private WindowStrategy<T> windowStrategy;
private volatile boolean running = false;
public DataflowPipeline(String name, PipelineMetrics metrics) {
this.name = name;
this.metrics = metrics;
this.transforms = new CopyOnWriteArrayList<>();
this.executor = Executors.newFixedThreadPool(10, new ThreadFactoryBuilder()
.setNameFormat("pipeline-" + name + "-%d")
.build());
}
@Override
public Pipeline<T> from(Source<T> source) {
this.source = source;
return this;
}
@Override
@SuppressWarnings("unchecked")
public <R> Pipeline<R> transform(Transform<T, R> transform) {
this.transforms.add(transform);
// Transforms are applied one element at a time in processElement, so the
// same pipeline instance is reused under the new element type.
return (Pipeline<R>) this;
}
@Override
public Pipeline<T> filter(Filter<T> filter) {
this.transforms.add(new FilterTransform<>("filter", filter::test));
return this;
}
@Override
public Pipeline<T> window(WindowStrategy<T> windowStrategy) {
this.windowStrategy = windowStrategy;
return this;
}
@Override
public void to(Sink<T> sink) {
this.sink = sink;
}
@Override
public void start() {
if (running) {
log.warn("Pipeline {} is already running", name);
return;
}
running = true;
executor.submit(() -> {
log.info("Starting pipeline: {}", name);
metrics.recordPipelineStart(name);
source.start(new SourceContext<T>() {
@Override
public void emit(T element) {
processElement(element);
}
@Override
public void markProcessed(String elementId) {
metrics.recordElementProcessed(name, elementId);
}
@Override
public void markFailed(String elementId, Throwable error) {
metrics.recordElementFailed(name, elementId, error);
}
});
log.info("Pipeline {} started successfully", name);
});
}
@Override
public void stop() {
running = false;
source.stop();
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
metrics.recordPipelineStop(name);
log.info("Pipeline {} stopped", name);
}
@Override
public PipelineStats getStats() {
return new PipelineStats(
name,
metrics.getProcessedCount(name),
metrics.getFailedCount(name),
metrics.getProcessingRate(name),
source.getStats(),
sink != null ? sink.getStats() : null
);
}
@SuppressWarnings("unchecked")
private void processElement(T element) {
if (!running) return;
try {
Object current = element;
// Apply all transforms
for (Transform transform : transforms) {
current = transform.apply(current);
if (current == null) {
// Element was filtered out
return;
}
}
// Apply windowing if configured
if (windowStrategy != null) {
windowStrategy.process((T) current);
} else if (sink != null) {
// Write to sink
sink.write((T) current);
}
metrics.recordElementProcessed(name, element.toString());
} catch (Exception e) {
log.error("Error processing element in pipeline {}: {}", name, element, e);
metrics.recordElementFailed(name, element.toString(), e);
}
}
}
// Statistics classes (top-level so the Pipeline/Source/Sink interfaces above can reference them)
public record PipelineStats(
String pipelineName,
long processedCount,
long failedCount,
double processingRate,
SourceStats sourceStats,
SinkStats sinkStats
) {}
public record SourceStats(
String sourceName,
long elementsEmitted,
long errors,
double emissionRate
) {}
public record SinkStats(
String sinkName,
long elementsWritten,
long errors,
double writeRate
) {}

2. Source Implementations

// Kafka Source
@Slf4j
public class KafkaSource<T> implements Source<T> {
private final ObjectMapper objectMapper;
private final Class<T> type;
private final String topic;
private final String groupId;
private volatile boolean running = false;
private KafkaConsumer<String, String> consumer;
private Thread consumerThread;
private final AtomicLong elementsEmitted = new AtomicLong(0);
private final AtomicLong errors = new AtomicLong(0);
private final long startTimeMs = System.currentTimeMillis(); // used for the rate calculation in getStats()
// This source consumes with its own KafkaConsumer, so the producer-side
// KafkaTemplate that was originally injected here is not needed.
public KafkaSource(ObjectMapper objectMapper,
Class<T> type,
String topic,
String groupId) {
this.objectMapper = objectMapper;
this.type = type;
this.topic = topic;
this.groupId = groupId;
}
@Override
public String getName() {
return "kafka-source-" + topic;
}
@Override
public void start(SourceContext<T> context) {
if (running) {
log.warn("Kafka source for topic {} is already running", topic);
return;
}
running = true;
consumerThread = new Thread(() -> {
log.info("Starting Kafka source for topic: {}", topic);
Properties props = new Properties();
// Hardcoded for brevity; externalize broker addresses via configuration
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (running) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
T element = objectMapper.readValue(record.value(), type);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to process Kafka record from topic {}: {}", topic, record.value(), e);
errors.incrementAndGet();
context.markFailed(record.key(), e);
}
}
// Commit once per poll: commitSync() commits the position of the whole
// polled batch, so committing per record adds round-trips without benefit.
if (!records.isEmpty()) {
consumer.commitSync();
}
} catch (Exception e) {
log.error("Error in Kafka consumer for topic {}", topic, e);
errors.incrementAndGet();
}
}
consumer.close();
log.info("Kafka source for topic {} stopped", topic);
}, "kafka-source-" + topic);
consumerThread.start();
}
@Override
public void stop() {
running = false;
if (consumerThread != null) {
try {
consumerThread.join(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@Override
public SourceStats getStats() {
return new SourceStats(
getName(),
elementsEmitted.get(),
errors.get(),
elementsEmitted.get() / Math.max(1.0, (System.currentTimeMillis() - startTimeMs) / 1000.0) // elements per second since the source started
);
}
}
// File Source
@Slf4j
public class FileSource<T> implements Source<T> {
private final String filePath;
private final ObjectMapper objectMapper;
private final Class<T> type;
private final boolean watchForChanges;
private volatile boolean running = false;
private Thread fileReaderThread;
private final AtomicLong elementsEmitted = new AtomicLong(0);
private final AtomicLong errors = new AtomicLong(0);
private final long startTimeMs = System.currentTimeMillis(); // used for the rate calculation in getStats()
public FileSource(String filePath, ObjectMapper objectMapper, Class<T> type, boolean watchForChanges) {
this.filePath = filePath;
this.objectMapper = objectMapper;
this.type = type;
this.watchForChanges = watchForChanges;
}
@Override
public String getName() {
return "file-source-" + filePath;
}
@Override
public void start(SourceContext<T> context) {
if (running) {
log.warn("File source for {} is already running", filePath);
return;
}
running = true;
fileReaderThread = new Thread(() -> {
log.info("Starting file source for: {}", filePath);
try {
Path path = Paths.get(filePath);
if (watchForChanges) {
watchFileChanges(path, context);
} else {
processExistingFile(path, context);
}
} catch (Exception e) {
log.error("Error in file source for {}", filePath, e);
}
log.info("File source for {} stopped", filePath);
}, "file-source-" + filePath);
fileReaderThread.start();
}
private void processExistingFile(Path path, SourceContext<T> context) throws IOException {
if (!Files.exists(path)) {
log.warn("File does not exist: {}", path);
return;
}
try (Stream<String> lines = Files.lines(path)) {
lines.forEach(line -> {
try {
T element = objectMapper.readValue(line, type);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to parse line from file {}: {}", filePath, line, e);
errors.incrementAndGet();
context.markFailed(line, e);
}
});
}
}
private void watchFileChanges(Path path, SourceContext<T> context) throws IOException {
WatchService watchService = FileSystems.getDefault().newWatchService();
Path directory = path.getParent();
directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
long lastModified = 0;
while (running) {
try {
WatchKey key = watchService.poll(1, TimeUnit.SECONDS);
if (key != null) {
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
Path changedFile = (Path) event.context();
if (changedFile.equals(path.getFileName())) {
long currentModified = Files.getLastModifiedTime(path).toMillis();
if (currentModified > lastModified) {
processNewLines(path, lastModified, context);
lastModified = currentModified;
}
}
}
}
key.reset();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Error watching file {}", path, e);
}
}
watchService.close();
}
private void processNewLines(Path path, long since, SourceContext<T> context) throws IOException {
// Simplified: re-reads the entire file on every change, so earlier lines are
// emitted again. A real implementation would persist a byte offset and read
// only the content appended since the last run.
try (Stream<String> lines = Files.lines(path)) {
lines.forEach(line -> {
try {
T element = objectMapper.readValue(line, type);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to parse line from file {}: {}", filePath, line, e);
errors.incrementAndGet();
}
});
}
}
@Override
public void stop() {
running = false;
if (fileReaderThread != null) {
fileReaderThread.interrupt();
}
}
@Override
public SourceStats getStats() {
return new SourceStats(
getName(),
elementsEmitted.get(),
errors.get(),
elementsEmitted.get() / Math.max(1.0, (System.currentTimeMillis() - startTimeMs) / 1000.0)
);
}
}
// Database Source (JDBC)
@Slf4j
public class JdbcSource<T> implements Source<T> {
private final DataSource dataSource;
private final String query;
private final RowMapper<T> rowMapper;
private final long pollIntervalMs;
private volatile boolean running = false;
private ScheduledExecutorService scheduler;
private final AtomicLong elementsEmitted = new AtomicLong(0);
private final AtomicLong errors = new AtomicLong(0);
private final long startTimeMs = System.currentTimeMillis(); // used for the rate calculation in getStats()
public JdbcSource(DataSource dataSource, String query, RowMapper<T> rowMapper, long pollIntervalMs) {
this.dataSource = dataSource;
this.query = query;
this.rowMapper = rowMapper;
this.pollIntervalMs = pollIntervalMs;
}
@Override
public String getName() {
return "jdbc-source-" + query.hashCode();
}
@Override
public void start(SourceContext<T> context) {
if (running) {
log.warn("JDBC source is already running");
return;
}
running = true;
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(query);
ResultSet rs = stmt.executeQuery()) {
while (rs.next() && running) {
try {
T element = rowMapper.mapRow(rs);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to map row from JDBC source", e);
errors.incrementAndGet();
context.markFailed("row-" + elementsEmitted.get(), e);
}
}
} catch (Exception e) {
log.error("Error in JDBC source", e);
errors.incrementAndGet();
}
}, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
log.info("JDBC source started with query: {}", query);
}
@Override
public void stop() {
running = false;
if (scheduler != null) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
@Override
public SourceStats getStats() {
return new SourceStats(
getName(),
elementsEmitted.get(),
errors.get(),
elementsEmitted.get() / Math.max(1.0, (System.currentTimeMillis() - startTimeMs) / 1000.0)
);
}
public interface RowMapper<T> {
T mapRow(ResultSet rs) throws SQLException;
}
}

Stream Processing

3. Transform Operations

// Core transform implementations
@Slf4j
public class MapTransform<T, R> implements Transform<T, R> {
private final Function<T, R> mapper;
private final String name;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public MapTransform(String name, Function<T, R> mapper) {
this.name = name;
this.mapper = mapper;
}
@Override
public String getName() {
return name;
}
@Override
public R apply(T input) {
try {
R result = mapper.apply(input);
processedCount.incrementAndGet();
return result;
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Mapping failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in map transform {} for input: {}", name, input, error);
}
public long getProcessedCount() {
return processedCount.get();
}
public long getErrorCount() {
return errorCount.get();
}
}
@Slf4j
public class FilterTransform<T> implements Transform<T, T> {
private final Predicate<T> predicate;
private final String name;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong filteredCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public FilterTransform(String name, Predicate<T> predicate) {
this.name = name;
this.predicate = predicate;
}
@Override
public String getName() {
return name;
}
@Override
public T apply(T input) {
try {
processedCount.incrementAndGet();
if (predicate.test(input)) {
return input;
} else {
filteredCount.incrementAndGet();
return null; // Filtered out
}
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Filter failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in filter transform {} for input: {}", name, input, error);
}
public long getProcessedCount() {
return processedCount.get();
}
public long getFilteredCount() {
return filteredCount.get();
}
public long getErrorCount() {
return errorCount.get();
}
}
// Complex transform: Keyed operations
@Slf4j
public class KeyedTransform<T, K, R> implements Transform<T, R> {
private final Function<T, K> keyExtractor;
private final Function<List<T>, R> aggregator;
private final String name;
private final int windowSize;
private final Map<K, List<T>> keyedBuffer;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public KeyedTransform(String name, Function<T, K> keyExtractor, 
Function<List<T>, R> aggregator, int windowSize) {
this.name = name;
this.keyExtractor = keyExtractor;
this.aggregator = aggregator;
this.windowSize = windowSize;
this.keyedBuffer = new ConcurrentHashMap<>();
}
@Override
public String getName() {
return name;
}
@Override
public R apply(T input) {
try {
K key = keyExtractor.apply(input);
List<T> buffer = keyedBuffer.computeIfAbsent(key, k -> new ArrayList<>());
synchronized (buffer) {
buffer.add(input);
if (buffer.size() >= windowSize) {
R result = aggregator.apply(new ArrayList<>(buffer));
buffer.clear();
processedCount.incrementAndGet();
return result;
}
}
processedCount.incrementAndGet();
return null; // Not enough elements for aggregation yet
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Keyed transform failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in keyed transform {} for input: {}", name, input, error);
}
// Flush remaining elements
public List<R> flush() {
List<R> results = new ArrayList<>();
for (List<T> buffer : keyedBuffer.values()) {
synchronized (buffer) {
if (!buffer.isEmpty()) {
results.add(aggregator.apply(new ArrayList<>(buffer)));
buffer.clear();
}
}
}
return results;
}
}
// Enrichment transform
@Slf4j
public class EnrichmentTransform<T, R> implements Transform<T, R> {
private final Function<T, R> enricher;
private final String name;
private final Cache<String, R> cache;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong cacheHits = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public EnrichmentTransform(String name, Function<T, R> enricher, 
long cacheSize, Duration cacheTtl) {
this.name = name;
this.enricher = enricher;
this.cache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(cacheTtl)
.build();
}
@Override
public String getName() {
return name;
}
@Override
public R apply(T input) {
try {
String cacheKey = generateCacheKey(input);
R result = cache.getIfPresent(cacheKey);
if (result == null) {
result = enricher.apply(input);
cache.put(cacheKey, result);
} else {
cacheHits.incrementAndGet();
}
processedCount.incrementAndGet();
return result;
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Enrichment failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in enrichment transform {} for input: {}", name, input, error);
}
private String generateCacheKey(T input) {
// Objects.hash(...) returns a primitive int, which has no toString() method.
// Hash-based keys can also collide; prefer a natural business key if one exists.
return String.valueOf(Objects.hashCode(input));
}
public double getCacheHitRate() {
return processedCount.get() > 0 ? (double) cacheHits.get() / processedCount.get() : 0.0;
}
}

Batch Processing

4. Batch Pipeline Components

// Batch pipeline for processing large datasets
@Slf4j
public class BatchPipeline<T> {
private final String name;
private final ExecutorService executor;
private final PipelineMetrics metrics;
private final int batchSize;
private final Duration flushInterval;
private Source<T> source;
private final List<Transform<T, ?>> transforms;
private Sink<T> sink;
private volatile boolean running = false;
private final List<T> batchBuffer;
public BatchPipeline(String name, PipelineMetrics metrics, int batchSize, Duration flushInterval) {
this.name = name;
this.metrics = metrics;
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.transforms = new CopyOnWriteArrayList<>();
this.batchBuffer = Collections.synchronizedList(new ArrayList<>());
this.executor = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder()
.setNameFormat("batch-pipeline-" + name + "-%d")
.build());
}
public BatchPipeline<T> from(Source<T> source) {
this.source = source;
return this;
}
@SuppressWarnings("unchecked")
public <R> BatchPipeline<R> transform(Transform<T, R> transform) {
this.transforms.add(transform);
// In batch processing, we typically maintain the same pipeline
return (BatchPipeline<R>) this;
}
public BatchPipeline<T> to(Sink<T> sink) {
this.sink = sink;
return this;
}
public void start() {
if (running) {
log.warn("Batch pipeline {} is already running", name);
return;
}
running = true;
// Start batch flusher
executor.submit(this::batchFlusher);
// Start source
source.start(new SourceContext<T>() {
@Override
public void emit(T element) {
addToBatch(element);
}
@Override
public void markProcessed(String elementId) {
metrics.recordElementProcessed(name, elementId);
}
@Override
public void markFailed(String elementId, Throwable error) {
metrics.recordElementFailed(name, elementId, error);
}
});
log.info("Batch pipeline {} started with batch size: {}, flush interval: {}", 
name, batchSize, flushInterval);
}
public void stop() {
running = false;
source.stop();
// Flush remaining elements
flushBatch();
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
metrics.recordPipelineStop(name);
log.info("Batch pipeline {} stopped", name);
}
private void addToBatch(T element) {
synchronized (batchBuffer) {
batchBuffer.add(element);
if (batchBuffer.size() >= batchSize) {
flushBatch();
}
}
}
private void batchFlusher() {
while (running) {
try {
Thread.sleep(flushInterval.toMillis());
flushBatch();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Error in batch flusher for pipeline {}", name, e);
}
}
}
@SuppressWarnings("unchecked")
private void flushBatch() {
List<T> currentBatch;
synchronized (batchBuffer) {
if (batchBuffer.isEmpty()) {
return;
}
currentBatch = new ArrayList<>(batchBuffer);
batchBuffer.clear();
}
if (!currentBatch.isEmpty()) {
executor.submit(() -> processBatch(currentBatch));
}
}
private void processBatch(List<T> batch) {
try {
List<T> currentBatch = batch;
// Apply transforms
for (Transform transform : transforms) {
List<Object> transformedBatch = new ArrayList<>();
for (T element : currentBatch) {
try {
Object transformed = transform.apply(element);
if (transformed != null) {
transformedBatch.add(transformed);
}
} catch (Exception e) {
log.error("Error applying transform {} to element", transform.getName(), e);
metrics.recordElementFailed(name, element.toString(), e);
}
}
currentBatch = (List<T>) transformedBatch;
}
// Write to sink
if (sink != null && !currentBatch.isEmpty()) {
sink.writeBatch(currentBatch);
metrics.recordBatchProcessed(name, currentBatch.size());
}
} catch (Exception e) {
log.error("Error processing batch in pipeline {}", name, e);
metrics.recordBatchFailed(name, batch.size(), e);
}
}
}
// Batch window for time-based batching
@Slf4j
public class TimeWindow<T> implements WindowStrategy<T> {
private final Duration windowSize;
private final Duration windowSlide;
private final List<T> windowBuffer;
private final List<WindowProcessor<T>> processors;
private ScheduledExecutorService windowScheduler;
private volatile boolean running = false;
public TimeWindow(Duration windowSize, Duration windowSlide) {
this.windowSize = windowSize;
this.windowSlide = windowSlide;
this.windowBuffer = Collections.synchronizedList(new ArrayList<>());
this.processors = new CopyOnWriteArrayList<>();
}
@Override
public String getName() {
return "time-window-" + windowSize.toMillis() + "ms";
}
@Override
public void process(T element) {
if (running) {
windowBuffer.add(element);
}
}
public void addProcessor(WindowProcessor<T> processor) {
processors.add(processor);
}
public void start() {
if (running) {
log.warn("Time window is already running");
return;
}
running = true;
windowScheduler = Executors.newSingleThreadScheduledExecutor();
// Schedule window processing
windowScheduler.scheduleAtFixedRate(this::processWindow, 
windowSlide.toMillis(), windowSlide.toMillis(), TimeUnit.MILLISECONDS);
log.info("Time window started with size: {}, slide: {}", windowSize, windowSlide);
}
public void stop() {
running = false;
if (windowScheduler != null) {
windowScheduler.shutdown();
try {
if (!windowScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
windowScheduler.shutdownNow();
}
} catch (InterruptedException e) {
windowScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
// Process remaining elements
processWindow();
log.info("Time window stopped");
}
private void processWindow() {
List<T> currentWindow;
synchronized (windowBuffer) {
currentWindow = new ArrayList<>(windowBuffer);
// For sliding windows, we might keep some elements
// For tumbling windows, we clear the buffer
if (windowSize.equals(windowSlide)) {
windowBuffer.clear();
} else {
// Keep elements from the last windowSize period
long cutoffTime = System.currentTimeMillis() - windowSize.toMillis();
// This is simplified - in real implementation, you'd need timestamps
windowBuffer.removeIf(element -> shouldRemoveFromWindow(element, cutoffTime));
}
}
if (!currentWindow.isEmpty()) {
for (WindowProcessor<T> processor : processors) {
try {
processor.process(currentWindow);
} catch (Exception e) {
log.error("Error in window processor {}", processor.getName(), e);
}
}
}
}
private boolean shouldRemoveFromWindow(T element, long cutoffTime) {
// Implementation depends on how you track element timestamps
// This is a placeholder implementation
return true; // Remove all elements for simplicity
}
}

Windowed Operations

5. Advanced Windowing Strategies

// Window manager for different window types
@Component
@Slf4j
public class WindowManager<T> {
private final Map<String, WindowStrategy<T>> windows;
private final PipelineMetrics metrics;
public WindowManager(PipelineMetrics metrics) {
this.windows = new ConcurrentHashMap<>();
this.metrics = metrics;
}
public void registerWindow(String name, WindowStrategy<T> window) {
windows.put(name, window);
log.info("Registered window: {}", name);
}
public void processElement(String windowName, T element) {
WindowStrategy<T> window = windows.get(windowName);
if (window != null) {
window.process(element);
metrics.recordWindowElementProcessed(windowName);
} else {
log.warn("Window not found: {}", windowName);
}
}
public void startAll() {
windows.values().forEach(window -> {
try {
if (window instanceof ManageableWindow) {
((ManageableWindow) window).start();
}
} catch (Exception e) {
log.error("Failed to start window: {}", window.getName(), e);
}
});
}
public void stopAll() {
windows.values().forEach(window -> {
try {
if (window instanceof ManageableWindow) {
((ManageableWindow) window).stop();
}
} catch (Exception e) {
log.error("Failed to stop window: {}", window.getName(), e);
}
});
}
public interface ManageableWindow {
void start();
void stop();
}
}
// Session window for grouping events by session
@Slf4j
public class SessionWindow<T> implements WindowStrategy<T>, WindowManager.ManageableWindow {
private final Function<T, String> sessionIdExtractor;
private final Function<T, Long> timestampExtractor;
private final Duration sessionTimeout;
private final Duration windowSlide;
private final Map<String, Session<T>> activeSessions;
private final List<WindowProcessor<T>> processors;
private ScheduledExecutorService cleanupScheduler;
private volatile boolean running = false;
public SessionWindow(Function<T, String> sessionIdExtractor,
Function<T, Long> timestampExtractor,
Duration sessionTimeout,
Duration windowSlide) {
this.sessionIdExtractor = sessionIdExtractor;
this.timestampExtractor = timestampExtractor;
this.sessionTimeout = sessionTimeout;
this.windowSlide = windowSlide;
this.activeSessions = new ConcurrentHashMap<>();
this.processors = new CopyOnWriteArrayList<>();
}
@Override
public String getName() {
return "session-window-" + sessionTimeout.toMillis() + "ms";
}
@Override
public void process(T element) {
if (!running) return;
String sessionId = sessionIdExtractor.apply(element);
long timestamp = timestampExtractor.apply(element);
Session<T> session = activeSessions.computeIfAbsent(sessionId, 
id -> new Session<>(id, timestamp));
session.addElement(element, timestamp);
session.updateLastActivity(timestamp);
}
@Override
public void start() {
if (running) {
log.warn("Session window is already running");
return;
}
running = true;
cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
// Schedule session cleanup
cleanupScheduler.scheduleAtFixedRate(this::cleanupExpiredSessions,
windowSlide.toMillis(), windowSlide.toMillis(), TimeUnit.MILLISECONDS);
log.info("Session window started with timeout: {}", sessionTimeout);
}
@Override
public void stop() {
running = false;
if (cleanupScheduler != null) {
cleanupScheduler.shutdown();
try {
if (!cleanupScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
cleanupScheduler.shutdownNow();
}
} catch (InterruptedException e) {
cleanupScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
// Process all remaining sessions
closeAllSessions();
log.info("Session window stopped");
}
public void addProcessor(WindowProcessor<T> processor) {
processors.add(processor);
}
private void cleanupExpiredSessions() {
long currentTime = System.currentTimeMillis();
long timeoutMillis = sessionTimeout.toMillis();
Iterator<Map.Entry<String, Session<T>>> iterator = activeSessions.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Session<T>> entry = iterator.next();
Session<T> session = entry.getValue();
if (currentTime - session.getLastActivity() > timeoutMillis) {
// Session expired, process and remove
processSession(session);
iterator.remove();
}
}
}
private void closeAllSessions() {
for (Session<T> session : activeSessions.values()) {
processSession(session);
}
activeSessions.clear();
}
private void processSession(Session<T> session) {
List<T> sessionElements = session.getElements();
if (!sessionElements.isEmpty()) {
for (WindowProcessor<T> processor : processors) {
try {
processor.process(sessionElements);
} catch (Exception e) {
log.error("Error processing session {} with processor {}", 
session.getId(), processor.getName(), e);
}
}
}
}
private static class Session<T> {
private final String id;
private final List<T> elements;
private long lastActivity;
private final long startTime;
public Session(String id, long startTime) {
this.id = id;
this.elements = Collections.synchronizedList(new ArrayList<>()); // process() may touch a session from multiple threads
this.startTime = startTime;
this.lastActivity = startTime;
}
public void addElement(T element, long timestamp) {
elements.add(element);
}
public void updateLastActivity(long timestamp) {
this.lastActivity = timestamp;
}
public String getId() { return id; }
public List<T> getElements() { return elements; }
public long getLastActivity() { return lastActivity; }
public long getStartTime() { return startTime; }
}
}
// Global window for processing all elements as one group
@Slf4j
public class GlobalWindow<T> implements WindowStrategy<T>, WindowManager.ManageableWindow {
private final List<T> allElements;
private final List<WindowProcessor<T>> processors;
private final AtomicBoolean processed;
public GlobalWindow() {
this.allElements = Collections.synchronizedList(new ArrayList<>());
this.processors = new CopyOnWriteArrayList<>();
this.processed = new AtomicBoolean(false);
}
@Override
public String getName() {
return "global-window";
}
@Override
public void process(T element) {
if (!processed.get()) {
allElements.add(element);
}
}
@Override
public void start() {
processed.set(false);
log.info("Global window started");
}
@Override
public void stop() {
if (processed.compareAndSet(false, true)) {
processAllElements();
}
log.info("Global window stopped");
}
public void addProcessor(WindowProcessor<T> processor) {
processors.add(processor);
}
private void processAllElements() {
List<T> elements = new ArrayList<>(allElements);
if (!elements.isEmpty()) {
for (WindowProcessor<T> processor : processors) {
try {
processor.process(elements);
} catch (Exception e) {
log.error("Error in global window processor {}", processor.getName(), e);
}
}
}
allElements.clear();
}
public void triggerProcessing() {
stop();
start();
}
}

State Management

6. Stateful Processing

// State management for pipeline operations
@Slf4j
public class PipelineStateManager {
private final StateStore stateStore;
private final Map<String, PipelineState<?>> pipelineStates;
public PipelineStateManager(StateStore stateStore) {
this.stateStore = stateStore;
this.pipelineStates = new ConcurrentHashMap<>();
}
public <T> PipelineState<T> getOrCreateState(String pipelineName, String stateName,
Class<T> stateType, T initialState) {
String fullStateName = pipelineName + ":" + stateName;
// computeIfAbsent avoids the check-then-put race of a naive get/put sequence
@SuppressWarnings("unchecked")
PipelineState<T> state = (PipelineState<T>) pipelineStates.computeIfAbsent(fullStateName,
key -> new PipelineState<>(key, stateType, initialState, stateStore));
return state;
}
public void saveAllStates() {
pipelineStates.values().forEach(PipelineState::save);
}
public void restoreAllStates() {
pipelineStates.values().forEach(PipelineState::restore);
}
public void clearState(String pipelineName, String stateName) {
String fullStateName = pipelineName + ":" + stateName;
PipelineState<?> state = pipelineStates.remove(fullStateName);
if (state != null) {
state.clear();
}
}
}
// Pipeline state with persistence
@Slf4j
public class PipelineState<T> {
private final String name;
private final Class<T> type;
private final StateStore stateStore;
private T currentState;
private final T initialState;
public PipelineState(String name, Class<T> type, T initialState, StateStore stateStore) {
this.name = name;
this.type = type;
this.initialState = initialState;
this.stateStore = stateStore;
this.currentState = initialState;
restore();
}
public T get() {
return currentState;
}
public void update(Function<T, T> updater) {
synchronized (this) {
T newState = updater.apply(currentState);
if (newState != null) {
currentState = newState;
}
}
}
public void set(T newState) {
synchronized (this) {
currentState = newState;
}
}
public void save() {
try {
stateStore.save(name, currentState);
log.debug("Saved state for: {}", name);
} catch (Exception e) {
log.error("Failed to save state for: {}", name, e);
}
}
public void restore() {
try {
T savedState = stateStore.load(name, type);
if (savedState != null) {
currentState = savedState;
log.debug("Restored state for: {}", name);
}
} catch (Exception e) {
log.warn("Failed to restore state for: {}, using initial state", name, e);
currentState = initialState;
}
}
public void clear() {
synchronized (this) {
currentState = initialState;
}
stateStore.delete(name);
}
public String getName() {
return name;
}
}
// State store interface with multiple implementations
public interface StateStore {
<T> void save(String key, T value);
<T> T load(String key, Class<T> type);
void delete(String key);
boolean exists(String key);
}
// In-memory state store
@Slf4j
public class InMemoryStateStore implements StateStore {
private final Map<String, Object> storage;
private final ObjectMapper objectMapper;
public InMemoryStateStore(ObjectMapper objectMapper) {
this.storage = new ConcurrentHashMap<>();
this.objectMapper = objectMapper;
}
@Override
public <T> void save(String key, T value) {
storage.put(key, value);
}
@Override
@SuppressWarnings("unchecked")
public <T> T load(String key, Class<T> type) {
Object value = storage.get(key);
if (value != null && type.isInstance(value)) {
return (T) value;
}
return null;
}
@Override
public void delete(String key) {
storage.remove(key);
}
@Override
public boolean exists(String key) {
return storage.containsKey(key);
}
}
// File-based state store
@Slf4j
public class FileStateStore implements StateStore {
private final Path storageDir;
private final ObjectMapper objectMapper;
public FileStateStore(@Value("${app.state.store.dir:/tmp/pipeline-state}") String storageDir,
ObjectMapper objectMapper) throws IOException {
this.storageDir = Paths.get(storageDir);
this.objectMapper = objectMapper;
if (!Files.exists(this.storageDir)) {
Files.createDirectories(this.storageDir);
}
}
@Override
public <T> void save(String key, T value) {
try {
Path filePath = storageDir.resolve(key + ".json");
String json = objectMapper.writeValueAsString(value);
Files.write(filePath, json.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
} catch (Exception e) {
throw new StateStoreException("Failed to save state for key: " + key, e);
}
}
@Override
public <T> T load(String key, Class<T> type) {
try {
Path filePath = storageDir.resolve(key + ".json");
if (!Files.exists(filePath)) {
return null;
}
String json = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
return objectMapper.readValue(json, type);
} catch (Exception e) {
throw new StateStoreException("Failed to load state for key: " + key, e);
}
}
@Override
public void delete(String key) {
try {
Path filePath = storageDir.resolve(key + ".json");
Files.deleteIfExists(filePath);
} catch (Exception e) {
throw new StateStoreException("Failed to delete state for key: " + key, e);
}
}
@Override
public boolean exists(String key) {
Path filePath = storageDir.resolve(key + ".json");
return Files.exists(filePath);
}
}
// Stateful transform example
@Slf4j
public class StatefulAggregationTransform<T, K, R> implements Transform<T, R> {
private final Function<T, K> keyExtractor;
private final Function<T, Double> valueExtractor;
private final AggregationFunction aggregationFunction;
private final PipelineStateManager stateManager;
private final String pipelineName;
private final String transformName;
public StatefulAggregationTransform(String pipelineName, String transformName,
Function<T, K> keyExtractor,
Function<T, Double> valueExtractor,
AggregationFunction aggregationFunction,
PipelineStateManager stateManager) {
this.pipelineName = pipelineName;
this.transformName = transformName;
this.keyExtractor = keyExtractor;
this.valueExtractor = valueExtractor;
this.aggregationFunction = aggregationFunction;
this.stateManager = stateManager;
}
@Override
public String getName() {
return transformName;
}
@Override
@SuppressWarnings("unchecked")
public R apply(T input) {
K key = keyExtractor.apply(input);
Double value = valueExtractor.apply(input);
String stateName = "aggregation-" + key.toString();
PipelineState<AggregationState> state = stateManager.getOrCreateState(
pipelineName, stateName, AggregationState.class, new AggregationState());
AggregationState currentState = state.get();
AggregationState newState = aggregationFunction.aggregate(currentState, value);
state.set(newState);
return (R) new AggregationResult<>(key, newState, System.currentTimeMillis());
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in stateful aggregation transform for input: {}", input, error);
}
public static class AggregationState {
private double sum = 0.0;
private long count = 0;
private double min = Double.MAX_VALUE;
private double max = -Double.MAX_VALUE; // Double.MIN_VALUE is the smallest positive double, not the most negative value
// Getters and setters
public double getSum() { return sum; }
public void setSum(double sum) { this.sum = sum; }
public long getCount() { return count; }
public void setCount(long count) { this.count = count; }
public double getMin() { return min; }
public void setMin(double min) { this.min = min; }
public double getMax() { return max; }
public void setMax(double max) { this.max = max; }
public double getAverage() {
return count > 0 ? sum / count : 0.0;
}
}
public interface AggregationFunction {
AggregationState aggregate(AggregationState current, Double value);
}
public static class SumAggregation implements AggregationFunction {
@Override
public AggregationState aggregate(AggregationState current, Double value) {
current.setSum(current.getSum() + value);
current.setCount(current.getCount() + 1);
current.setMin(Math.min(current.getMin(), value));
current.setMax(Math.max(current.getMax(), value));
return current;
}
}
public record AggregationResult<K>(K key, AggregationState state, long timestamp) {}
}

Fault Tolerance

7. Error Handling and Recovery

// Circuit breaker for pipeline components
@Slf4j
public class PipelineCircuitBreaker {
private final Map<String, CircuitBreaker> breakers;
private final CircuitBreakerConfig defaultConfig;
public PipelineCircuitBreaker() {
this.breakers = new ConcurrentHashMap<>();
this.defaultConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.slowCallDurationThreshold(Duration.ofSeconds(5))
.permittedNumberOfCallsInHalfOpenState(3)
.minimumNumberOfCalls(10)
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.slidingWindowSize(20)
.build();
}
public <T> T execute(String breakerName, Supplier<T> supplier) {
CircuitBreaker breaker = breakers.computeIfAbsent(breakerName, 
name -> CircuitBreaker.of(name, defaultConfig));
return breaker.executeSupplier(supplier);
}
public void execute(String breakerName, Runnable runnable) {
CircuitBreaker breaker = breakers.computeIfAbsent(breakerName, 
name -> CircuitBreaker.of(name, defaultConfig));
breaker.executeRunnable(runnable);
}
public CircuitBreaker.State getState(String breakerName) {
CircuitBreaker breaker = breakers.get(breakerName);
return breaker != null ? breaker.getState() : CircuitBreaker.State.CLOSED;
}
}
// Retry mechanism with exponential backoff
@Slf4j
public class PipelineRetryManager {
private final Map<String, Retry> retries;
private final RetryConfig defaultConfig;
public PipelineRetryManager() {
this.retries = new ConcurrentHashMap<>();
this.defaultConfig = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.ofSeconds(1))
.retryExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(NullPointerException.class)
.build();
}
public <T> T executeWithRetry(String retryName, Supplier<T> supplier) {
Retry retry = retries.computeIfAbsent(retryName, 
name -> Retry.of(name, defaultConfig));
return retry.executeSupplier(supplier);
}
public void executeWithRetry(String retryName, Runnable runnable) {
Retry retry = retries.computeIfAbsent(retryName, 
name -> Retry.of(name, defaultConfig));
retry.executeRunnable(runnable);
}
public RetryConfig getRetryConfig(String retryName) {
Retry retry = retries.get(retryName);
return retry != null ? retry.getRetryConfig() : defaultConfig;
}
}
// Dead letter queue for failed elements
@Slf4j
public class DeadLetterQueue<T> {
private final Sink<DLQMessage<T>> dlqSink;
private final ObjectMapper objectMapper;
private final String queueName;
public DeadLetterQueue(Sink<DLQMessage<T>> dlqSink, ObjectMapper objectMapper, String queueName) {
this.dlqSink = dlqSink;
this.objectMapper = objectMapper;
this.queueName = queueName;
}
public void sendToDLQ(T element, Throwable error) {
try {
DLQMessage<T> message = new DLQMessage<>(element, error, System.currentTimeMillis());
// Typing the sink as Sink<DLQMessage<T>> avoids the unchecked cast that an
// erasure-based Sink<T> would require here
dlqSink.write(message);
log.warn("Sent element to DLQ {}: {}", queueName, element);
} catch (Exception e) {
log.error("Failed to send element to DLQ {}: {}", queueName, element, e);
}
}
public void sendToDLQ(T element, String errorMessage) {
sendToDLQ(element, new RuntimeException(errorMessage));
}
public static class DLQMessage<T> {
private final T originalElement;
private final String errorMessage;
private final String stackTrace;
private final long timestamp;
private final String messageId;
public DLQMessage(T originalElement, Throwable error, long timestamp) {
this.originalElement = originalElement;
this.errorMessage = error.getMessage();
this.stackTrace = getStackTrace(error);
this.timestamp = timestamp;
this.messageId = UUID.randomUUID().toString();
}
private String getStackTrace(Throwable error) {
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
return sw.toString();
}
// Getters
public T getOriginalElement() { return originalElement; }
public String getErrorMessage() { return errorMessage; }
public String getStackTrace() { return stackTrace; }
public long getTimestamp() { return timestamp; }
public String getMessageId() { return messageId; }
}
}
// Checkpointing for pipeline state
@Slf4j
public class PipelineCheckpointer {
private final StateStore stateStore;
private final ScheduledExecutorService checkpointScheduler;
private final Map<String, Checkpointable> checkpointables;
public PipelineCheckpointer(StateStore stateStore) {
this.stateStore = stateStore;
this.checkpointScheduler = Executors.newSingleThreadScheduledExecutor();
this.checkpointables = new ConcurrentHashMap<>();
}
public void registerCheckpointable(String name, Checkpointable checkpointable) {
checkpointables.put(name, checkpointable);
log.info("Registered checkpointable: {}", name);
}
public void startCheckpointing(Duration interval) {
checkpointScheduler.scheduleAtFixedRate(this::performCheckpoint,
interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
log.info("Started checkpointing with interval: {}", interval);
}
public void stopCheckpointing() {
checkpointScheduler.shutdown();
try {
if (!checkpointScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
checkpointScheduler.shutdownNow();
}
} catch (InterruptedException e) {
checkpointScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("Stopped checkpointing");
}
public void performCheckpoint() {
checkpointables.forEach((name, checkpointable) -> {
try {
checkpointable.checkpoint();
log.debug("Performed checkpoint for: {}", name);
} catch (Exception e) {
log.error("Failed to perform checkpoint for: {}", name, e);
}
});
}
public void restoreFromCheckpoint() {
checkpointables.forEach((name, checkpointable) -> {
try {
checkpointable.restore();
log.debug("Restored from checkpoint for: {}", name);
} catch (Exception e) {
log.error("Failed to restore from checkpoint for: {}", name, e);
}
});
}
public interface Checkpointable {
void checkpoint();
void restore();
}
}

Monitoring & Metrics

8. Comprehensive Monitoring

// Metrics service for pipeline monitoring
@Slf4j
public class PipelineMetrics {
private final MeterRegistry meterRegistry;
private final Map<String, Counter> processedCounters;
private final Map<String, Counter> failedCounters;
private final Map<String, Timer> processingTimers;
private final Map<String, AtomicInteger> queueSizes;
private final Map<String, Long> startTimes;
public PipelineMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.processedCounters = new ConcurrentHashMap<>();
this.failedCounters = new ConcurrentHashMap<>();
this.processingTimers = new ConcurrentHashMap<>();
this.queueSizes = new ConcurrentHashMap<>();
this.startTimes = new ConcurrentHashMap<>();
}
public void recordPipelineStart(String pipelineName) {
startTimes.put(pipelineName, System.currentTimeMillis());
meterRegistry.counter("pipeline.starts", "pipeline", pipelineName).increment();
}
public void recordPipelineStop(String pipelineName) {
meterRegistry.counter("pipeline.stops", "pipeline", pipelineName).increment();
}
public void recordElementProcessed(String pipelineName, String elementId) {
getProcessedCounter(pipelineName).increment();
meterRegistry.counter("pipeline.elements.processed", "pipeline", pipelineName).increment();
}
public void recordElementFailed(String pipelineName, String elementId, Throwable error) {
getFailedCounter(pipelineName).increment();
meterRegistry.counter("pipeline.elements.failed", "pipeline", pipelineName).increment();
meterRegistry.counter("pipeline.errors", "pipeline", pipelineName, "error", error.getClass().getSimpleName()).increment();
}
public void recordBatchProcessed(String pipelineName, int batchSize) {
meterRegistry.counter("pipeline.batches.processed", "pipeline", pipelineName).increment();
meterRegistry.summary("pipeline.batch.size", "pipeline", pipelineName).record(batchSize);
}
public void recordBatchFailed(String pipelineName, int batchSize, Throwable error) {
meterRegistry.counter("pipeline.batches.failed", "pipeline", pipelineName).increment();
}
public void recordWindowElementProcessed(String windowName) {
meterRegistry.counter("window.elements.processed", "window", windowName).increment();
}
public Timer.Sample startProcessingTimer(String pipelineName) {
return Timer.start(meterRegistry);
}
public void stopProcessingTimer(Timer.Sample sample, String pipelineName) {
sample.stop(getProcessingTimer(pipelineName));
}
public void recordQueueSize(String queueName, int size) {
// Micrometer gauges sample a value supplier, so register one AtomicInteger
// holder per queue and update it on every call
queueSizes.computeIfAbsent(queueName, name -> {
AtomicInteger holder = new AtomicInteger();
Gauge.builder("pipeline.queue.size", holder, AtomicInteger::get)
.tag("queue", name)
.register(meterRegistry);
return holder;
}).set(size);
}
public double getProcessedCount(String pipelineName) {
Counter counter = processedCounters.get(pipelineName);
return counter != null ? counter.count() : 0.0;
}
public double getFailedCount(String pipelineName) {
Counter counter = failedCounters.get(pipelineName);
return counter != null ? counter.count() : 0.0;
}
public double getProcessingRate(String pipelineName) {
// Average rate since pipeline start; a production system would use a
// sliding-window rate instead
Long start = startTimes.get(pipelineName);
if (start == null) {
return 0.0;
}
return getProcessedCount(pipelineName) / Math.max(1.0, (System.currentTimeMillis() - start) / 1000.0);
}
private Counter getProcessedCounter(String pipelineName) {
return processedCounters.computeIfAbsent(pipelineName,
name -> meterRegistry.counter("pipeline.processed", "pipeline", name));
}
private Counter getFailedCounter(String pipelineName) {
return failedCounters.computeIfAbsent(pipelineName,
name -> meterRegistry.counter("pipeline.failed", "pipeline", name));
}
private Timer getProcessingTimer(String pipelineName) {
return processingTimers.computeIfAbsent(pipelineName,
name -> Timer.builder("pipeline.processing.time")
.tag("pipeline", name)
.register(meterRegistry));
}
}
// Health indicators for pipeline components
public class PipelineHealthIndicator implements HealthIndicator {
private final PipelineMetrics metrics;
private final Map<String, Pipeline<?>> pipelines;
public PipelineHealthIndicator(PipelineMetrics metrics) {
this.metrics = metrics;
this.pipelines = new ConcurrentHashMap<>();
}
public void registerPipeline(String name, Pipeline<?> pipeline) {
pipelines.put(name, pipeline);
}
@Override
public Health health() {
Map<String, Object> details = new HashMap<>();
Health.Builder status = Health.up();
for (Map.Entry<String, Pipeline<?>> entry : pipelines.entrySet()) {
String pipelineName = entry.getKey();
Pipeline<?> pipeline = entry.getValue();
PipelineStats stats = pipeline.getStats();
Map<String, Object> pipelineDetails = new HashMap<>();
pipelineDetails.put("processed", stats.processedCount());
pipelineDetails.put("failed", stats.failedCount());
pipelineDetails.put("rate", stats.processingRate());
// Naive stall check: anything under one element per second is flagged.
// A freshly started pipeline will read as SLOW; tune this per pipeline in practice.
if (stats.processingRate() < 1.0) {
pipelineDetails.put("status", "SLOW");
status = Health.down();
} else {
pipelineDetails.put("status", "HEALTHY");
}
details.put(pipelineName, pipelineDetails);
}
return status.withDetails(details).build();
}
}
// Pipeline dashboard service
@Slf4j
public class PipelineDashboard {
private final PipelineMetrics metrics;
private final Map<String, Pipeline<?>> pipelines;
private final ScheduledExecutorService dashboardScheduler;
public PipelineDashboard(PipelineMetrics metrics) {
this.metrics = metrics;
this.pipelines = new ConcurrentHashMap<>();
this.dashboardScheduler = Executors.newSingleThreadScheduledExecutor();
}
public void registerPipeline(String name, Pipeline<?> pipeline) {
pipelines.put(name, pipeline);
}
public void startDashboardReporting(Duration interval) {
dashboardScheduler.scheduleAtFixedRate(this::reportPipelineStats,
interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
}
public void stopDashboardReporting() {
dashboardScheduler.shutdown();
}
private void reportPipelineStats() {
log.info("=== Pipeline Dashboard Report ===");
for (Map.Entry<String, Pipeline<?>> entry : pipelines.entrySet()) {
String pipelineName = entry.getKey();
Pipeline<?> pipeline = entry.getValue();
PipelineStats stats = pipeline.getStats();
log.info("Pipeline: {}", pipelineName);
log.info("  Processed: {}, Failed: {}, Rate: {}/s", 
stats.processedCount(), stats.failedCount(), stats.processingRate());
if (stats.sourceStats() != null) {
log.info("  Source: {} (emitted: {}, errors: {})",
stats.sourceStats().sourceName(),
stats.sourceStats().elementsEmitted(),
stats.sourceStats().errors());
}
if (stats.sinkStats() != null) {
log.info("  Sink: {} (written: {}, errors: {})",
stats.sinkStats().sinkName(),
stats.sinkStats().elementsWritten(),
stats.sinkStats().errors());
}
}
log.info("=================================");
}
public Map<String, Object> getPipelineMetrics(String pipelineName) {
Pipeline<?> pipeline = pipelines.get(pipelineName);
if (pipeline == null) {
return Map.of("error", "Pipeline not found: " + pipelineName);
}
PipelineStats stats = pipeline.getStats();
// Map.of() rejects null values, so build the result with a HashMap and only
// add source/sink entries when the stats are present
Map<String, Object> result = new HashMap<>();
result.put("name", pipelineName);
result.put("processed", stats.processedCount());
result.put("failed", stats.failedCount());
result.put("rate", stats.processingRate());
if (stats.sourceStats() != null) {
result.put("source", Map.of(
"name", stats.sourceStats().sourceName(),
"emitted", stats.sourceStats().elementsEmitted(),
"errors", stats.sourceStats().errors()
));
}
if (stats.sinkStats() != null) {
result.put("sink", Map.of(
"name", stats.sinkStats().sinkName(),
"written", stats.sinkStats().elementsWritten(),
"errors", stats.sinkStats().errors()
));
}
return result;
}
}

Spring Boot Integration

9. Complete Spring Boot Configuration

# application.yml
spring:
  application:
    name: dataflow-pipeline
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: pipeline-consumer
      auto-offset-reset: earliest
    producer:
      acks: all
      retries: 3
  datasource:
    url: jdbc:postgresql://localhost:5432/pipeline
    username: pipeline
    password: pipeline
    hikari:
      maximum-pool-size: 10
  jackson:
    property-naming-strategy: SNAKE_CASE
    default-property-inclusion: NON_NULL

# Pipeline configuration (bound to PipelineProperties below)
pipeline:
  batch:
    size: 1000
    flush-interval: 5s
  window:
    size: 1m
    slide: 30s
  state:
    store: file
    directory: /tmp/pipeline-state
  retry:
    max-attempts: 3
    backoff-initial-interval: 1s
  circuit-breaker:
    failure-threshold: 50
    wait-duration: 30s
  # Pipeline definitions (nested under "pipeline" to match the
  # @ConfigurationProperties prefix)
  pipelines:
    user-activity:
      source:
        type: kafka
        topic: user-events
      transforms:
        - type: filter
          condition: "eventType == 'CLICK'"
        - type: map
          field: "userId"
        - type: window
          strategy: session
          timeout: 30m
      sink:
        type: kafka
        topic: user-sessions
    order-processing:
      source:
        type: jdbc
        query: "SELECT * FROM orders WHERE status = 'PENDING'"
        poll-interval: 10s
      transforms:
        - type: enrich
          service: customer-service
          field: "customerId"
        - type: batch
          size: 100
      sink:
        type: jdbc
        table: processed_orders

# Management endpoints
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    export:
      prometheus:
        enabled: true
    distribution:
      percentiles-histogram:
        "[pipeline.processing.time]": true

10. Spring Configuration and Main Application

@Configuration
@EnableConfigurationProperties(PipelineProperties.class)
public class PipelineConfiguration {
@Bean
public PipelineMetrics pipelineMetrics(MeterRegistry meterRegistry) {
return new PipelineMetrics(meterRegistry);
}
@Bean
public PipelineStateManager pipelineStateManager(StateStore stateStore) {
return new PipelineStateManager(stateStore);
}
@Bean
@ConditionalOnProperty(name = "pipeline.state.store", havingValue = "file")
public StateStore fileStateStore(PipelineProperties properties, ObjectMapper objectMapper) throws IOException {
return new FileStateStore(properties.getState().getDirectory(), objectMapper);
}
@Bean
@ConditionalOnMissingBean(StateStore.class)
public StateStore inMemoryStateStore(ObjectMapper objectMapper) {
return new InMemoryStateStore(objectMapper);
}
@Bean
public PipelineCircuitBreaker pipelineCircuitBreaker() {
return new PipelineCircuitBreaker();
}
@Bean
public PipelineRetryManager pipelineRetryManager() {
return new PipelineRetryManager();
}
@Bean
public PipelineCheckpointer pipelineCheckpointer(StateStore stateStore) {
return new PipelineCheckpointer(stateStore);
}
@Bean
public PipelineHealthIndicator pipelineHealthIndicator(PipelineMetrics metrics) {
return new PipelineHealthIndicator(metrics);
}
@Bean
public PipelineDashboard pipelineDashboard(PipelineMetrics metrics) {
return new PipelineDashboard(metrics);
}
@Bean
public ObjectMapper objectMapper() {
// Note: declaring this bean replaces Boot's auto-configured mapper, so it must mirror the spring.jackson settings above
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
}
}
@ConfigurationProperties(prefix = "pipeline")
@Data
public class PipelineProperties {
private Batch batch = new Batch();
private Window window = new Window();
private State state = new State();
private Retry retry = new Retry();
private CircuitBreaker circuitBreaker = new CircuitBreaker();
private Map<String, PipelineConfig> pipelines = new HashMap<>();
@Data
public static class Batch {
private int size = 1000;
private Duration flushInterval = Duration.ofSeconds(5);
}
@Data
public static class Window {
private Duration size = Duration.ofMinutes(1);
private Duration slide = Duration.ofSeconds(30);
}
@Data
public static class State {
private String store = "memory";
private String directory = "/tmp/pipeline-state";
}
@Data
public static class Retry {
private int maxAttempts = 3;
private Duration backoffInitialInterval = Duration.ofSeconds(1);
}
@Data
public static class CircuitBreaker {
private int failureThreshold = 50;
private Duration waitDuration = Duration.ofSeconds(30);
}
@Data
public static class PipelineConfig {
private SourceConfig source;
private List<TransformConfig> transforms = new ArrayList<>();
private SinkConfig sink;
}
@Data
public static class SourceConfig {
private String type;
private String topic;
private String query;
private Duration pollInterval;
}
@Data
public static class TransformConfig {
private String type;
private String condition;
private String field;
private String service;
// strategy, timeout, and size back the window and batch transforms declared in application.yml
private String strategy;
private Duration timeout;
private Integer size;
}
@Data
public static class SinkConfig {
private String type;
private String topic;
private String table;
}
}
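To sanity-check that the pipeline.* keys in application.yml actually bind onto PipelineProperties, Spring Boot's Binder can exercise the mapping without starting a full application context. A minimal sketch (a hypothetical standalone check, not part of the pipeline itself):

import java.util.Map;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

// Hypothetical binding check: exercises a few pipeline.* keys without starting Spring
public class PipelinePropertiesBindingCheck {
    public static void main(String[] args) {
        MapConfigurationPropertySource source = new MapConfigurationPropertySource(Map.of(
                "pipeline.batch.size", "500",
                "pipeline.state.store", "file",
                "pipeline.pipelines.user-activity.source.type", "kafka"));
        PipelineProperties props = new Binder(source)
                .bind("pipeline", PipelineProperties.class)
                .get();
        System.out.println(props.getBatch().getSize());   // 500
        System.out.println(props.getPipelines().get("user-activity").getSource().getType()); // kafka
    }
}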
// Main application with pipeline orchestration
@SpringBootApplication
@Slf4j
public class DataflowPipelineApplication {
// Injected as fields so the private helper methods below can reference them
@Autowired
private PipelineProperties properties;
@Autowired
private ObjectMapper objectMapper;
public static void main(String[] args) {
SpringApplication.run(DataflowPipelineApplication.class, args);
}
@Bean
public CommandLineRunner pipelineOrchestrator(PipelineProperties properties,
PipelineMetrics metrics,
KafkaTemplate<String, String> kafkaTemplate,
DataSource dataSource,
ObjectMapper objectMapper) {
return args -> {
log.info("Initializing dataflow pipelines...");
for (Map.Entry<String, PipelineProperties.PipelineConfig> entry : properties.getPipelines().entrySet()) {
String pipelineName = entry.getKey();
PipelineProperties.PipelineConfig config = entry.getValue();
createPipeline(pipelineName, config, metrics, kafkaTemplate, dataSource, objectMapper);
}
log.info("All pipelines initialized successfully");
};
}
private void createPipeline(String name, PipelineProperties.PipelineConfig config,
PipelineMetrics metrics, KafkaTemplate<String, String> kafkaTemplate,
DataSource dataSource, ObjectMapper objectMapper) {
// Implementation would create pipeline based on configuration
log.info("Creating pipeline: {} with config: {}", name, config);
// Example pipeline creation logic
if ("user-activity".equals(name)) {
createUserActivityPipeline(name, config, metrics, kafkaTemplate, objectMapper);
} else if ("order-processing".equals(name)) {
createOrderProcessingPipeline(name, config, metrics, dataSource, objectMapper);
}
}
private void createUserActivityPipeline(String name, PipelineProperties.PipelineConfig config,
PipelineMetrics metrics, KafkaTemplate<String, String> kafkaTemplate,
ObjectMapper objectMapper) {
// Create user activity pipeline
Pipeline<String> pipeline = new DataflowPipeline<>(name, metrics);
// Configure source
KafkaSource<String> source = new KafkaSource<>(kafkaTemplate, objectMapper, String.class,
config.getSource().getTopic(), "user-activity-group");
// Configure transforms
// transform() returns a new pipeline, so keep the end of the chain and attach the sink to it
Pipeline<String> configured = pipeline.from(source)
// filter() expects a Filter<T>; DataflowPipeline wraps it in a FilterTransform internally
.filter(event -> event.contains("\"eventType\":\"CLICK\""))
.transform(new MapTransform<>("extract-userId",
event -> extractUserId(event)))
.window(new SessionWindow<>(
this::extractSessionId,
event -> System.currentTimeMillis(), // processing-time timestamps
Duration.ofMinutes(30),
Duration.ofSeconds(30)
));
// Configure sink
KafkaSink<String> sink = new KafkaSink<>(kafkaTemplate, config.getSink().getTopic(), objectMapper);
configured.to(sink);
configured.start();
log.info("Started user activity pipeline: {}", name);
}
private void createOrderProcessingPipeline(String name, PipelineProperties.PipelineConfig config,
PipelineMetrics metrics, DataSource dataSource,
ObjectMapper objectMapper) {
// Create order processing pipeline
BatchPipeline<String> pipeline = new BatchPipeline<>(name, metrics,
properties.getBatch().getSize(), properties.getBatch().getFlushInterval());
// Configure source
JdbcSource<String> source = new JdbcSource<>(dataSource, config.getSource().getQuery(),
rs -> rs.getString("order_data"), config.getSource().getPollInterval().toMillis());
// Configure pipeline
pipeline.from(source)
.to(new JdbcSink<>(dataSource, config.getSink().getTable()));
pipeline.start();
log.info("Started order processing pipeline: {}", name);
}
private String extractUserId(String event) {
// Simplified implementation
try {
JsonNode node = objectMapper.readTree(event);
return node.get("userId").asText();
} catch (Exception e) {
return "unknown";
}
}
private String extractSessionId(String event) {
// Simplified implementation
try {
JsonNode node = objectMapper.readTree(event);
return node.get("sessionId").asText();
} catch (Exception e) {
return "unknown-session";
}
}
}
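The name-based dispatch in createPipeline is only a placeholder. A more general factory would switch on the declared source and sink types instead, so new pipelines can be added purely through configuration. A minimal sketch, assuming kafkaTemplate, dataSource, and objectMapper are available as injected fields and using the KafkaSource/JdbcSource/KafkaSink/JdbcSink constructors shown earlier:

// Hypothetical type-driven factory methods: build sources and sinks from configuration alone
private Source<String> createSource(PipelineProperties.SourceConfig config) {
    return switch (config.getType()) {
        case "kafka" -> new KafkaSource<>(kafkaTemplate, objectMapper, String.class,
                config.getTopic(), "generic-consumer-group");  // hypothetical consumer group
        case "jdbc" -> new JdbcSource<>(dataSource, config.getQuery(),
                rs -> rs.getString(1),                         // hypothetical column mapping
                config.getPollInterval().toMillis());
        default -> throw new IllegalArgumentException("Unknown source type: " + config.getType());
    };
}

private Sink<String> createSink(PipelineProperties.SinkConfig config) {
    return switch (config.getType()) {
        case "kafka" -> new KafkaSink<>(kafkaTemplate, config.getTopic(), objectMapper);
        case "jdbc" -> new JdbcSink<>(dataSource, config.getTable());
        default -> throw new IllegalArgumentException("Unknown sink type: " + config.getType());
    };
}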
// REST controller for pipeline management
@RestController
@RequestMapping("/api/pipelines")
@Slf4j
public class PipelineController {
private final PipelineDashboard dashboard;
private final Map<String, Pipeline<?>> pipelines;
public PipelineController(PipelineDashboard dashboard) {
this.dashboard = dashboard;
this.pipelines = new ConcurrentHashMap<>();
}
@PostMapping("/{name}/start")
public ResponseEntity<String> startPipeline(@PathVariable String name) {
Pipeline<?> pipeline = pipelines.get(name);
if (pipeline == null) {
return ResponseEntity.notFound().build();
}
pipeline.start();
return ResponseEntity.ok("Pipeline " + name + " started");
}
@PostMapping("/{name}/stop")
public ResponseEntity<String> stopPipeline(@PathVariable String name) {
Pipeline<?> pipeline = pipelines.get(name);
if (pipeline == null) {
return ResponseEntity.notFound().build();
}
pipeline.stop();
return ResponseEntity.ok("Pipeline " + name + " stopped");
}
@GetMapping("/{name}/metrics")
public ResponseEntity<Map<String, Object>> getPipelineMetrics(@PathVariable String name) {
Map<String, Object> metrics = dashboard.getPipelineMetrics(name);
if (metrics.containsKey("error")) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(metrics);
}
@GetMapping("/{name}/stats")
public ResponseEntity<PipelineStats> getPipelineStats(@PathVariable String name) {
Pipeline<?> pipeline = pipelines.get(name);
if (pipeline == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(pipeline.getStats());
}
public void registerPipeline(String name, Pipeline<?> pipeline) {
pipelines.put(name, pipeline);
dashboard.registerPipeline(name, pipeline);
}
}
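Note that as written, nothing populates the controller's private map, so every endpoint would return 404 until registerPipeline is called. One way to close the loop is a small shared registry bean that the orchestrator fills after each pipeline.start() and that the controller consults instead of its own map. A minimal sketch (a hypothetical PipelineRegistry, not part of the code above):

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Component;

// Hypothetical shared registry: the orchestrator registers pipelines here,
// and PipelineController delegates lookups to it instead of a private map
@Component
public class PipelineRegistry {
    private final Map<String, Pipeline<?>> pipelines = new ConcurrentHashMap<>();

    public void register(String name, Pipeline<?> pipeline) {
        pipelines.put(name, pipeline);
    }

    public Optional<Pipeline<?>> find(String name) {
        return Optional.ofNullable(pipelines.get(name));
    }
}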

This comprehensive dataflow pipeline implementation provides:

  • Modular pipeline architecture with sources, transforms, and sinks
  • Both stream and batch processing capabilities
  • Advanced windowing strategies for time-based operations
  • Robust state management with persistence
  • Comprehensive fault tolerance with circuit breakers and retries
  • Detailed monitoring and metrics for observability
  • Spring Boot integration for easy deployment and configuration

The pipeline is designed to be extensible, scalable, and production-ready for various data processing scenarios.
