The Flow API, introduced in Java 9, provides a standard for implementing reactive streams in Java. It enables asynchronous, non-blocking data processing with backpressure control, making it ideal for building responsive and resilient applications.
What is the Flow API?
The Flow API defines four interfaces, nested in the java.util.concurrent.Flow class, that correspond to the Reactive Streams specification:
- Publisher<T> - Produces items for subscribers
- Subscriber<T> - Consumes items from a publisher
- Subscription - Manages flow control between a publisher and a subscriber
- Processor<T,R> - Acts as both subscriber and publisher, typically to transform items
Core Components
1. Publisher Interface
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
2. Subscriber Interface
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
3. Subscription Interface
public interface Subscription {
void request(long n);
void cancel();
}
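The list above names four interfaces, but only three are shown here. The fourth, Processor<T,R>, declares no methods of its own; it only combines Subscriber<T> and Publisher<R>. A minimal sketch that checks this by reflection (the class name ProcessorShape is illustrative, not part of the API):

```java
import java.util.concurrent.Flow;

// Illustrative helper class; not part of the Flow API.
class ProcessorShape {
    // True if a Processor can be used wherever a Subscriber or a Publisher is expected.
    static boolean isSubscriberAndPublisher() {
        return Flow.Subscriber.class.isAssignableFrom(Flow.Processor.class)
            && Flow.Publisher.class.isAssignableFrom(Flow.Processor.class);
    }

    // Number of methods Processor declares itself (inherited methods are not counted).
    static int ownMethodCount() {
        return Flow.Processor.class.getDeclaredMethods().length;
    }

    public static void main(String[] args) {
        System.out.println("Subscriber and Publisher: " + isSubscriberAndPublisher());
        System.out.println("Methods declared by Processor itself: " + ownMethodCount());
    }
}
```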
Basic Implementation Examples
Example 1: Simple Publisher and Subscriber
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
public class BasicFlowDemo {
public static void main(String[] args) throws InterruptedException {
// Create a publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// Create a subscriber
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
private AtomicInteger counter = new AtomicInteger(1);
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscriber: Subscription received");
this.subscription = subscription;
// Request first item
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Subscriber: Received item " + counter.getAndIncrement() + " - " + item);
// Simulate processing time
try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // restore the interrupt flag
// Request next item
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Subscriber: Error - " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Subscriber: All items processed!");
}
};
// Subscribe to publisher
publisher.subscribe(subscriber);
// Publish items
System.out.println("Publisher: Starting to publish items...");
for (int i = 1; i <= 5; i++) {
String item = "Message-" + i;
System.out.println("Publisher: Publishing - " + item);
publisher.submit(item);
}
// Give the subscriber time to drain the buffer, then close the publisher (triggers onComplete)
Thread.sleep(1000);
publisher.close();
// Keep main thread alive to see output
Thread.sleep(500);
}
}
Output:
Publisher: Starting to publish items...
Publisher: Publishing - Message-1
Publisher: Publishing - Message-2
Subscriber: Subscription received
Publisher: Publishing - Message-3
Publisher: Publishing - Message-4
Publisher: Publishing - Message-5
Subscriber: Received item 1 - Message-1
Subscriber: Received item 2 - Message-2
Subscriber: Received item 3 - Message-3
Subscriber: Received item 4 - Message-4
Subscriber: Received item 5 - Message-5
Subscriber: All items processed!
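Example 1 relies on Thread.sleep to keep the main thread alive until delivery finishes. When per-item flow control is not needed, SubmissionPublisher.consume offers a shorter alternative: it subscribes a consumer with unbounded demand and returns a CompletableFuture that completes when the publisher is closed. A minimal sketch, with ConsumeDemo and publishAndCollect as illustrative names:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Illustrative class; shows waiting on a future instead of sleeping.
class ConsumeDemo {
    // Publishes the given items and returns what the consumer received.
    static List<String> publishAndCollect(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // consume() subscribes a consumer that requests Long.MAX_VALUE items
        CompletableFuture<Void> done = publisher.consume(received::add);
        items.forEach(publisher::submit);
        publisher.close(); // onComplete is signalled after buffered items are delivered
        done.join();       // block until the consumer has seen onComplete
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("Message-1", "Message-2", "Message-3")));
    }
}
```

The trade-off is that the consumer no longer paces the publisher with request(n), so this fits cases where the consumer is known to keep up.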
Example 2: Custom Publisher with Backpressure
import java.util.concurrent.Flow.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
class NumberPublisher implements Publisher<Long> {
private final long maxNumbers;
public NumberPublisher(long maxNumbers) {
this.maxNumbers = maxNumbers;
}
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
// Create subscription and pass to subscriber
NumberSubscription subscription = new NumberSubscription(subscriber, maxNumbers);
subscriber.onSubscribe(subscription);
}
static class NumberSubscription implements Subscription {
private final Subscriber<? super Long> subscriber;
private final long maxNumbers;
private final AtomicLong currentNumber;
private volatile boolean cancelled;
public NumberSubscription(Subscriber<? super Long> subscriber, long maxNumbers) {
this.subscriber = subscriber;
this.maxNumbers = maxNumbers;
this.currentNumber = new AtomicLong(1);
this.cancelled = false;
}
@Override
public void request(long n) {
if (cancelled) return;
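if (n <= 0) {
// A non-positive request is a protocol violation: signal the error and stop emitting
cancelled = true;
subscriber.onError(new IllegalArgumentException("Request must be positive"));
return;
}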
long produced = 0;
while (produced < n && currentNumber.get() <= maxNumbers && !cancelled) {
long number = currentNumber.getAndIncrement();
subscriber.onNext(number);
produced++;
}
if (currentNumber.get() > maxNumbers && !cancelled) {
// Mark the subscription as terminated so onComplete is signalled only once
cancelled = true;
subscriber.onComplete();
}
}
@Override
public void cancel() {
cancelled = true;
System.out.println("Subscription cancelled");
}
}
}
public class CustomPublisherDemo {
public static void main(String[] args) throws InterruptedException {
NumberPublisher publisher = new NumberPublisher(10);
Subscriber<Long> subscriber = new Subscriber<>() {
private Subscription subscription;
private AtomicLong requestCount = new AtomicLong(0);
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscriber: Subscribed to number publisher");
this.subscription = subscription;
// Request 3 items initially
subscription.request(3);
requestCount.addAndGet(3);
}
@Override
public void onNext(Long item) {
System.out.println("Subscriber: Processing number - " + item);
// Simulate work
try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // restore the interrupt flag
// Request more items when processing is done
if (item % 3 == 0) { // Request more every 3 items
subscription.request(2);
requestCount.addAndGet(2);
System.out.println("Subscriber: Requested 2 more items (total requested: " + requestCount + ")");
}
// Cancel after receiving number 7
if (item == 7) {
System.out.println("Subscriber: Cancelling subscription after 7");
subscription.cancel();
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("Subscriber: Error - " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Subscriber: All numbers processed!");
}
};
publisher.subscribe(subscriber);
Thread.sleep(3000); // Allow time for processing
}
}
Output:
Subscriber: Subscribed to number publisher
Subscriber: Processing number - 1
Subscriber: Processing number - 2
Subscriber: Processing number - 3
Subscriber: Processing number - 4
Subscriber: Processing number - 5
Subscriber: Requested 2 more items (total requested: 2)
Note that NumberSubscription delivers items synchronously inside request(). The request(2) issued while handling item 3 therefore delivers items 4 and 5 before its own log line is printed, and before onSubscribe has added the initial 3 to requestCount. Total demand is only 5 (3 + 2), so items 6 and 7 are never emitted and the cancellation branch is never reached.
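NumberSubscription.request calls onNext directly, so a subscriber that calls request from inside onNext re-enters the emission loop recursively. A common way to avoid this is to record demand in a counter and let a single drain loop do the emitting. The sketch below is one possible shape, not a full Reactive Streams implementation; DrainingSubscription and DrainingDemo are hypothetical names, and the subscriber here requests 3 more after every third item so that item 7 is actually reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical alternative to NumberSubscription: emits 1..max without re-entering onNext.
class DrainingSubscription implements Subscription {
    private final Subscriber<? super Long> subscriber;
    private final long max;
    private long next = 1;                                  // touched only inside the drain loop
    private final AtomicLong demand = new AtomicLong();     // outstanding requested items
    private final AtomicInteger wip = new AtomicInteger();  // guards the drain loop
    private volatile boolean done;

    DrainingSubscription(Subscriber<? super Long> subscriber, long max) {
        this.subscriber = subscriber;
        this.max = max;
    }

    @Override
    public void request(long n) {
        if (done) return;
        if (n <= 0) {
            done = true;
            subscriber.onError(new IllegalArgumentException("Request must be positive"));
            return;
        }
        // Add to the outstanding demand, capping at Long.MAX_VALUE on overflow
        demand.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
        drain();
    }

    private void drain() {
        // If a drain loop is already running, it will pick up the new demand
        if (wip.getAndIncrement() != 0) return;
        do {
            while (!done && demand.get() > 0 && next <= max) {
                demand.decrementAndGet();
                subscriber.onNext(next++);
            }
            if (!done && next > max) {
                done = true;
                subscriber.onComplete();
            }
        } while (wip.decrementAndGet() != 0);
    }

    @Override
    public void cancel() {
        done = true;
    }
}

class DrainingDemo {
    // Runs the subscriber against DrainingSubscription and returns the recorded events.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Subscriber<Long> subscriber = new Subscriber<>() {
            private Subscription subscription;
            @Override public void onSubscribe(Subscription s) { subscription = s; s.request(3); }
            @Override public void onNext(Long item) {
                events.add("next " + item);
                if (item % 3 == 0) {
                    subscription.request(3);
                    events.add("requested 3 more");
                }
                if (item == 7) {
                    subscription.cancel();
                    events.add("cancelled");
                }
            }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        };
        subscriber.onSubscribe(new DrainingSubscription(subscriber, 10));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this structure a nested request only increases the demand counter; the items it asks for are emitted after the current onNext call returns.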
Advanced Examples
Example 3: Transform Processor
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
class TransformProcessor<T, R> extends SubmissionPublisher<R> implements Processor<T, R> {
private final Function<T, R> transformFunction;
private Subscription subscription;
public TransformProcessor(Function<T, R> transformFunction) {
this.transformFunction = transformFunction;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
try {
R transformed = transformFunction.apply(item);
submit(transformed);
subscription.request(1);
} catch (Exception e) {
// Stop receiving from upstream before signalling the failure downstream
subscription.cancel();
onError(e);
}
}
@Override
public void onError(Throwable throwable) {
closeExceptionally(throwable);
}
@Override
public void onComplete() {
close();
}
}
public class ProcessorDemo {
public static void main(String[] args) throws InterruptedException {
// Create source publisher
SubmissionPublisher<String> sourcePublisher = new SubmissionPublisher<>();
// Create transform processor (String to Integer - string length)
TransformProcessor<String, Integer> lengthProcessor =
new TransformProcessor<>(String::length);
// Create final subscriber
Subscriber<Integer> finalSubscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Final Subscriber: Received length - " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Final Subscriber: Error - " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Final Subscriber: Processing complete!");
}
};
// Chain the components
sourcePublisher.subscribe(lengthProcessor);
lengthProcessor.subscribe(finalSubscriber);
// Publish strings
String[] words = {"Hello", "Reactive", "Streams", "Java", "Programming"};
for (String word : words) {
System.out.println("Source: Publishing - " + word);
sourcePublisher.submit(word);
Thread.sleep(100);
}
// Close publishers
sourcePublisher.close();
Thread.sleep(500);
}
}
Output:
Source: Publishing - Hello
Source: Publishing - Reactive
Final Subscriber: Received length - 5
Source: Publishing - Streams
Final Subscriber: Received length - 8
Source: Publishing - Java
Final Subscriber: Received length - 7
Source: Publishing - Programming
Final Subscriber: Received length - 4
Final Subscriber: Received length - 11
Final Subscriber: Processing complete!
Example 4: Error Handling and Completion
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
public class ErrorHandlingDemo {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new Subscriber<>() {
private Subscription subscription;
private int processedCount = 0;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscriber: Subscribed, requesting 5 items");
subscription.request(5);
}
@Override
public void onNext(Integer item) {
processedCount++;
System.out.println("Subscriber: Processing item " + item);
// Simulate error on specific condition
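if (item == 7) {
System.out.println("Subscriber: Encountered error condition with number 7");
// Cancel first so the publisher stops delivering items 8-10 to this subscriber
subscription.cancel();
onError(new RuntimeException("Number 7 is not allowed!"));
return;
}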
// Request more items after processing
if (processedCount % 3 == 0) {
System.out.println("Subscriber: Requesting 3 more items");
subscription.request(3);
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("Subscriber: Error occurred - " + throwable.getMessage());
System.out.println("Subscriber: Total items processed: " + processedCount);
}
@Override
public void onComplete() {
System.out.println("Subscriber: Successfully completed!");
System.out.println("Subscriber: Total items processed: " + processedCount);
}
};
publisher.subscribe(subscriber);
// Publish numbers
for (int i = 1; i <= 10; i++) {
System.out.println("Publisher: Submitting number " + i);
publisher.submit(i);
Thread.sleep(50);
}
Thread.sleep(1000);
publisher.close();
}
}
Output:
Publisher: Submitting number 1
Subscriber: Subscribed, requesting 5 items
Subscriber: Processing item 1
Publisher: Submitting number 2
Subscriber: Processing item 2
Publisher: Submitting number 3
Subscriber: Processing item 3
Subscriber: Requesting 3 more items
Publisher: Submitting number 4
Subscriber: Processing item 4
Publisher: Submitting number 5
Subscriber: Processing item 5
Publisher: Submitting number 6
Subscriber: Processing item 6
Subscriber: Requesting 3 more items
Publisher: Submitting number 7
Subscriber: Processing item 7
Subscriber: Encountered error condition with number 7
Subscriber: Error occurred - Number 7 is not allowed!
Subscriber: Total items processed: 7
Publisher: Submitting number 8
Publisher: Submitting number 9
Publisher: Submitting number 10
Key Concepts and Best Practices
1. Backpressure Control
- Subscribers control the flow rate using request(n)
- Prevents overwhelming subscribers with data
- Enables efficient resource utilization
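Backpressure also shows up on the publishing side. SubmissionPublisher keeps a bounded buffer per subscriber: submit blocks when that buffer is full, while offer lets the caller drop items instead. The sketch below assumes a subscriber that never calls request and an arbitrary buffer capacity of 4; OfferDemo and offerToStalledSubscriber are illustrative names.

```java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

// Illustrative class; shows offer() dropping items when a subscriber signals no demand.
class OfferDemo {
    // Offers `count` items to a subscriber that never requests any; returns the number of drops.
    static int offerToStalledSubscriber(int count) {
        int dropped = 0;
        // Second argument: maximum buffer capacity per subscriber (small on purpose)
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            publisher.subscribe(new Subscriber<Integer>() {
                @Override public void onSubscribe(Subscription s) { /* never calls request */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 1; i <= count; i++) {
                // The drop handler returns false, meaning "do not retry this item"
                int result = publisher.offer(i, (subscriber, item) -> false);
                if (result < 0) {
                    dropped += -result; // a negative result is the negated number of drops
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("Dropped: " + offerToStalledSubscriber(10));
    }
}
```

Once the buffer is full, further offers are reported as drops rather than blocking the publishing thread.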
2. Lifecycle Management
- Always implement all four Subscriber methods
- Properly handle onComplete() and onError()
- Cancel subscriptions when no longer needed
3. Error Handling
- Use onError() for graceful error propagation
- Consider retry mechanisms for transient errors
- Log errors appropriately
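On the publishing side, SubmissionPublisher.closeExceptionally is the counterpart of onError: it ends the stream and passes the given exception to each current subscriber. A minimal sketch, with ErrorPropagationDemo and failAndCapture as illustrative names and an arbitrary 5-second timeout as a safety net:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Illustrative class; shows an error travelling from publisher to subscriber.
class ErrorPropagationDemo {
    // Fails the publisher and returns the message that reaches the subscriber's onError.
    static String failAndCapture(String message) {
        CompletableFuture<Throwable> failure = new CompletableFuture<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Subscriber<String>() {
            @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { failure.complete(t); }
            @Override public void onComplete() { failure.complete(null); }
        });
        // Terminates the stream with an error instead of a normal completion
        publisher.closeExceptionally(new IllegalStateException(message));
        Throwable received = failure.orTimeout(5, TimeUnit.SECONDS).join();
        return received == null ? null : received.getMessage();
    }

    public static void main(String[] args) {
        System.out.println("Subscriber saw: " + failAndCapture("upstream failed"));
    }
}
```

After onError, a subscriber receives no further signals, so any retry has to create a new subscription.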
4. Resource Cleanup
// Proper cleanup pattern
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
publisher.subscribe(subscriber);
// ... publish items
} // Auto-close called
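One consequence of this pattern is that the publisher cannot be reused after the try block: isClosed() returns true and further submit calls throw IllegalStateException. A minimal sketch, with CloseDemo and submitAfterCloseIsRejected as illustrative names:

```java
import java.util.concurrent.SubmissionPublisher;

// Illustrative class; shows the state of a publisher after try-with-resources.
class CloseDemo {
    // Returns true if the publisher is closed and rejects a late submit.
    static boolean submitAfterCloseIsRejected() {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        try (publisher) {              // Java 9+: an effectively final variable can be the resource
            publisher.submit("first"); // no subscribers here, so the item is discarded
        }
        if (!publisher.isClosed()) {
            return false;
        }
        try {
            publisher.submit("late");
            return false;              // not expected: a closed publisher should reject items
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejected after close: " + submitAfterCloseIsRejected());
    }
}
```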
Real-World Use Cases
- Data Streaming - Process real-time data feeds
- Microservices Communication - Async service-to-service communication
- File Processing - Stream large files with backpressure
- WebSocket Messages - Handle real-time web messages
- Database Query Results - Stream large result sets
Conclusion
The Flow API provides a standardized way to implement reactive streams in Java:
- Publisher emits data streams
- Subscriber consumes data with backpressure control
- Subscription manages the data flow between them
- Processor enables stream transformation
Key Benefits:
- Asynchronous Processing - Non-blocking data flow
- Backpressure Management - Prevents resource exhaustion
- Composability - Easy to chain and transform streams
- Standardization - Shares its contract with the Reactive Streams specification, which eases interoperability with other reactive libraries
The Flow API is particularly valuable for building responsive, resilient, and elastic applications that need to handle streams of data efficiently.
Note: While SubmissionPublisher provides a convenient implementation, for production systems consider using established reactive libraries like Project Reactor or RxJava that build upon these concepts with additional features and optimizations.