The Java Flow API, introduced in Java 9, provides an implementation of the Reactive Streams specification for building asynchronous, non-blocking, and backpressure-enabled data processing pipelines. It's the standard Java way to implement the publish-subscribe pattern with flow control.
Understanding the Flow API Components
The Flow API consists of four main interfaces:
- Publisher - Produces items for subscribers
- Subscriber - Consumes items from publishers
- Subscription - Manages the flow between publisher and subscriber
- Processor - Acts as both subscriber and publisher (intermediate step)
Basic Flow API Interfaces
import java.util.concurrent.Flow;
public class FlowInterfacesDemo {
// The four core interfaces
Flow.Publisher<T> // Produces data
Flow.Subscriber<T> // Consumes data
Flow.Subscription // Controls flow
Flow.Processor<T,R> // Transforms data
}
Simple Implementation Examples
Example 1: Basic Publisher and Subscriber
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
public class BasicFlowDemo {
static class SimpleSubscriber<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
private final String name;
public SimpleSubscriber(String name) {
this.name = name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println(name + ": Subscribed");
// Request first item
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println(name + ": Received: " + item);
// Process item and request next one
try {
Thread.sleep(100); // Simulate processing time
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println(name + ": Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println(name + ": Completed");
}
}
public static void main(String[] args) throws InterruptedException {
// Create a publisher (using built-in SubmissionPublisher)
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
// Create subscribers
SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>("Subscriber-1");
SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>("Subscriber-2");
// Subscribe to publisher
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
// Publish items
for (int i = 1; i <= 5; i++) {
String item = "Item-" + i;
System.out.println("Publishing: " + item);
publisher.submit(item);
}
// Wait for processing to complete
Thread.sleep(2000);
}
}
}
Example 2: Custom Publisher with Backpressure Control
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
public class CustomPublisherDemo {
static class NumberPublisher implements Flow.Publisher<Long> {
private final long maxNumbers;
public NumberPublisher(long maxNumbers) {
this.maxNumbers = maxNumbers;
}
@Override
public void subscribe(Flow.Subscriber<? super Long> subscriber) {
// Create subscription and pass to subscriber
NumberSubscription subscription = new NumberSubscription(subscriber, maxNumbers);
subscriber.onSubscribe(subscription);
}
}
static class NumberSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super Long> subscriber;
private final long maxNumbers;
private final AtomicLong currentNumber;
private volatile boolean cancelled;
public NumberSubscription(Flow.Subscriber<? super Long> subscriber, long maxNumbers) {
this.subscriber = subscriber;
this.maxNumbers = maxNumbers;
this.currentNumber = new AtomicLong(1);
this.cancelled = false;
}
@Override
public void request(long n) {
if (cancelled) return;
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("Request must be positive"));
return;
}
long produced = 0;
while (produced < n && currentNumber.get() <= maxNumbers && !cancelled) {
long number = currentNumber.getAndIncrement();
subscriber.onNext(number);
produced++;
}
if (currentNumber.get() > maxNumbers && !cancelled) {
subscriber.onComplete();
}
}
@Override
public void cancel() {
cancelled = true;
System.out.println("Subscription cancelled");
}
}
static class SmartSubscriber implements Flow.Subscriber<Long> {
private final String name;
private final long burstSize;
private Flow.Subscription subscription;
private long counter;
public SmartSubscriber(String name, long burstSize) {
this.name = name;
this.burstSize = burstSize;
this.counter = 0;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println(name + ": Subscribed, requesting " + burstSize + " items");
subscription.request(burstSize);
}
@Override
public void onNext(Long item) {
System.out.println(name + ": Processing item " + item);
counter++;
// Simulate processing
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
subscription.cancel();
return;
}
// Request more when we're halfway through our burst
if (counter >= burstSize / 2) {
System.out.println(name + ": Requesting next burst of " + burstSize);
subscription.request(burstSize);
counter = 0;
}
}
@Override
public void onError(Throwable throwable) {
System.err.println(name + ": Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println(name + ": Stream completed successfully");
}
}
public static void main(String[] args) throws InterruptedException {
NumberPublisher publisher = new NumberPublisher(20);
SmartSubscriber subscriber = new SmartSubscriber("SmartSub", 5);
publisher.subscribe(subscriber);
Thread.sleep(2000);
}
}
Advanced Flow Patterns
Example 3: Transform Processor
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class ProcessorDemo {
static class TransformProcessor<T, R> extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private final Function<T, R> transform;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> transform) {
this.transform = transform;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
// Transform and publish
R transformed = transform.apply(item);
submit(transformed);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
closeExceptionally(throwable);
}
@Override
public void onComplete() {
close();
}
}
static class StringToLengthProcessor extends TransformProcessor<String, Integer> {
public StringToLengthProcessor() {
super(String::length);
}
}
public static void main(String[] args) throws InterruptedException {
try (SubmissionPublisher<String> stringPublisher = new SubmissionPublisher<>();
StringToLengthProcessor processor = new StringToLengthProcessor()) {
// Create final subscriber
Flow.Subscriber<Integer> lengthSubscriber = new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer item) {
System.out.println("String length: " + item);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Length processing complete");
}
};
// Connect the pipeline
stringPublisher.subscribe(processor);
processor.subscribe(lengthSubscriber);
// Publish strings
stringPublisher.submit("Hello");
stringPublisher.submit("Reactive");
stringPublisher.submit("Streams");
stringPublisher.submit("Java");
Thread.sleep(1000);
}
}
}
Example 4: Complex Processing Pipeline
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
public class ProcessingPipelineDemo {
static class FilterProcessor<T> extends SubmissionPublisher<T>
implements Flow.Processor<T, T> {
private final Function<T, Boolean> filter;
private Flow.Subscription subscription;
public FilterProcessor(Function<T, Boolean> filter) {
this.filter = filter;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
if (filter.apply(item)) {
submit(item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
closeExceptionally(throwable);
}
@Override
public void onComplete() {
close();
}
}
public static void main(String[] args) throws InterruptedException {
// Create custom executor for parallelism
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> numberPublisher =
new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
FilterProcessor<Integer> evenFilter =
new FilterProcessor<>(n -> n % 2 == 0);
TransformProcessor<Integer, String> stringConverter =
new TransformProcessor<>(n -> "Number: " + n)) {
// Create monitoring subscriber
Flow.Subscriber<String> monitor = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Monitor subscribed, requesting items...");
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Monitor: " + item + " on thread: " +
Thread.currentThread().getName());
try {
Thread.sleep(200); // Simulate processing
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Monitor error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Monitor: Processing complete");
}
};
// Build the pipeline
numberPublisher.subscribe(evenFilter);
evenFilter.subscribe(stringConverter);
stringConverter.subscribe(monitor);
// Generate numbers
for (int i = 1; i <= 10; i++) {
System.out.println("Publishing: " + i);
numberPublisher.submit(i);
Thread.sleep(50);
}
Thread.sleep(2000);
} finally {
executor.shutdown();
}
}
}
Error Handling and Resource Management
Example 5: Robust Flow Implementation
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
public class RobustFlowDemo {
static class SafeSubscriber<T> implements Flow.Subscriber<T> {
private final String name;
private Flow.Subscription subscription;
private final AtomicBoolean active;
public SafeSubscriber(String name) {
this.name = name;
this.active = new AtomicBoolean(true);
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (!active.get()) {
subscription.cancel();
return;
}
this.subscription = subscription;
System.out.println(name + ": Subscribed safely");
subscription.request(1);
}
@Override
public void onNext(T item) {
if (!active.get()) {
subscription.cancel();
return;
}
try {
System.out.println(name + ": Processing " + item);
// Simulate potential error
if (item.equals("error")) {
throw new RuntimeException("Simulated processing error");
}
Thread.sleep(100);
subscription.request(1);
} catch (Exception e) {
System.err.println(name + ": Error processing " + item + " - " + e.getMessage());
subscription.cancel();
active.set(false);
}
}
@Override
public void onError(Throwable throwable) {
System.err.println(name + ": Received error - " + throwable.getMessage());
active.set(false);
}
@Override
public void onComplete() {
System.out.println(name + ": Completed successfully");
active.set(false);
}
public void cancel() {
if (active.getAndSet(false) && subscription != null) {
subscription.cancel();
}
}
}
public static void main(String[] args) throws InterruptedException {
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
SafeSubscriber<String> subscriber1 = new SafeSubscriber<>("SafeSub-1");
SafeSubscriber<String> subscriber2 = new SafeSubscriber<>("SafeSub-2");
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
// Publish various items
publisher.submit("normal");
publisher.submit("error"); // This will cause an error
publisher.submit("recovery"); // This might not be processed
Thread.sleep(1000);
// Manual cancellation
subscriber2.cancel();
System.out.println("Subscriber 2 manually cancelled");
publisher.submit("after_cancel");
Thread.sleep(500);
}
}
}
Key Concepts and Best Practices
Backpressure Strategies
- Pull-based: Subscriber controls the flow by requesting items
- Batching: Request multiple items at once for efficiency
- Dynamic Request: Adjust request size based on processing capability
Best Practices
- Always Implement Backpressure: Never let publishers overwhelm subscribers
- Proper Resource Cleanup: Use try-with-resources for publishers
- Error Handling: Implement robust error handling in subscribers
- Thread Safety: Design for concurrent access
- Monitoring: Track throughput and backlog
// Good practice: Using try-with-resources
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
publisher.subscribe(subscriber);
publisher.submit("data");
// Publisher automatically closed
}
// Good practice: Proper backpressure control
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Start with small requests
}
Real-World Use Cases
- Data Streaming: Processing continuous data streams
- Event Processing: Handling application events asynchronously
- Message Brokers: Building custom message processing systems
- Data Transformation: Creating ETL pipelines
- Reactive Applications: Building responsive, resilient systems
Conclusion
The Java Flow API provides a standardized way to implement reactive streams with built-in backpressure control. Key takeaways:
- Built-in Backpressure: Prevents overwhelming subscribers
- Composable Pipelines: Processors enable complex data transformations
- Thread-Safe: Designed for concurrent environments
- Standardized: Part of the Java standard library since Java 9
- Flexible: Can handle various data processing patterns
While the Flow API requires more boilerplate than some third-party reactive libraries, it provides a solid foundation for building robust, backpressure-aware data processing systems in pure Java. For more complex scenarios, consider integrating with reactive frameworks like Project Reactor or RxJava that build upon these same principles.