Learn how to build robust, scalable data processing pipelines in Java with modern streaming and batch processing capabilities.
Table of Contents
- Pipeline Architecture
- Core Pipeline Components
- Stream Processing
- Batch Processing
- Windowed Operations
- State Management
- Fault Tolerance
- Monitoring & Metrics
- Spring Boot Integration
Pipeline Architecture
Dataflow Pipeline Patterns
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Source    │───▶│  Transform  │───▶│   Window    │───▶│    Sink     │
│             │    │             │    │   & Agg     │    │             │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │
       ▼                  ▼                  ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Filter    │    │   State     │    │   Kafka,    │
│   & Map     │    │ Management  │    │   DB, S3    │
└─────────────┘    └─────────────┘    └─────────────┘
Core Pipeline Components
1. Pipeline Foundation Classes
// Core pipeline interfaces
public interface Pipeline<T> {
String getName();
Pipeline<T> from(Source<T> source);
<R> Pipeline<R> transform(Transform<T, R> transform);
Pipeline<T> filter(Predicate<T> filter);
Pipeline<T> window(WindowStrategy<T> windowStrategy);
void to(Sink<T> sink);
void start();
void stop();
PipelineStats getStats();
}
public interface Source<T> {
String getName();
void start(SourceContext<T> context);
void stop();
SourceStats getStats();
}
public interface SourceContext<T> {
void emit(T element);
void markProcessed(String elementId);
void markFailed(String elementId, Throwable error);
}
public interface Transform<T, R> {
String getName();
R apply(T input);
void onError(T input, Throwable error);
}
public interface Sink<T> {
String getName();
void write(T element);
void writeBatch(List<T> elements);
void flush();
SinkStats getStats();
}
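The interfaces above also reference a WindowStrategy contract that the original listing never defines. A minimal version, inferred from how TimeWindow, SessionWindow, and GlobalWindow implement it later in this guide, looks like this:
// Inferred from its call sites; not shown in the original listing.
public interface WindowStrategy<T> {
    String getName();
    void process(T element);
}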
// Base pipeline implementation
@Slf4j
public class DataflowPipeline<T> implements Pipeline<T> {
private final String name;
private final ExecutorService executor;
private final PipelineMetrics metrics;
private Source<T> source;
private final List<Transform<?, ?>> transforms;
private Sink<T> sink;
private WindowStrategy<T> windowStrategy;
private volatile boolean running = false;
public DataflowPipeline(String name, PipelineMetrics metrics) {
this.name = name;
this.metrics = metrics;
this.transforms = new CopyOnWriteArrayList<>();
this.executor = Executors.newFixedThreadPool(10, new ThreadFactoryBuilder()
.setNameFormat("pipeline-" + name + "-%d")
.build());
}
@Override
public Pipeline<T> from(Source<T> source) {
this.source = source;
return this;
}
@Override
@SuppressWarnings("unchecked")
public <R> Pipeline<R> transform(Transform<T, R> transform) {
this.transforms.add(transform);
// Return a new pipeline with the transformed type. TransformedSource
// (implementation not shown here) adapts this pipeline's output into a
// Source<R> for the downstream pipeline.
DataflowPipeline<R> newPipeline = new DataflowPipeline<>(name + "-transformed", metrics);
newPipeline.source = new TransformedSource<>(this, transform);
return newPipeline;
}
@Override
public Pipeline<T> filter(Predicate<T> filter) {
this.transforms.add(new FilterTransform<>(name + "-filter", filter));
return this;
}
@Override
public Pipeline<T> window(WindowStrategy<T> windowStrategy) {
this.windowStrategy = windowStrategy;
return this;
}
@Override
public void to(Sink<T> sink) {
this.sink = sink;
}
@Override
public void start() {
if (running) {
log.warn("Pipeline {} is already running", name);
return;
}
running = true;
executor.submit(() -> {
log.info("Starting pipeline: {}", name);
metrics.recordPipelineStart(name);
source.start(new SourceContext<T>() {
@Override
public void emit(T element) {
processElement(element);
}
@Override
public void markProcessed(String elementId) {
metrics.recordElementProcessed(name, elementId);
}
@Override
public void markFailed(String elementId, Throwable error) {
metrics.recordElementFailed(name, elementId, error);
}
});
log.info("Pipeline {} started successfully", name);
});
}
@Override
public void stop() {
running = false;
source.stop();
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
metrics.recordPipelineStop(name);
log.info("Pipeline {} stopped", name);
}
@Override
public PipelineStats getStats() {
return new PipelineStats(
name,
metrics.getProcessedCount(name),
metrics.getFailedCount(name),
metrics.getProcessingRate(name),
source.getStats(),
sink != null ? sink.getStats() : null
);
}
@SuppressWarnings("unchecked")
private void processElement(T element) {
if (!running) return;
try {
Object current = element;
// Apply all transforms
for (Transform transform : transforms) {
current = transform.apply(current);
if (current == null) {
// Element was filtered out
return;
}
}
// Apply windowing if configured
if (windowStrategy != null) {
windowStrategy.process((T) current);
} else if (sink != null) {
// Write to sink
sink.write((T) current);
}
metrics.recordElementProcessed(name, element.toString());
} catch (Exception e) {
log.error("Error processing element in pipeline {}: {}", name, element, e);
metrics.recordElementFailed(name, element.toString(), e);
}
}
// Statistics records (nested here for brevity; in a real project these would be top-level types shared by the Pipeline, Source, and Sink interfaces above)
public record PipelineStats(
String pipelineName,
long processedCount,
long failedCount,
double processingRate,
SourceStats sourceStats,
SinkStats sinkStats
) {}
public record SourceStats(
String sourceName,
long elementsEmitted,
long errors,
double emissionRate
) {}
public record SinkStats(
String sinkName,
long elementsWritten,
long errors,
double writeRate
) {}
}
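Before moving on to concrete sources, here is a minimal usage sketch of the fluent API above. UserEvent, kafkaSource, sessionSink, and metrics are placeholders assumed to exist:
// Wire a source, an inline filter, and a sink, then start the pipeline.
Pipeline<UserEvent> pipeline = new DataflowPipeline<>("user-activity", metrics);
pipeline.from(kafkaSource)
        .filter(e -> "CLICK".equals(e.getEventType()))
        .to(sessionSink);
pipeline.start();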
2. Source Implementations
// Kafka Source
@Slf4j
public class KafkaSource<T> implements Source<T> {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final Class<T> type;
private final String topic;
private final String groupId;
private volatile boolean running = false;
private KafkaConsumer<String, String> consumer;
private Thread consumerThread;
private final AtomicLong elementsEmitted = new AtomicLong(0);
private final AtomicLong errors = new AtomicLong(0);
private final long startTimeMillis = System.currentTimeMillis(); // for rate calculation
public KafkaSource(KafkaTemplate<String, String> kafkaTemplate,
ObjectMapper objectMapper,
Class<T> type,
String topic,
String groupId) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
this.type = type;
this.topic = topic;
this.groupId = groupId;
}
@Override
public String getName() {
return "kafka-source-" + topic;
}
@Override
public void start(SourceContext<T> context) {
if (running) {
log.warn("Kafka source for topic {} is already running", topic);
return;
}
running = true;
consumerThread = new Thread(() -> {
log.info("Starting Kafka source for topic: {}", topic);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hardcoded for brevity; inject from configuration in production
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (running) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
T element = objectMapper.readValue(record.value(), type);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to process Kafka record from topic {}: {}", topic, record.value(), e);
errors.incrementAndGet();
context.markFailed(record.key(), e);
}
}
// Manual commit once per poll, after the whole batch has been handled.
// Committing inside the loop would also commit offsets for records later
// in the same batch that have not been processed yet.
if (!records.isEmpty()) {
consumer.commitSync();
}
} catch (Exception e) {
log.error("Error in Kafka consumer for topic {}", topic, e);
errors.incrementAndGet();
}
}
consumer.close();
log.info("Kafka source for topic {} stopped", topic);
}, "kafka-source-" + topic);
consumerThread.start();
}
@Override
public void stop() {
running = false;
if (consumerThread != null) {
try {
consumerThread.join(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@Override
public SourceStats getStats() {
return new SourceStats(
getName(),
elementsEmitted.get(),
errors.get(),
elementsEmitted.get() / Math.max(1.0, (System.currentTimeMillis() - startTimeMillis) / 1000.0) // elements per second since the source was created
);
}
}
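A hypothetical construction, assuming an OrderEvent payload type plus existing kafkaTemplate and objectMapper beans (note that consumption relies solely on the KafkaConsumer built in start(); the KafkaTemplate is unused here):
KafkaSource<OrderEvent> source = new KafkaSource<>(
    kafkaTemplate, objectMapper, OrderEvent.class, "orders", "order-pipeline");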
// File Source
@Slf4j
public class FileSource<T> implements Source<T> {
private final String filePath;
private final ObjectMapper objectMapper;
private final Class<T> type;
private final boolean watchForChanges;
private volatile boolean running = false;
private Thread fileReaderThread;
private final AtomicLong elementsEmitted = new AtomicLong(0);
private final AtomicLong errors = new AtomicLong(0);
private final long startTimeMillis = System.currentTimeMillis(); // for rate calculation
public FileSource(String filePath, ObjectMapper objectMapper, Class<T> type, boolean watchForChanges) {
this.filePath = filePath;
this.objectMapper = objectMapper;
this.type = type;
this.watchForChanges = watchForChanges;
}
@Override
public String getName() {
return "file-source-" + filePath;
}
@Override
public void start(SourceContext<T> context) {
if (running) {
log.warn("File source for {} is already running", filePath);
return;
}
running = true;
fileReaderThread = new Thread(() -> {
log.info("Starting file source for: {}", filePath);
try {
Path path = Paths.get(filePath);
if (watchForChanges) {
watchFileChanges(path, context);
} else {
processExistingFile(path, context);
}
} catch (Exception e) {
log.error("Error in file source for {}", filePath, e);
}
log.info("File source for {} stopped", filePath);
}, "file-source-" + filePath);
fileReaderThread.start();
}
private void processExistingFile(Path path, SourceContext<T> context) throws IOException {
if (!Files.exists(path)) {
log.warn("File does not exist: {}", path);
return;
}
try (Stream<String> lines = Files.lines(path)) {
lines.forEach(line -> {
try {
T element = objectMapper.readValue(line, type);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to parse line from file {}: {}", filePath, line, e);
errors.incrementAndGet();
context.markFailed(line, e);
}
});
}
}
private void watchFileChanges(Path path, SourceContext<T> context) throws IOException {
WatchService watchService = FileSystems.getDefault().newWatchService();
Path directory = path.getParent();
directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
long lastModified = 0;
while (running) {
try {
WatchKey key = watchService.poll(1, TimeUnit.SECONDS);
if (key != null) {
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
Path changedFile = (Path) event.context();
if (changedFile.equals(path.getFileName())) {
long currentModified = Files.getLastModifiedTime(path).toMillis();
if (currentModified > lastModified) {
processNewLines(path, lastModified, context);
lastModified = currentModified;
}
}
}
}
key.reset();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Error watching file {}", path, e);
}
}
watchService.close();
}
private void processNewLines(Path path, long since, SourceContext<T> context) throws IOException {
// Simplified: rereads the whole file on every change. A real implementation
// would remember a byte offset (or line count) and emit only lines appended since then.
try (Stream<String> lines = Files.lines(path)) {
lines.forEach(line -> {
try {
T element = objectMapper.readValue(line, type);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to parse line from file {}: {}", filePath, line, e);
errors.incrementAndGet();
}
});
}
}
@Override
public void stop() {
running = false;
if (fileReaderThread != null) {
fileReaderThread.interrupt();
}
}
@Override
public SourceStats getStats() {
return new SourceStats(
getName(),
elementsEmitted.get(),
errors.get(),
elementsEmitted.get() / Math.max(1.0, (System.currentTimeMillis() - startTimeMillis) / 1000.0)
);
}
}
// Database Source (JDBC)
@Slf4j
public class JdbcSource<T> implements Source<T> {
private final DataSource dataSource;
private final String query;
private final RowMapper<T> rowMapper;
private final long pollIntervalMs;
private volatile boolean running = false;
private ScheduledExecutorService scheduler;
private final AtomicLong elementsEmitted = new AtomicLong(0);
private final AtomicLong errors = new AtomicLong(0);
private final long startTimeMillis = System.currentTimeMillis(); // for rate calculation
public JdbcSource(DataSource dataSource, String query, RowMapper<T> rowMapper, long pollIntervalMs) {
this.dataSource = dataSource;
this.query = query;
this.rowMapper = rowMapper;
this.pollIntervalMs = pollIntervalMs;
}
@Override
public String getName() {
return "jdbc-source-" + query.hashCode();
}
@Override
public void start(SourceContext<T> context) {
if (running) {
log.warn("JDBC source is already running");
return;
}
running = true;
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(query);
ResultSet rs = stmt.executeQuery()) {
while (rs.next() && running) {
try {
T element = rowMapper.mapRow(rs);
context.emit(element);
elementsEmitted.incrementAndGet();
} catch (Exception e) {
log.error("Failed to map row from JDBC source", e);
errors.incrementAndGet();
context.markFailed("row-" + elementsEmitted.get(), e);
}
}
} catch (Exception e) {
log.error("Error in JDBC source", e);
errors.incrementAndGet();
}
}, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
log.info("JDBC source started with query: {}", query);
}
@Override
public void stop() {
running = false;
if (scheduler != null) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
@Override
public SourceStats getStats() {
return new SourceStats(
getName(),
elementsEmitted.get(),
errors.get(),
elementsEmitted.get() / Math.max(1.0, (System.currentTimeMillis() - startTimeMillis) / 1000.0)
);
}
public interface RowMapper<T> {
T mapRow(ResultSet rs) throws SQLException;
}
}
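A usage sketch: poll pending orders every ten seconds. Order is a hypothetical record standing in for a real domain type:
record Order(long id, long customerId, double total) {}

JdbcSource<Order> orders = new JdbcSource<>(
    dataSource,
    "SELECT id, customer_id, total FROM orders WHERE status = 'PENDING'",
    rs -> new Order(rs.getLong("id"), rs.getLong("customer_id"), rs.getDouble("total")),
    10_000L); // poll interval in milliseconds
Because RowMapper is a single-method interface, a lambda over the ResultSet is all the mapping code you need.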
Stream Processing
3. Transform Operations
// Core transform implementations
@Slf4j
public class MapTransform<T, R> implements Transform<T, R> {
private final Function<T, R> mapper;
private final String name;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public MapTransform(String name, Function<T, R> mapper) {
this.name = name;
this.mapper = mapper;
}
@Override
public String getName() {
return name;
}
@Override
public R apply(T input) {
try {
R result = mapper.apply(input);
processedCount.incrementAndGet();
return result;
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Mapping failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in map transform {} for input: {}", name, input, error);
}
public long getProcessedCount() {
return processedCount.get();
}
public long getErrorCount() {
return errorCount.get();
}
}
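The transforms in this section (and the keyed and enrichment variants below) throw a TransformException that the original listing never defines; a minimal unchecked wrapper is enough:
// Minimal definition, inferred from usage; not shown in the original listing.
public class TransformException extends RuntimeException {
    public TransformException(String message, Throwable cause) {
        super(message, cause);
    }
}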
@Slf4j
public class FilterTransform<T> implements Transform<T, T> {
private final Predicate<T> predicate;
private final String name;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong filteredCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public FilterTransform(String name, Predicate<T> predicate) {
this.name = name;
this.predicate = predicate;
}
@Override
public String getName() {
return name;
}
@Override
public T apply(T input) {
try {
processedCount.incrementAndGet();
if (predicate.test(input)) {
return input;
} else {
filteredCount.incrementAndGet();
return null; // Filtered out
}
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Filter failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in filter transform {} for input: {}", name, input, error);
}
public long getProcessedCount() {
return processedCount.get();
}
public long getFilteredCount() {
return filteredCount.get();
}
public long getErrorCount() {
return errorCount.get();
}
}
// Complex transform: Keyed operations
@Slf4j
public class KeyedTransform<T, K, R> implements Transform<T, R> {
private final Function<T, K> keyExtractor;
private final Function<List<T>, R> aggregator;
private final String name;
private final int windowSize;
private final Map<K, List<T>> keyedBuffer;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public KeyedTransform(String name, Function<T, K> keyExtractor,
Function<List<T>, R> aggregator, int windowSize) {
this.name = name;
this.keyExtractor = keyExtractor;
this.aggregator = aggregator;
this.windowSize = windowSize;
this.keyedBuffer = new ConcurrentHashMap<>();
}
@Override
public String getName() {
return name;
}
@Override
public R apply(T input) {
try {
K key = keyExtractor.apply(input);
List<T> buffer = keyedBuffer.computeIfAbsent(key, k -> new ArrayList<>());
synchronized (buffer) {
buffer.add(input);
if (buffer.size() >= windowSize) {
R result = aggregator.apply(new ArrayList<>(buffer));
buffer.clear();
processedCount.incrementAndGet();
return result;
}
}
processedCount.incrementAndGet();
return null; // Not enough elements for aggregation yet
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Keyed transform failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in keyed transform {} for input: {}", name, input, error);
}
// Flush remaining elements
public List<R> flush() {
List<R> results = new ArrayList<>();
for (List<T> buffer : keyedBuffer.values()) {
synchronized (buffer) {
if (!buffer.isEmpty()) {
results.add(aggregator.apply(new ArrayList<>(buffer)));
buffer.clear();
}
}
}
return results;
}
}
// Enrichment transform
@Slf4j
public class EnrichmentTransform<T, R> implements Transform<T, R> {
private final Function<T, R> enricher;
private final String name;
private final Cache<String, R> cache;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong cacheHits = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
public EnrichmentTransform(String name, Function<T, R> enricher,
long cacheSize, Duration cacheTtl) {
this.name = name;
this.enricher = enricher;
this.cache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(cacheTtl)
.build();
}
@Override
public String getName() {
return name;
}
@Override
public R apply(T input) {
try {
String cacheKey = generateCacheKey(input);
R result = cache.getIfPresent(cacheKey);
if (result == null) {
result = enricher.apply(input);
cache.put(cacheKey, result);
} else {
cacheHits.incrementAndGet();
}
processedCount.incrementAndGet();
return result;
} catch (Exception e) {
errorCount.incrementAndGet();
onError(input, e);
throw new TransformException("Enrichment failed for input: " + input, e);
}
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in enrichment transform {} for input: {}", name, input, error);
}
private String generateCacheKey(T input) {
// Objects.hash returns a primitive int, so it has no toString() to call.
// Hash-based keys can also collide; prefer a real business key in production.
return String.valueOf(Objects.hashCode(input));
}
public double getCacheHitRate() {
return processedCount.get() > 0 ? (double) cacheHits.get() / processedCount.get() : 0.0;
}
}
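For example, a keyed aggregation that averages the last 100 order totals per customer (reusing the hypothetical Order record from the JDBC example):
KeyedTransform<Order, Long, Double> avgPerCustomer = new KeyedTransform<>(
    "avg-per-customer",
    Order::customerId,                                                       // key
    batch -> batch.stream().mapToDouble(Order::total).average().orElse(0.0), // aggregate
    100);                                                                    // elements per key per emission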
Batch Processing
4. Batch Pipeline Components
// Batch pipeline for processing large datasets
@Slf4j
public class BatchPipeline<T> {
private final String name;
private final ExecutorService executor;
private final PipelineMetrics metrics;
private final int batchSize;
private final Duration flushInterval;
private Source<T> source;
private final List<Transform<T, ?>> transforms;
private Sink<T> sink;
private volatile boolean running = false;
private final List<T> batchBuffer;
public BatchPipeline(String name, PipelineMetrics metrics, int batchSize, Duration flushInterval) {
this.name = name;
this.metrics = metrics;
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.transforms = new CopyOnWriteArrayList<>();
this.batchBuffer = Collections.synchronizedList(new ArrayList<>());
this.executor = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder()
.setNameFormat("batch-pipeline-" + name + "-%d")
.build());
}
public BatchPipeline<T> from(Source<T> source) {
this.source = source;
return this;
}
@SuppressWarnings("unchecked")
public <R> BatchPipeline<R> transform(Transform<T, R> transform) {
this.transforms.add(transform);
// In batch processing, we typically maintain the same pipeline
return (BatchPipeline<R>) this;
}
public BatchPipeline<T> to(Sink<T> sink) {
this.sink = sink;
return this;
}
public void start() {
if (running) {
log.warn("Batch pipeline {} is already running", name);
return;
}
running = true;
// Start batch flusher
executor.submit(this::batchFlusher);
// Start source
source.start(new SourceContext<T>() {
@Override
public void emit(T element) {
addToBatch(element);
}
@Override
public void markProcessed(String elementId) {
metrics.recordElementProcessed(name, elementId);
}
@Override
public void markFailed(String elementId, Throwable error) {
metrics.recordElementFailed(name, elementId, error);
}
});
log.info("Batch pipeline {} started with batch size: {}, flush interval: {}",
name, batchSize, flushInterval);
}
public void stop() {
running = false;
source.stop();
// Flush remaining elements
flushBatch();
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
metrics.recordPipelineStop(name);
log.info("Batch pipeline {} stopped", name);
}
private void addToBatch(T element) {
synchronized (batchBuffer) {
batchBuffer.add(element);
if (batchBuffer.size() >= batchSize) {
flushBatch();
}
}
}
private void batchFlusher() {
while (running) {
try {
Thread.sleep(flushInterval.toMillis());
flushBatch();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Error in batch flusher for pipeline {}", name, e);
}
}
}
@SuppressWarnings("unchecked")
private void flushBatch() {
List<T> currentBatch;
synchronized (batchBuffer) {
if (batchBuffer.isEmpty()) {
return;
}
currentBatch = new ArrayList<>(batchBuffer);
batchBuffer.clear();
}
if (!currentBatch.isEmpty()) {
executor.submit(() -> processBatch(currentBatch));
}
}
private void processBatch(List<T> batch) {
try {
List<T> currentBatch = batch;
// Apply transforms
for (Transform transform : transforms) {
List<Object> transformedBatch = new ArrayList<>();
for (T element : currentBatch) {
try {
Object transformed = transform.apply(element);
if (transformed != null) {
transformedBatch.add(transformed);
}
} catch (Exception e) {
log.error("Error applying transform {} to element", transform.getName(), e);
metrics.recordElementFailed(name, element.toString(), e);
}
}
currentBatch = (List<T>) transformedBatch;
}
// Write to sink
if (sink != null && !currentBatch.isEmpty()) {
sink.writeBatch(currentBatch);
metrics.recordBatchProcessed(name, currentBatch.size());
}
} catch (Exception e) {
log.error("Error processing batch in pipeline {}", name, e);
metrics.recordBatchFailed(name, batch.size(), e);
}
}
}
// Batch window for time-based batching
@Slf4j
public class TimeWindow<T> implements WindowStrategy<T> {
private final Duration windowSize;
private final Duration windowSlide;
private final List<T> windowBuffer;
private final List<WindowProcessor<T>> processors;
private ScheduledExecutorService windowScheduler;
private volatile boolean running = false;
public TimeWindow(Duration windowSize, Duration windowSlide) {
this.windowSize = windowSize;
this.windowSlide = windowSlide;
this.windowBuffer = Collections.synchronizedList(new ArrayList<>());
this.processors = new CopyOnWriteArrayList<>();
}
@Override
public String getName() {
return "time-window-" + windowSize.toMillis() + "ms";
}
@Override
public void process(T element) {
if (running) {
windowBuffer.add(element);
}
}
public void addProcessor(WindowProcessor<T> processor) {
processors.add(processor);
}
public void start() {
if (running) {
log.warn("Time window is already running");
return;
}
running = true;
windowScheduler = Executors.newSingleThreadScheduledExecutor();
// Schedule window processing
windowScheduler.scheduleAtFixedRate(this::processWindow,
windowSlide.toMillis(), windowSlide.toMillis(), TimeUnit.MILLISECONDS);
log.info("Time window started with size: {}, slide: {}", windowSize, windowSlide);
}
public void stop() {
running = false;
if (windowScheduler != null) {
windowScheduler.shutdown();
try {
if (!windowScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
windowScheduler.shutdownNow();
}
} catch (InterruptedException e) {
windowScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
// Process remaining elements
processWindow();
log.info("Time window stopped");
}
private void processWindow() {
List<T> currentWindow;
synchronized (windowBuffer) {
currentWindow = new ArrayList<>(windowBuffer);
// For sliding windows, we might keep some elements
// For tumbling windows, we clear the buffer
if (windowSize.equals(windowSlide)) {
windowBuffer.clear();
} else {
// Keep elements from the last windowSize period
long cutoffTime = System.currentTimeMillis() - windowSize.toMillis();
// This is simplified - in real implementation, you'd need timestamps
windowBuffer.removeIf(element -> shouldRemoveFromWindow(element, cutoffTime));
}
}
if (!currentWindow.isEmpty()) {
for (WindowProcessor<T> processor : processors) {
try {
processor.process(currentWindow);
} catch (Exception e) {
log.error("Error in window processor {}", processor.getName(), e);
}
}
}
}
private boolean shouldRemoveFromWindow(T element, long cutoffTime) {
// Implementation depends on how you track element timestamps
// This is a placeholder implementation
return true; // Remove all elements for simplicity
}
public interface WindowProcessor<T> {
String getName();
void process(List<T> window);
}
}
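A tumbling one-minute window is simply a TimeWindow whose slide equals its size; the processor here is a hypothetical counter:
TimeWindow<Order> perMinute = new TimeWindow<>(Duration.ofMinutes(1), Duration.ofMinutes(1));
perMinute.addProcessor(new TimeWindow.WindowProcessor<Order>() {
    @Override public String getName() { return "count-logger"; }
    @Override public void process(List<Order> window) {
        System.out.println(window.size() + " orders in the last minute");
    }
});
perMinute.start();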
Windowed Operations
5. Advanced Windowing Strategies
// Window manager for different window types
@Component
@Slf4j
public class WindowManager<T> {
private final Map<String, WindowStrategy<T>> windows;
private final PipelineMetrics metrics;
public WindowManager(PipelineMetrics metrics) {
this.windows = new ConcurrentHashMap<>();
this.metrics = metrics;
}
public void registerWindow(String name, WindowStrategy<T> window) {
windows.put(name, window);
log.info("Registered window: {}", name);
}
public void processElement(String windowName, T element) {
WindowStrategy<T> window = windows.get(windowName);
if (window != null) {
window.process(element);
metrics.recordWindowElementProcessed(windowName);
} else {
log.warn("Window not found: {}", windowName);
}
}
public void startAll() {
windows.values().forEach(window -> {
try {
if (window instanceof ManageableWindow) {
((ManageableWindow) window).start();
}
} catch (Exception e) {
log.error("Failed to start window: {}", window.getName(), e);
}
});
}
public void stopAll() {
windows.values().forEach(window -> {
try {
if (window instanceof ManageableWindow) {
((ManageableWindow) window).stop();
}
} catch (Exception e) {
log.error("Failed to stop window: {}", window.getName(), e);
}
});
}
public interface ManageableWindow {
void start();
void stop();
}
}
// Session window for grouping events by session
@Slf4j
public class SessionWindow<T> implements WindowStrategy<T>, WindowManager.ManageableWindow {
private final Function<T, String> sessionIdExtractor;
private final Function<T, Long> timestampExtractor;
private final Duration sessionTimeout;
private final Duration windowSlide;
private final Map<String, Session<T>> activeSessions;
private final List<WindowProcessor<T>> processors;
private ScheduledExecutorService cleanupScheduler;
private volatile boolean running = false;
public SessionWindow(Function<T, String> sessionIdExtractor,
Function<T, Long> timestampExtractor,
Duration sessionTimeout,
Duration windowSlide) {
this.sessionIdExtractor = sessionIdExtractor;
this.timestampExtractor = timestampExtractor;
this.sessionTimeout = sessionTimeout;
this.windowSlide = windowSlide;
this.activeSessions = new ConcurrentHashMap<>();
this.processors = new CopyOnWriteArrayList<>();
}
@Override
public String getName() {
return "session-window-" + sessionTimeout.toMillis() + "ms";
}
@Override
public void process(T element) {
if (!running) return;
String sessionId = sessionIdExtractor.apply(element);
long timestamp = timestampExtractor.apply(element);
Session<T> session = activeSessions.computeIfAbsent(sessionId,
id -> new Session<>(id, timestamp));
session.addElement(element, timestamp);
session.updateLastActivity(timestamp);
}
@Override
public void start() {
if (running) {
log.warn("Session window is already running");
return;
}
running = true;
cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
// Schedule session cleanup
cleanupScheduler.scheduleAtFixedRate(this::cleanupExpiredSessions,
windowSlide.toMillis(), windowSlide.toMillis(), TimeUnit.MILLISECONDS);
log.info("Session window started with timeout: {}", sessionTimeout);
}
@Override
public void stop() {
running = false;
if (cleanupScheduler != null) {
cleanupScheduler.shutdown();
try {
if (!cleanupScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
cleanupScheduler.shutdownNow();
}
} catch (InterruptedException e) {
cleanupScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
// Process all remaining sessions
closeAllSessions();
log.info("Session window stopped");
}
public void addProcessor(WindowProcessor<T> processor) {
processors.add(processor);
}
private void cleanupExpiredSessions() {
long currentTime = System.currentTimeMillis();
long timeoutMillis = sessionTimeout.toMillis();
Iterator<Map.Entry<String, Session<T>>> iterator = activeSessions.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Session<T>> entry = iterator.next();
Session<T> session = entry.getValue();
if (currentTime - session.getLastActivity() > timeoutMillis) {
// Session expired, process and remove
processSession(session);
iterator.remove();
}
}
}
private void closeAllSessions() {
for (Session<T> session : activeSessions.values()) {
processSession(session);
}
activeSessions.clear();
}
private void processSession(Session<T> session) {
List<T> sessionElements = session.getElements();
if (!sessionElements.isEmpty()) {
for (WindowProcessor<T> processor : processors) {
try {
processor.process(sessionElements);
} catch (Exception e) {
log.error("Error processing session {} with processor {}",
session.getId(), processor.getName(), e);
}
}
}
}
private static class Session<T> {
private final String id;
private final List<T> elements;
private long lastActivity;
private final long startTime;
public Session(String id, long startTime) {
this.id = id;
this.elements = Collections.synchronizedList(new ArrayList<>()); // written by pipeline threads, read by the cleanup thread
this.startTime = startTime;
this.lastActivity = startTime;
}
public void addElement(T element, long timestamp) {
elements.add(element);
}
public void updateLastActivity(long timestamp) {
this.lastActivity = timestamp;
}
public String getId() { return id; }
public List<T> getElements() { return new ArrayList<>(elements); } // snapshot, so processors never see concurrent mutation
public long getLastActivity() { return lastActivity; }
public long getStartTime() { return startTime; }
}
}
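Constructing one for a hypothetical ClickEvent type with userId() and timestampMillis() accessors:
SessionWindow<ClickEvent> sessions = new SessionWindow<>(
    ClickEvent::userId,           // groups events into sessions
    ClickEvent::timestampMillis,  // event time, not wall-clock time
    Duration.ofMinutes(30),       // inactivity gap that closes a session
    Duration.ofMinutes(1));       // how often expired sessions are swept
sessions.start();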
// Global window for processing all elements as one group
@Slf4j
public class GlobalWindow<T> implements WindowStrategy<T>, WindowManager.ManageableWindow {
private final List<T> allElements;
private final List<WindowProcessor<T>> processors;
private final AtomicBoolean processed;
public GlobalWindow() {
this.allElements = Collections.synchronizedList(new ArrayList<>());
this.processors = new CopyOnWriteArrayList<>();
this.processed = new AtomicBoolean(false);
}
@Override
public String getName() {
return "global-window";
}
@Override
public void process(T element) {
if (!processed.get()) {
allElements.add(element);
}
}
@Override
public void start() {
processed.set(false);
log.info("Global window started");
}
@Override
public void stop() {
if (processed.compareAndSet(false, true)) {
processAllElements();
}
log.info("Global window stopped");
}
public void addProcessor(WindowProcessor<T> processor) {
processors.add(processor);
}
private void processAllElements() {
List<T> elements = new ArrayList<>(allElements);
if (!elements.isEmpty()) {
for (WindowProcessor<T> processor : processors) {
try {
processor.process(elements);
} catch (Exception e) {
log.error("Error in global window processor {}", processor.getName(), e);
}
}
}
allElements.clear();
}
public void triggerProcessing() {
stop();
start();
}
}
State Management
6. Stateful Processing
// State management for pipeline operations
@Slf4j
public class PipelineStateManager {
private final StateStore stateStore;
private final Map<String, PipelineState<?>> pipelineStates;
public PipelineStateManager(StateStore stateStore) {
this.stateStore = stateStore;
this.pipelineStates = new ConcurrentHashMap<>();
}
@SuppressWarnings("unchecked")
public <T> PipelineState<T> getOrCreateState(String pipelineName, String stateName,
Class<T> stateType, T initialState) {
String fullStateName = pipelineName + ":" + stateName;
// computeIfAbsent avoids the check-then-put race of a plain get/put sequence
return (PipelineState<T>) pipelineStates.computeIfAbsent(fullStateName,
key -> new PipelineState<>(key, stateType, initialState, stateStore));
}
public void saveAllStates() {
pipelineStates.values().forEach(PipelineState::save);
}
public void restoreAllStates() {
pipelineStates.values().forEach(PipelineState::restore);
}
public void clearState(String pipelineName, String stateName) {
String fullStateName = pipelineName + ":" + stateName;
PipelineState<?> state = pipelineStates.remove(fullStateName);
if (state != null) {
state.clear();
}
}
}
// Pipeline state with persistence
@Slf4j
public class PipelineState<T> {
private final String name;
private final Class<T> type;
private final StateStore stateStore;
private T currentState;
private final T initialState;
public PipelineState(String name, Class<T> type, T initialState, StateStore stateStore) {
this.name = name;
this.type = type;
this.initialState = initialState;
this.stateStore = stateStore;
this.currentState = initialState;
restore();
}
public T get() {
return currentState;
}
public void update(Function<T, T> updater) {
synchronized (this) {
T newState = updater.apply(currentState);
if (newState != null) {
currentState = newState;
}
}
}
public void set(T newState) {
synchronized (this) {
currentState = newState;
}
}
public void save() {
try {
stateStore.save(name, currentState);
log.debug("Saved state for: {}", name);
} catch (Exception e) {
log.error("Failed to save state for: {}", name, e);
}
}
public void restore() {
try {
T savedState = stateStore.load(name, type);
if (savedState != null) {
currentState = savedState;
log.debug("Restored state for: {}", name);
}
} catch (Exception e) {
log.warn("Failed to restore state for: {}, using initial state", name, e);
currentState = initialState;
}
}
public void clear() {
synchronized (this) {
currentState = initialState;
}
stateStore.delete(name);
}
public String getName() {
return name;
}
}
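Typical usage, assuming an injected PipelineStateManager and an "order-pipeline" pipeline:
// A persistent per-pipeline counter.
PipelineState<Long> processed = stateManager.getOrCreateState(
    "order-pipeline", "processed-count", Long.class, 0L);
processed.update(n -> n + 1);
processed.save(); // or let the checkpointer (see Fault Tolerance) persist all states periodically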
// State store interface with multiple implementations
public interface StateStore {
<T> void save(String key, T value);
<T> T load(String key, Class<T> type);
void delete(String key);
boolean exists(String key);
}
// In-memory state store
@Slf4j
public class InMemoryStateStore implements StateStore {
private final Map<String, Object> storage;
private final ObjectMapper objectMapper;
public InMemoryStateStore(ObjectMapper objectMapper) {
this.storage = new ConcurrentHashMap<>();
this.objectMapper = objectMapper;
}
@Override
public <T> void save(String key, T value) {
storage.put(key, value);
}
@Override
@SuppressWarnings("unchecked")
public <T> T load(String key, Class<T> type) {
Object value = storage.get(key);
if (value != null && type.isInstance(value)) {
return (T) value;
}
return null;
}
@Override
public void delete(String key) {
storage.remove(key);
}
@Override
public boolean exists(String key) {
return storage.containsKey(key);
}
}
// File-based state store
@Slf4j
public class FileStateStore implements StateStore {
private final Path storageDir;
private final ObjectMapper objectMapper;
public FileStateStore(@Value("${app.state.store.dir:/tmp/pipeline-state}") String storageDir,
ObjectMapper objectMapper) throws IOException {
this.storageDir = Paths.get(storageDir);
this.objectMapper = objectMapper;
if (!Files.exists(this.storageDir)) {
Files.createDirectories(this.storageDir);
}
}
@Override
public <T> void save(String key, T value) {
try {
Path filePath = storageDir.resolve(key + ".json");
String json = objectMapper.writeValueAsString(value);
Files.write(filePath, json.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
} catch (Exception e) {
throw new StateStoreException("Failed to save state for key: " + key, e);
}
}
@Override
public <T> T load(String key, Class<T> type) {
try {
Path filePath = storageDir.resolve(key + ".json");
if (!Files.exists(filePath)) {
return null;
}
String json = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
return objectMapper.readValue(json, type);
} catch (Exception e) {
throw new StateStoreException("Failed to load state for key: " + key, e);
}
}
@Override
public void delete(String key) {
try {
Path filePath = storageDir.resolve(key + ".json");
Files.deleteIfExists(filePath);
} catch (Exception e) {
throw new StateStoreException("Failed to delete state for key: " + key, e);
}
}
@Override
public boolean exists(String key) {
Path filePath = storageDir.resolve(key + ".json");
return Files.exists(filePath);
}
}
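FileStateStore throws a StateStoreException that the original listing never defines; a minimal unchecked wrapper suffices:
// Minimal definition, inferred from usage; not shown in the original listing.
public class StateStoreException extends RuntimeException {
    public StateStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}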
// Stateful transform example
@Component
@Slf4j
public class StatefulAggregationTransform<T, K, R> implements Transform<T, R> {
private final Function<T, K> keyExtractor;
private final Function<T, Double> valueExtractor;
private final AggregationFunction aggregationFunction;
private final PipelineStateManager stateManager;
private final String pipelineName;
private final String transformName;
public StatefulAggregationTransform(String pipelineName, String transformName,
Function<T, K> keyExtractor,
Function<T, Double> valueExtractor,
AggregationFunction aggregationFunction,
PipelineStateManager stateManager) {
this.pipelineName = pipelineName;
this.transformName = transformName;
this.keyExtractor = keyExtractor;
this.valueExtractor = valueExtractor;
this.aggregationFunction = aggregationFunction;
this.stateManager = stateManager;
}
@Override
public String getName() {
return transformName;
}
@Override
@SuppressWarnings("unchecked")
public R apply(T input) {
K key = keyExtractor.apply(input);
Double value = valueExtractor.apply(input);
String stateName = "aggregation-" + key.toString();
PipelineState<AggregationState> state = stateManager.getOrCreateState(
pipelineName, stateName, AggregationState.class, new AggregationState());
// Use the synchronized update so concurrent elements for the same key do not race
state.update(current -> aggregationFunction.aggregate(current, value));
return (R) new AggregationResult<>(key, state.get(), System.currentTimeMillis());
}
@Override
public void onError(T input, Throwable error) {
log.error("Error in stateful aggregation transform for input: {}", input, error);
}
public static class AggregationState {
private double sum = 0.0;
private long count = 0;
private double min = Double.MAX_VALUE;
private double max = -Double.MAX_VALUE; // Double.MIN_VALUE is the smallest positive double, not the most negative
// Getters and setters
public double getSum() { return sum; }
public void setSum(double sum) { this.sum = sum; }
public long getCount() { return count; }
public void setCount(long count) { this.count = count; }
public double getMin() { return min; }
public void setMin(double min) { this.min = min; }
public double getMax() { return max; }
public void setMax(double max) { this.max = max; }
public double getAverage() {
return count > 0 ? sum / count : 0.0;
}
}
public interface AggregationFunction {
AggregationState aggregate(AggregationState current, Double value);
}
public static class SumAggregation implements AggregationFunction {
@Override
public AggregationState aggregate(AggregationState current, Double value) {
current.setSum(current.getSum() + value);
current.setCount(current.getCount() + 1);
current.setMin(Math.min(current.getMin(), value));
current.setMax(Math.max(current.getMax(), value));
return current;
}
}
public record AggregationResult<K>(K key, AggregationState state, long timestamp) {}
}
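Wiring it up for the hypothetical Order type, keyed by customer:
StatefulAggregationTransform<Order, Long, StatefulAggregationTransform.AggregationResult<Long>> totals =
    new StatefulAggregationTransform<>(
        "order-pipeline", "totals-per-customer",
        Order::customerId, Order::total,
        new StatefulAggregationTransform.SumAggregation(),
        stateManager);
Because the running sums live in the PipelineStateManager rather than in the transform itself, they survive a restart whenever a persistent StateStore is configured.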
Fault Tolerance
7. Error Handling and Recovery
// Circuit breaker for pipeline components
@Slf4j
public class PipelineCircuitBreaker {
private final Map<String, CircuitBreaker> breakers;
private final CircuitBreakerConfig defaultConfig;
public PipelineCircuitBreaker() {
this.breakers = new ConcurrentHashMap<>();
this.defaultConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.slowCallDurationThreshold(Duration.ofSeconds(5))
.permittedNumberOfCallsInHalfOpenState(3)
.minimumNumberOfCalls(10)
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.slidingWindowSize(20)
.build();
}
public <T> T execute(String breakerName, Supplier<T> supplier) {
CircuitBreaker breaker = breakers.computeIfAbsent(breakerName,
name -> CircuitBreaker.of(name, defaultConfig));
return breaker.executeSupplier(supplier);
}
public void execute(String breakerName, Runnable runnable) {
CircuitBreaker breaker = breakers.computeIfAbsent(breakerName,
name -> CircuitBreaker.of(name, defaultConfig));
breaker.executeRunnable(runnable);
}
public CircuitBreaker.State getState(String breakerName) {
CircuitBreaker breaker = breakers.get(breakerName);
return breaker != null ? breaker.getState() : CircuitBreaker.State.CLOSED;
}
}
// Retry mechanism with exponential backoff
@Slf4j
public class PipelineRetryManager {
private final Map<String, Retry> retries;
private final RetryConfig defaultConfig;
public PipelineRetryManager() {
this.retries = new ConcurrentHashMap<>();
this.defaultConfig = RetryConfig.custom()
.maxAttempts(3)
// exponential backoff (1s, 2s, 4s, ...) rather than a fixed wait, to match the intent above
.intervalFunction(IntervalFunction.ofExponentialBackoff(1000L, 2.0))
.retryExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(NullPointerException.class)
.build();
}
public <T> T executeWithRetry(String retryName, Supplier<T> supplier) {
Retry retry = retries.computeIfAbsent(retryName,
name -> Retry.of(name, defaultConfig));
return retry.executeSupplier(supplier);
}
public void executeWithRetry(String retryName, Runnable runnable) {
Retry retry = retries.computeIfAbsent(retryName,
name -> Retry.of(name, defaultConfig));
retry.executeRunnable(runnable);
}
public RetryConfig getRetryConfig(String retryName) {
Retry retry = retries.get(retryName);
return retry != null ? retry.getRetryConfig() : defaultConfig;
}
}
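The two guards compose naturally. A sketch of wrapping a sink write so that every retry attempt is also counted by the breaker (sink and element are assumed):
retryManager.executeWithRetry("orders-sink",
    () -> circuitBreaker.execute("orders-sink", () -> sink.write(element)));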
// Dead letter queue for failed elements
@Slf4j
public class DeadLetterQueue<T> {
private final Sink<T> dlqSink;
private final ObjectMapper objectMapper;
private final String queueName;
public DeadLetterQueue(Sink<T> dlqSink, ObjectMapper objectMapper, String queueName) {
this.dlqSink = dlqSink;
this.objectMapper = objectMapper;
this.queueName = queueName;
}
public void sendToDLQ(T element, Throwable error) {
try {
DLQMessage<T> message = new DLQMessage<>(element, error, System.currentTimeMillis());
dlqSink.write((T) message); // unchecked cast that only works via erasure; a dedicated Sink<DLQMessage<T>> would be cleaner
log.warn("Sent element to DLQ {}: {}", queueName, element);
} catch (Exception e) {
log.error("Failed to send element to DLQ {}: {}", queueName, element, e);
}
}
public void sendToDLQ(T element, String errorMessage) {
sendToDLQ(element, new RuntimeException(errorMessage));
}
public static class DLQMessage<T> {
private final T originalElement;
private final String errorMessage;
private final String stackTrace;
private final long timestamp;
private final String messageId;
public DLQMessage(T originalElement, Throwable error, long timestamp) {
this.originalElement = originalElement;
this.errorMessage = error.getMessage();
this.stackTrace = getStackTrace(error);
this.timestamp = timestamp;
this.messageId = UUID.randomUUID().toString();
}
private String getStackTrace(Throwable error) {
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
return sw.toString();
}
// Getters
public T getOriginalElement() { return originalElement; }
public String getErrorMessage() { return errorMessage; }
public String getStackTrace() { return stackTrace; }
public long getTimestamp() { return timestamp; }
public String getMessageId() { return messageId; }
}
}
// Checkpointing for pipeline state
@Slf4j
public class PipelineCheckpointer {
private final StateStore stateStore;
private final ScheduledExecutorService checkpointScheduler;
private final Map<String, Checkpointable> checkpointables;
public PipelineCheckpointer(StateStore stateStore) {
this.stateStore = stateStore;
this.checkpointScheduler = Executors.newSingleThreadScheduledExecutor();
this.checkpointables = new ConcurrentHashMap<>();
}
public void registerCheckpointable(String name, Checkpointable checkpointable) {
checkpointables.put(name, checkpointable);
log.info("Registered checkpointable: {}", name);
}
public void startCheckpointing(Duration interval) {
checkpointScheduler.scheduleAtFixedRate(this::performCheckpoint,
interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
log.info("Started checkpointing with interval: {}", interval);
}
public void stopCheckpointing() {
checkpointScheduler.shutdown();
try {
if (!checkpointScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
checkpointScheduler.shutdownNow();
}
} catch (InterruptedException e) {
checkpointScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("Stopped checkpointing");
}
public void performCheckpoint() {
checkpointables.forEach((name, checkpointable) -> {
try {
checkpointable.checkpoint();
log.debug("Performed checkpoint for: {}", name);
} catch (Exception e) {
log.error("Failed to perform checkpoint for: {}", name, e);
}
});
}
public void restoreFromCheckpoint() {
checkpointables.forEach((name, checkpointable) -> {
try {
checkpointable.restore();
log.debug("Restored from checkpoint for: {}", name);
} catch (Exception e) {
log.error("Failed to restore from checkpoint for: {}", name, e);
}
});
}
public interface Checkpointable {
void checkpoint();
void restore();
}
}
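A wiring sketch that persists all pipeline state every 30 seconds (checkpointer and stateManager are assumed to be injected):
checkpointer.registerCheckpointable("pipeline-state", new PipelineCheckpointer.Checkpointable() {
    @Override public void checkpoint() { stateManager.saveAllStates(); }
    @Override public void restore() { stateManager.restoreAllStates(); }
});
checkpointer.startCheckpointing(Duration.ofSeconds(30));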
Monitoring & Metrics
8. Comprehensive Monitoring
// Metrics service for pipeline monitoring
@Slf4j
public class PipelineMetrics {
private final MeterRegistry meterRegistry;
private final Map<String, Counter> processedCounters;
private final Map<String, Counter> failedCounters;
private final Map<String, Timer> processingTimers;
private final Map<String, AtomicInteger> queueSizes;
public PipelineMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.processedCounters = new ConcurrentHashMap<>();
this.failedCounters = new ConcurrentHashMap<>();
this.processingTimers = new ConcurrentHashMap<>();
this.queueSizes = new ConcurrentHashMap<>();
}
public void recordPipelineStart(String pipelineName) {
meterRegistry.counter("pipeline.starts", "pipeline", pipelineName).increment();
}
public void recordPipelineStop(String pipelineName) {
meterRegistry.counter("pipeline.stops", "pipeline", pipelineName).increment();
}
public void recordElementProcessed(String pipelineName, String elementId) {
getProcessedCounter(pipelineName).increment();
meterRegistry.counter("pipeline.elements.processed", "pipeline", pipelineName).increment();
}
public void recordElementFailed(String pipelineName, String elementId, Throwable error) {
getFailedCounter(pipelineName).increment();
meterRegistry.counter("pipeline.elements.failed", "pipeline", pipelineName).increment();
meterRegistry.counter("pipeline.errors", "pipeline", pipelineName, "error", error.getClass().getSimpleName()).increment();
}
public void recordBatchProcessed(String pipelineName, int batchSize) {
meterRegistry.counter("pipeline.batches.processed", "pipeline", pipelineName).increment();
meterRegistry.summary("pipeline.batch.size", "pipeline", pipelineName).record(batchSize);
}
public void recordBatchFailed(String pipelineName, int batchSize, Throwable error) {
meterRegistry.counter("pipeline.batches.failed", "pipeline", pipelineName).increment();
}
public void recordWindowElementProcessed(String windowName) {
meterRegistry.counter("window.elements.processed", "window", windowName).increment();
}
public Timer.Sample startProcessingTimer(String pipelineName) {
return Timer.start(meterRegistry);
}
public void stopProcessingTimer(Timer.Sample sample, String pipelineName) {
sample.stop(getProcessingTimer(pipelineName));
}
public void recordQueueSize(String queueName, int size) {
// Micrometer gauges sample a value supplier, so register each queue's gauge
// once against a mutable holder, then update the holder on subsequent calls.
queueSizes.computeIfAbsent(queueName, name -> {
AtomicInteger holder = new AtomicInteger();
Gauge.builder("pipeline.queue.size", holder, AtomicInteger::get)
.tag("queue", name)
.register(meterRegistry);
return holder;
}).set(size);
}
public double getProcessedCount(String pipelineName) {
Counter counter = processedCounters.get(pipelineName);
return counter != null ? counter.count() : 0.0;
}
public double getFailedCount(String pipelineName) {
Counter counter = failedCounters.get(pipelineName);
return counter != null ? counter.count() : 0.0;
}
public double getProcessingRate(String pipelineName) {
// Placeholder: dividing a lifetime count by epoch seconds yields a value near zero.
// A real implementation would track each pipeline's start time or use a windowed rate.
return getProcessedCount(pipelineName) / (System.currentTimeMillis() / 1000.0);
}
private Counter getProcessedCounter(String pipelineName) {
return processedCounters.computeIfAbsent(pipelineName,
name -> meterRegistry.counter("pipeline.processed", "pipeline", name));
}
private Counter getFailedCounter(String pipelineName) {
return failedCounters.computeIfAbsent(pipelineName,
name -> meterRegistry.counter("pipeline.failed", "pipeline", name));
}
private Timer getProcessingTimer(String pipelineName) {
return processingTimers.computeIfAbsent(pipelineName,
name -> Timer.builder("pipeline.processing.time")
.tag("pipeline", name)
.register(meterRegistry));
}
}
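The timer pair is meant to bracket per-element work, for example:
Timer.Sample sample = metrics.startProcessingTimer("order-pipeline");
try {
    process(element); // hypothetical per-element work
} finally {
    metrics.stopProcessingTimer(sample, "order-pipeline");
}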
// Health indicators for pipeline components
public class PipelineHealthIndicator implements HealthIndicator {
private final PipelineMetrics metrics;
private final Map<String, Pipeline<?>> pipelines;
public PipelineHealthIndicator(PipelineMetrics metrics) {
this.metrics = metrics;
this.pipelines = new ConcurrentHashMap<>();
}
public void registerPipeline(String name, Pipeline<?> pipeline) {
pipelines.put(name, pipeline);
}
@Override
public Health health() {
Map<String, Object> details = new HashMap<>();
Health.Builder status = Health.up();
for (Map.Entry<String, Pipeline<?>> entry : pipelines.entrySet()) {
String pipelineName = entry.getKey();
Pipeline<?> pipeline = entry.getValue();
PipelineStats stats = pipeline.getStats();
Map<String, Object> pipelineDetails = new HashMap<>();
pipelineDetails.put("processed", stats.processedCount());
pipelineDetails.put("failed", stats.failedCount());
pipelineDetails.put("rate", stats.processingRate());
// Check for stalled pipeline
if (stats.processingRate() < 1.0) { // Less than 1 element per second
pipelineDetails.put("status", "SLOW");
status = Health.down();
} else {
pipelineDetails.put("status", "HEALTHY");
}
details.put(pipelineName, pipelineDetails);
}
return status.withDetails(details).build();
}
}
// Pipeline dashboard service
@Slf4j
public class PipelineDashboard {
private final PipelineMetrics metrics;
private final Map<String, Pipeline<?>> pipelines;
private final ScheduledExecutorService dashboardScheduler;
public PipelineDashboard(PipelineMetrics metrics) {
this.metrics = metrics;
this.pipelines = new ConcurrentHashMap<>();
this.dashboardScheduler = Executors.newSingleThreadScheduledExecutor();
}
public void registerPipeline(String name, Pipeline<?> pipeline) {
pipelines.put(name, pipeline);
}
public void startDashboardReporting(Duration interval) {
dashboardScheduler.scheduleAtFixedRate(this::reportPipelineStats,
interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
}
public void stopDashboardReporting() {
dashboardScheduler.shutdown();
}
private void reportPipelineStats() {
log.info("=== Pipeline Dashboard Report ===");
for (Map.Entry<String, Pipeline<?>> entry : pipelines.entrySet()) {
String pipelineName = entry.getKey();
Pipeline<?> pipeline = entry.getValue();
PipelineStats stats = pipeline.getStats();
log.info("Pipeline: {}", pipelineName);
log.info(" Processed: {}, Failed: {}, Rate: {}/s",
stats.processedCount(), stats.failedCount(), stats.processingRate());
if (stats.sourceStats() != null) {
log.info(" Source: {} (emitted: {}, errors: {})",
stats.sourceStats().sourceName(),
stats.sourceStats().elementsEmitted(),
stats.sourceStats().errors());
}
if (stats.sinkStats() != null) {
log.info(" Sink: {} (written: {}, errors: {})",
stats.sinkStats().sinkName(),
stats.sinkStats().elementsWritten(),
stats.sinkStats().errors());
}
}
log.info("=================================");
}
public Map<String, Object> getPipelineMetrics(String pipelineName) {
Pipeline<?> pipeline = pipelines.get(pipelineName);
if (pipeline == null) {
return Map.of("error", "Pipeline not found: " + pipelineName);
}
PipelineStats stats = pipeline.getStats();
// Map.of rejects null values, so build the result conditionally.
Map<String, Object> result = new HashMap<>();
result.put("name", pipelineName);
result.put("processed", stats.processedCount());
result.put("failed", stats.failedCount());
result.put("rate", stats.processingRate());
if (stats.sourceStats() != null) {
result.put("source", Map.of(
"name", stats.sourceStats().sourceName(),
"emitted", stats.sourceStats().elementsEmitted(),
"errors", stats.sourceStats().errors()));
}
if (stats.sinkStats() != null) {
result.put("sink", Map.of(
"name", stats.sinkStats().sinkName(),
"written", stats.sinkStats().elementsWritten(),
"errors", stats.sinkStats().errors()));
}
return result;
}
}
Spring Boot Integration
9. Complete Spring Boot Configuration
# application.yml
spring:
  application:
    name: dataflow-pipeline
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: pipeline-consumer
      auto-offset-reset: earliest
    producer:
      acks: all
      retries: 3
  datasource:
    url: jdbc:postgresql://localhost:5432/pipeline
    username: pipeline
    password: pipeline
    hikari:
      maximum-pool-size: 10
  jackson:
    property-naming-strategy: SNAKE_CASE
    default-property-inclusion: NON_NULL

# Pipeline Configuration
pipeline:
  batch:
    size: 1000
    flush-interval: 5s
  window:
    size: 1m
    slide: 30s
  state:
    store: file
    directory: /tmp/pipeline-state
  retry:
    max-attempts: 3
    backoff-initial-interval: 1s
  circuit-breaker:
    failure-threshold: 50
    wait-duration: 30s

  # Pipeline definitions
  pipelines:
    user-activity:
      source:
        type: kafka
        topic: user-events
      transforms:
        - type: filter
          condition: "eventType == 'CLICK'"
        - type: map
          field: "userId"
        - type: window
          strategy: session
          timeout: 30m
      sink:
        type: kafka
        topic: user-sessions
    order-processing:
      source:
        type: jdbc
        query: "SELECT * FROM orders WHERE status = 'PENDING'"
        poll-interval: 10s
      transforms:
        - type: enrich
          service: customer-service
          field: "customerId"
        - type: batch
          size: 100
      sink:
        type: jdbc
        table: processed_orders

# Management
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    export:
      prometheus:
        enabled: true
    distribution:
      percentiles-histogram:
        "[pipeline.processing.time]": true
10. Spring Configuration and Main Application
@Configuration
@EnableConfigurationProperties(PipelineProperties.class)
public class PipelineConfiguration {
@Bean
public PipelineMetrics pipelineMetrics(MeterRegistry meterRegistry) {
return new PipelineMetrics(meterRegistry);
}
@Bean
public PipelineStateManager pipelineStateManager(StateStore stateStore) {
return new PipelineStateManager(stateStore);
}
@Bean
@ConditionalOnProperty(name = "pipeline.state.store", havingValue = "file")
public StateStore fileStateStore(PipelineProperties properties, ObjectMapper objectMapper) throws IOException {
return new FileStateStore(properties.getState().getDirectory(), objectMapper);
}
@Bean
@ConditionalOnMissingBean(StateStore.class)
public StateStore inMemoryStateStore(ObjectMapper objectMapper) {
return new InMemoryStateStore(objectMapper);
}
@Bean
public PipelineCircuitBreaker pipelineCircuitBreaker() {
return new PipelineCircuitBreaker();
}
@Bean
public PipelineRetryManager pipelineRetryManager() {
return new PipelineRetryManager();
}
@Bean
public PipelineCheckpointer pipelineCheckpointer(StateStore stateStore) {
return new PipelineCheckpointer(stateStore);
}
@Bean
public PipelineHealthIndicator pipelineHealthIndicator(PipelineMetrics metrics) {
return new PipelineHealthIndicator(metrics);
}
@Bean
public PipelineDashboard pipelineDashboard(PipelineMetrics metrics) {
return new PipelineDashboard(metrics);
}
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
}
}
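A note on the two StateStore beans: within a single @Configuration class, bean methods are processed in declaration order, so when pipeline.state.store is set to file the FileStateStore registers first and @ConditionalOnMissingBean makes the in-memory fallback back off; with any other value (or none) the in-memory store wins.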
@ConfigurationProperties(prefix = "pipeline")
@Data
public class PipelineProperties {
private Batch batch = new Batch();
private Window window = new Window();
private State state = new State();
private Retry retry = new Retry();
private CircuitBreaker circuitBreaker = new CircuitBreaker();
private Map<String, PipelineConfig> pipelines = new HashMap<>();
@Data
public static class Batch {
private int size = 1000;
private Duration flushInterval = Duration.ofSeconds(5);
}
@Data
public static class Window {
private Duration size = Duration.ofMinutes(1);
private Duration slide = Duration.ofSeconds(30);
}
@Data
public static class State {
private String store = "memory";
private String directory = "/tmp/pipeline-state";
}
@Data
public static class Retry {
private int maxAttempts = 3;
private Duration backoffInitialInterval = Duration.ofSeconds(1);
}
@Data
public static class CircuitBreaker {
private int failureThreshold = 50;
private Duration waitDuration = Duration.ofSeconds(30);
}
@Data
public static class PipelineConfig {
private SourceConfig source;
private List<TransformConfig> transforms = new ArrayList<>();
private SinkConfig sink;
}
@Data
public static class SourceConfig {
private String type;
private String topic;
private String query;
private Duration pollInterval;
}
@Data
public static class TransformConfig {
private String type;
private String condition;
private String field;
private String service;
private String strategy; // used by window transforms in the YAML above
private Duration timeout; // used by window transforms
private Integer size; // used by batch transforms
}
@Data
public static class SinkConfig {
private String type;
private String topic;
private String table;
}
}
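Relaxed binding does a lot of quiet work here (flush-interval becomes a Duration, circuit-breaker maps to circuitBreaker), so it can be worth verifying the mapping in isolation. Below is a minimal sketch using Spring Boot's Binder API, which applies the same relaxed-binding rules the application uses at startup; PipelinePropertiesBindingCheck and the property values are hypothetical test inputs, and duration strings like 10s are assumed to convert via the shared ApplicationConversionService that ships with Spring Boot.
import java.util.Map;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

public class PipelinePropertiesBindingCheck {
    public static void main(String[] args) {
        // Simulate a handful of application.yml entries as a flat property map
        MapConfigurationPropertySource source = new MapConfigurationPropertySource(Map.of(
            "pipeline.batch.size", "500",
            "pipeline.batch.flush-interval", "10s",
            "pipeline.circuit-breaker.failure-threshold", "25",
            "pipeline.state.store", "file"
        ));
        // Bind the "pipeline" prefix onto the properties class, as Spring Boot does at startup
        PipelineProperties props = new Binder(source)
            .bind("pipeline", PipelineProperties.class)
            .get();
        System.out.println(props.getBatch().getSize());                      // 500
        System.out.println(props.getBatch().getFlushInterval());             // PT10S
        System.out.println(props.getCircuitBreaker().getFailureThreshold()); // 25
        System.out.println(props.getState().getStore());                     // file
    }
}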
// Main application with pipeline orchestration
@SpringBootApplication
@Slf4j
public class DataflowPipelineApplication {
public static void main(String[] args) {
SpringApplication.run(DataflowPipelineApplication.class, args);
}
@Bean
public CommandLineRunner pipelineOrchestrator(PipelineProperties properties,
PipelineMetrics metrics,
KafkaTemplate<String, String> kafkaTemplate,
DataSource dataSource,
ObjectMapper objectMapper) {
return args -> {
log.info("Initializing dataflow pipelines...");
for (Map.Entry<String, PipelineProperties.PipelineConfig> entry : properties.getPipelines().entrySet()) {
String pipelineName = entry.getKey();
PipelineProperties.PipelineConfig config = entry.getValue();
createPipeline(pipelineName, config, properties, metrics, kafkaTemplate, dataSource, objectMapper);
}
log.info("All pipelines initialized successfully");
};
}
private void createPipeline(String name, PipelineProperties.PipelineConfig config,
PipelineProperties properties, PipelineMetrics metrics,
KafkaTemplate<String, String> kafkaTemplate,
DataSource dataSource, ObjectMapper objectMapper) {
// Implementation would create pipeline based on configuration
log.info("Creating pipeline: {} with config: {}", name, config);
// Example pipeline creation logic
if ("user-activity".equals(name)) {
createUserActivityPipeline(name, config, metrics, kafkaTemplate, objectMapper);
} else if ("order-processing".equals(name)) {
createOrderProcessingPipeline(name, config, properties, metrics, dataSource, objectMapper);
}
}
private void createUserActivityPipeline(String name, PipelineProperties.PipelineConfig config,
PipelineMetrics metrics, KafkaTemplate<String, String> kafkaTemplate,
ObjectMapper objectMapper) {
// Create user activity pipeline
Pipeline<String> pipeline = new DataflowPipeline<>(name, metrics);
// Configure source
KafkaSource<String> source = new KafkaSource<>(kafkaTemplate, objectMapper, String.class,
config.getSource().getTopic(), "user-activity-group");
// Configure transforms. transform(...) returns a new, re-typed pipeline,
// so keep the chained reference; calling to()/start() on the original
// variable would silently skip the transform stage.
Pipeline<String> windowed = pipeline.from(source)
.filter(new FilterTransform<>("click-filter",
event -> event.contains("\"eventType\":\"CLICK\"")))
.transform(new MapTransform<>("extract-userId",
event -> extractUserId(event, objectMapper)))
.window(new SessionWindow<>(
event -> extractSessionId(event, objectMapper),
event -> System.currentTimeMillis(),
Duration.ofMinutes(30),
Duration.ofSeconds(30)
));
// Configure sink
KafkaSink<String> sink = new KafkaSink<>(kafkaTemplate, config.getSink().getTopic(), objectMapper);
windowed.to(sink);
windowed.start();
log.info("Started user activity pipeline: {}", name);
}
private void createOrderProcessingPipeline(String name, PipelineProperties.PipelineConfig config,
PipelineProperties properties, PipelineMetrics metrics,
DataSource dataSource, ObjectMapper objectMapper) {
// Create order processing pipeline using the global batch settings
BatchPipeline<String> pipeline = new BatchPipeline<>(name, metrics,
properties.getBatch().getSize(), properties.getBatch().getFlushInterval());
// Configure source
JdbcSource<String> source = new JdbcSource<>(dataSource, config.getSource().getQuery(),
rs -> rs.getString("order_data"), config.getSource().getPollInterval().toMillis());
// Configure pipeline
pipeline.from(source)
.to(new JdbcSink<>(dataSource, config.getSink().getTable()));
pipeline.start();
log.info("Started order processing pipeline: {}", name);
}
private String extractUserId(String event, ObjectMapper objectMapper) {
// Simplified implementation: pull userId out of the JSON payload
try {
JsonNode node = objectMapper.readTree(event);
return node.get("userId").asText();
} catch (Exception e) {
return "unknown";
}
}
private String extractSessionId(String event, ObjectMapper objectMapper) {
// Simplified implementation: pull sessionId out of the JSON payload
try {
JsonNode node = objectMapper.readTree(event);
return node.get("sessionId").asText();
} catch (Exception e) {
return "unknown-session";
}
}
}
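One wiring gap worth flagging: the orchestrator above starts pipelines but never registers them anywhere the REST layer can see, so the start/stop/stats endpoints of the controller below would return 404 for every name. A minimal sketch of closing that loop; PipelineRegistry is hypothetical glue, not part of the original design:
// Hypothetical glue component (assumption, not in the original text):
// hand each started pipeline to the controller shown below so its
// endpoints can resolve the pipeline by name.
@Component
public class PipelineRegistry {
    private final PipelineController controller;

    public PipelineRegistry(PipelineController controller) {
        this.controller = controller;
    }

    // Invoke after pipeline.start() in the create*Pipeline methods
    public void register(String name, Pipeline<?> pipeline) {
        controller.registerPipeline(name, pipeline);
    }
}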
// REST controller for pipeline management
@RestController
@RequestMapping("/api/pipelines")
@Slf4j
public class PipelineController {
private final PipelineDashboard dashboard;
private final Map<String, Pipeline<?>> pipelines;
public PipelineController(PipelineDashboard dashboard) {
this.dashboard = dashboard;
this.pipelines = new ConcurrentHashMap<>();
}
@PostMapping("/{name}/start")
public ResponseEntity<String> startPipeline(@PathVariable String name) {
Pipeline<?> pipeline = pipelines.get(name);
if (pipeline == null) {
return ResponseEntity.notFound().build();
}
pipeline.start();
return ResponseEntity.ok("Pipeline " + name + " started");
}
@PostMapping("/{name}/stop")
public ResponseEntity<String> stopPipeline(@PathVariable String name) {
Pipeline<?> pipeline = pipelines.get(name);
if (pipeline == null) {
return ResponseEntity.notFound().build();
}
pipeline.stop();
return ResponseEntity.ok("Pipeline " + name + " stopped");
}
@GetMapping("/{name}/metrics")
public ResponseEntity<Map<String, Object>> getPipelineMetrics(@PathVariable String name) {
Map<String, Object> metrics = dashboard.getPipelineMetrics(name);
if (metrics.containsKey("error")) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(metrics);
}
@GetMapping("/{name}/stats")
public ResponseEntity<PipelineStats> getPipelineStats(@PathVariable String name) {
Pipeline<?> pipeline = pipelines.get(name);
if (pipeline == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(pipeline.getStats());
}
public void registerPipeline(String name, Pipeline<?> pipeline) {
pipelines.put(name, pipeline);
dashboard.registerPipeline(name, pipeline);
}
}
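To exercise these endpoints, something like the following client-side snippet works against a locally running instance; PipelineClientSketch, the host, port, and the pipeline name are all assumptions, while RestTemplate is standard Spring.
import java.util.Map;
import org.springframework.web.client.RestTemplate;

public class PipelineClientSketch {
    public static void main(String[] args) {
        RestTemplate rest = new RestTemplate();
        String base = "http://localhost:8080/api/pipelines/user-activity";
        // Start the pipeline, then pull its current metrics snapshot
        String started = rest.postForObject(base + "/start", null, String.class);
        System.out.println(started); // "Pipeline user-activity started"
        Map<?, ?> metrics = rest.getForObject(base + "/metrics", Map.class);
        System.out.println(metrics);
    }
}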
This comprehensive dataflow pipeline implementation provides:
- Modular pipeline architecture with sources, transforms, and sinks
- Both stream and batch processing capabilities
- Advanced windowing strategies for time-based operations
- Robust state management with persistence
- Comprehensive fault tolerance with circuit breakers and retries
- Detailed monitoring and metrics for observability
- Spring Boot integration for easy deployment and configuration
The pipeline is designed to be extensible, scalable, and production-ready for various data processing scenarios.