Flow API: Publisher and Subscriber in Java

The Flow API, introduced in Java 9, provides a standard for implementing reactive streams in Java. It enables asynchronous, non-blocking data processing with backpressure control, making it ideal for building responsive and resilient applications.


What is the Flow API?

The Flow API defines four interfaces, nested in java.util.concurrent.Flow, that correspond to the Reactive Streams specification:

  1. Publisher<T> - Produces items for subscribers
  2. Subscriber<T> - Consumes items from a publisher
  3. Subscription - Manages the flow control between publisher and subscriber
  4. Processor<T,R> - Acts as both subscriber and publisher for transformation

Core Components

1. Publisher Interface

@FunctionalInterface
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

2. Subscriber Interface

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

3. Subscription Interface

public interface Subscription {
    void request(long n);
    void cancel();
}

4. Processor Interface

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
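The order in which these callbacks fire is easier to see in a small, synchronous sketch. The `just` helper and the `SignalOrderSketch` class below are hypothetical names invented for illustration, not part of the JDK. The publisher emits a single value and then completes, and the subscriber records each signal as it arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

class SignalOrderSketch {

    // Hypothetical helper: a publisher that emits one value, then completes.
    // Publisher is a functional interface, so a lambda is enough.
    static <T> Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Subscription() {
            private boolean done;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (done || cancelled) return;
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("Request must be positive"));
                    return;
                }
                subscriber.onNext(value);
                // The subscriber may have cancelled from inside onNext
                if (!cancelled) subscriber.onComplete();
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // Subscribes to just(value) and records each callback in arrival order.
    static List<String> recordSignals(String value) {
        List<String> signals = new ArrayList<>();
        just(value).subscribe(new Subscriber<String>() {
            @Override public void onSubscribe(Subscription s) {
                signals.add("onSubscribe");
                s.request(1); // nothing is delivered until demand is signalled
            }
            @Override public void onNext(String item) { signals.add("onNext:" + item); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(recordSignals("hello")); // [onSubscribe, onNext:hello, onComplete]
    }
}
```

The sequence is always onSubscribe first, then zero or more onNext calls, then at most one of onError or onComplete.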

Basic Implementation Examples

Example 1: Simple Publisher and Subscriber

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class BasicFlowDemo {
    public static void main(String[] args) throws InterruptedException {
        // Create a publisher (items are delivered asynchronously on another thread)
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Create a subscriber
        Subscriber<String> subscriber = new Subscriber<>() {
            private Subscription subscription;
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscriber: Subscription received");
                this.subscription = subscription;
                // Request first item
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("Subscriber: Received item " + counter.getAndIncrement() + " - " + item);
                // Simulate processing time
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }
                // Request next item
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Subscriber: Error - " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Subscriber: All items processed!");
            }
        };

        // Subscribe to publisher
        publisher.subscribe(subscriber);

        // Publish items
        System.out.println("Publisher: Starting to publish items...");
        for (int i = 1; i <= 5; i++) {
            String item = "Message-" + i;
            System.out.println("Publisher: Publishing - " + item);
            publisher.submit(item);
        }

        // Give the subscriber time to work, then close the publisher (triggers onComplete)
        Thread.sleep(1000);
        publisher.close();

        // Keep main thread alive to see the remaining output
        Thread.sleep(500);
    }
}

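Output (delivery is asynchronous, so the interleaving of publisher and subscriber lines can vary between runs):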

Publisher: Starting to publish items...
Publisher: Publishing - Message-1
Publisher: Publishing - Message-2
Subscriber: Subscription received
Publisher: Publishing - Message-3
Publisher: Publishing - Message-4
Publisher: Publishing - Message-5
Subscriber: Received item 1 - Message-1
Subscriber: Received item 2 - Message-2
Subscriber: Received item 3 - Message-3
Subscriber: Received item 4 - Message-4
Subscriber: Received item 5 - Message-5
Subscriber: All items processed!

Example 2: Custom Publisher with Backpressure

import java.util.concurrent.Flow.*;
import java.util.concurrent.atomic.AtomicLong;

class NumberPublisher implements Publisher<Long> {
    private final long maxNumbers;

    public NumberPublisher(long maxNumbers) {
        this.maxNumbers = maxNumbers;
    }

    @Override
    public void subscribe(Subscriber<? super Long> subscriber) {
        // Create subscription and pass to subscriber
        NumberSubscription subscription = new NumberSubscription(subscriber, maxNumbers);
        subscriber.onSubscribe(subscription);
    }

    static class NumberSubscription implements Subscription {
        private final Subscriber<? super Long> subscriber;
        private final long maxNumbers;
        private final AtomicLong currentNumber = new AtomicLong(1);
        // Items requested but not yet delivered
        private final AtomicLong demand = new AtomicLong(0);
        private volatile boolean cancelled;

        public NumberSubscription(Subscriber<? super Long> subscriber, long maxNumbers) {
            this.subscriber = subscriber;
            this.maxNumbers = maxNumbers;
        }

        @Override
        public void request(long n) {
            if (cancelled) return;
            if (n <= 0) {
                cancelled = true; // onError is terminal, so emit nothing afterwards
                subscriber.onError(new IllegalArgumentException("Request must be positive"));
                return;
            }
            // Record the demand, capped at Long.MAX_VALUE. If a delivery loop is
            // already running (for example, request() was called from inside onNext),
            // that loop picks up the new demand, so return instead of recursing.
            long previous = demand.getAndAccumulate(n,
                    (current, added) -> current + added < 0 ? Long.MAX_VALUE : current + added);
            if (previous > 0) return;

            do {
                if (cancelled) return;
                if (currentNumber.get() <= maxNumbers) {
                    subscriber.onNext(currentNumber.getAndIncrement());
                }
                if (currentNumber.get() > maxNumbers) {
                    if (!cancelled) {
                        cancelled = true;
                        subscriber.onComplete();
                    }
                    return;
                }
            } while (demand.decrementAndGet() > 0);
        }

        @Override
        public void cancel() {
            cancelled = true;
            System.out.println("Subscription cancelled");
        }
    }
}

public class CustomPublisherDemo {
    public static void main(String[] args) {
        NumberPublisher publisher = new NumberPublisher(10);

        Subscriber<Long> subscriber = new Subscriber<>() {
            private Subscription subscription;
            private final AtomicLong requestCount = new AtomicLong(0);

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscriber: Subscribed to number publisher");
                this.subscription = subscription;
                // Request 3 items initially. Count first: this publisher delivers
                // synchronously inside request().
                requestCount.addAndGet(3);
                subscription.request(3);
            }

            @Override
            public void onNext(Long item) {
                System.out.println("Subscriber: Processing number - " + item);
                // Simulate work
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }
                // Request 3 more items after every third item
                if (item % 3 == 0) {
                    requestCount.addAndGet(3);
                    subscription.request(3);
                    System.out.println("Subscriber: Requested 3 more items (total requested: " + requestCount + ")");
                }
                // Cancel after receiving number 7
                if (item == 7) {
                    System.out.println("Subscriber: Cancelling subscription after 7");
                    subscription.cancel();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Subscriber: Error - " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Subscriber: All numbers processed!");
            }
        };

        // NumberPublisher emits on the calling thread, so subscribe() returns
        // only after the subscriber has cancelled
        publisher.subscribe(subscriber);
    }
}

Output:

Subscriber: Subscribed to number publisher
Subscriber: Processing number - 1
Subscriber: Processing number - 2
Subscriber: Processing number - 3
Subscriber: Requested 3 more items (total requested: 6)
Subscriber: Processing number - 4
Subscriber: Processing number - 5
Subscriber: Processing number - 6
Subscriber: Requested 3 more items (total requested: 9)
Subscriber: Processing number - 7
Subscriber: Cancelling subscription after 7
Subscription cancelled

Advanced Examples

Example 3: Transform Processor

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class TransformProcessor<T, R> extends SubmissionPublisher<R> implements Processor<T, R> {
    private final Function<T, R> transformFunction;
    private Subscription subscription;

    public TransformProcessor(Function<T, R> transformFunction) {
        this.transformFunction = transformFunction;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        try {
            R transformed = transformFunction.apply(item);
            submit(transformed); // blocks while a downstream buffer is full
            subscription.request(1);
        } catch (Exception e) {
            // Stop the upstream before passing the failure downstream
            subscription.cancel();
            onError(e);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        close();
    }
}

public class ProcessorDemo {
    public static void main(String[] args) throws InterruptedException {
        // Create source publisher
        SubmissionPublisher<String> sourcePublisher = new SubmissionPublisher<>();

        // Create transform processor (String to Integer - string length)
        TransformProcessor<String, Integer> lengthProcessor =
                new TransformProcessor<>(String::length);

        // Create final subscriber
        Subscriber<Integer> finalSubscriber = new Subscriber<>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Final Subscriber: Received length - " + item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Final Subscriber: Error - " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Final Subscriber: Processing complete!");
            }
        };

        // Chain the components
        sourcePublisher.subscribe(lengthProcessor);
        lengthProcessor.subscribe(finalSubscriber);

        // Publish strings
        String[] words = {"Hello", "Reactive", "Streams", "Java", "Programming"};
        for (String word : words) {
            System.out.println("Source: Publishing - " + word);
            sourcePublisher.submit(word);
            Thread.sleep(100);
        }

        // Close the source; the processor closes itself in onComplete
        sourcePublisher.close();
        Thread.sleep(500);
    }
}

Typical output (delivery is asynchronous, so the exact interleaving can vary between runs):

Source: Publishing - Hello
Final Subscriber: Received length - 5
Source: Publishing - Reactive
Final Subscriber: Received length - 8
Source: Publishing - Streams
Final Subscriber: Received length - 7
Source: Publishing - Java
Final Subscriber: Received length - 4
Source: Publishing - Programming
Final Subscriber: Received length - 11
Final Subscriber: Processing complete!

Example 4: Error Handling and Completion

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;

public class ErrorHandlingDemo {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        Subscriber<Integer> subscriber = new Subscriber<>() {
            private Subscription subscription;
            private int processedCount = 0;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                System.out.println("Subscriber: Subscribed, requesting 5 items");
                subscription.request(5);
            }

            @Override
            public void onNext(Integer item) {
                processedCount++;
                System.out.println("Subscriber: Processing item " + item);
                // Simulate error on specific condition
                if (item == 7) {
                    System.out.println("Subscriber: Encountered error condition with number 7");
                    // Cancel first: without this, the items already requested
                    // (8, 9 and 10) would still be delivered
                    subscription.cancel();
                    onError(new RuntimeException("Number 7 is not allowed!"));
                    return;
                }
                // Request more items after processing
                if (processedCount % 3 == 0) {
                    System.out.println("Subscriber: Requesting 3 more items");
                    subscription.request(3);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Subscriber: Error occurred - " + throwable.getMessage());
                System.out.println("Subscriber: Total items processed: " + processedCount);
            }

            @Override
            public void onComplete() {
                System.out.println("Subscriber: Successfully completed!");
                System.out.println("Subscriber: Total items processed: " + processedCount);
            }
        };

        publisher.subscribe(subscriber);

        // Publish numbers
        for (int i = 1; i <= 10; i++) {
            System.out.println("Publisher: Submitting number " + i);
            publisher.submit(i);
            Thread.sleep(50);
        }

        Thread.sleep(1000);
        publisher.close();
    }
}

Output (the interleaving of publisher and subscriber lines can vary between runs):

Publisher: Submitting number 1
Subscriber: Subscribed, requesting 5 items
Subscriber: Processing item 1
Publisher: Submitting number 2
Subscriber: Processing item 2
Publisher: Submitting number 3
Subscriber: Processing item 3
Subscriber: Requesting 3 more items
Publisher: Submitting number 4
Subscriber: Processing item 4
Publisher: Submitting number 5
Subscriber: Processing item 5
Publisher: Submitting number 6
Subscriber: Processing item 6
Subscriber: Requesting 3 more items
Publisher: Submitting number 7
Subscriber: Processing item 7
Subscriber: Encountered error condition with number 7
Subscriber: Error occurred - Number 7 is not allowed!
Subscriber: Total items processed: 7
Publisher: Submitting number 8
Publisher: Submitting number 9
Publisher: Submitting number 10

Key Concepts and Best Practices

1. Backpressure Control

  • Subscribers control the flow rate using request(n)
  • Prevents overwhelming subscribers with data
  • Enables efficient resource utilization
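On the publishing side, SubmissionPublisher keeps a bounded buffer per subscriber. Its submit method blocks while a buffer is full, whereas offer lets the publisher drop the item instead. The sketch below is illustrative only: `BoundedBufferSketch` and `countDrops` are made-up names, and the subscriber deliberately never calls request so that the buffer fills up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

class BoundedBufferSketch {

    // Offers `total` items to a subscriber that never signals demand and
    // returns how many of those offers were dropped.
    static int countDrops(int total, int maxBufferCapacity) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int dropped = 0;
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(executor, maxBufferCapacity)) {
            publisher.subscribe(new Subscriber<Integer>() {
                @Override public void onSubscribe(Subscription s) { /* no request(): zero demand */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < total; i++) {
                // The drop handler returns false, meaning "do not retry, drop the item".
                // offer() returns a negative number when the item was dropped.
                int result = publisher.offer(i, (subscriber, item) -> false);
                if (result < 0) dropped++;
            }
        } finally {
            executor.shutdown();
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("Dropped: " + countDrops(100, 4));
    }
}
```

Once the small buffer is full, every further offer is dropped because the subscriber has requested nothing. A subscriber that calls request(n) at its own pace drains the buffer and keeps the publisher from reaching that point.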

2. Lifecycle Management

  • Always implement all four Subscriber methods
  • Properly handle onComplete() and onError()
  • Cancel subscriptions when no longer needed
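As a sketch of the cancellation point, the hypothetical `TakeFirst` subscriber below (not a JDK class) keeps the first few items and then cancels. It assumes a limit of at least 1 and uses a latch only so the caller can wait for the result.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class TakeFirstSketch {

    // Hypothetical subscriber: keeps the first `limit` items (limit >= 1), then cancels.
    static final class TakeFirst<T> implements Subscriber<T> {
        private final int limit;
        private final List<T> items = new CopyOnWriteArrayList<>();
        private final CountDownLatch finished = new CountDownLatch(1);
        private Subscription subscription;

        TakeFirst(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override public void onNext(T item) {
            items.add(item);
            if (items.size() >= limit) {
                subscription.cancel(); // tell the publisher nothing more is wanted
                finished.countDown();
            } else {
                subscription.request(1);
            }
        }

        @Override public void onError(Throwable t) { finished.countDown(); }

        @Override public void onComplete() { finished.countDown(); }

        // Waits (up to 5 seconds) until the subscriber has finished, then returns the items.
        List<T> await() {
            try {
                finished.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return List.copyOf(items);
        }
    }

    static List<Integer> firstThreeOfTen() {
        TakeFirst<Integer> subscriber = new TakeFirst<>(3);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 10; i++) {
                publisher.submit(i);
            }
            return subscriber.await();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstThreeOfTen()); // [1, 2, 3]
    }
}
```

Because the subscriber requests one item at a time and stops requesting once it cancels, it never sees a fourth item even though the publisher submits ten.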

3. Error Handling

  • Use onError() for graceful error propagation
  • Consider retry mechanisms for transient errors
  • Log errors appropriately
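A publisher-side failure reaches subscribers through onError. With SubmissionPublisher this is done by calling closeExceptionally. The sketch below (class and method names are illustrative) publishes one item, fails, and reports what the subscriber observed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ErrorPropagationSketch {

    // Publishes one item, then fails; returns a description of the terminal
    // signal the subscriber received.
    static String failAfterOneItem() {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Subscriber<String>() {
            @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { outcome.complete("error: " + t.getMessage()); }
            @Override public void onComplete() { outcome.complete("completed"); }
        });
        publisher.submit("first");
        // Signals onError to current subscribers and closes the publisher
        publisher.closeExceptionally(new IllegalStateException("source failed"));
        return outcome.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(failAfterOneItem()); // error: source failed
    }
}
```

onError is a terminal signal: after it, the subscriber receives neither further items nor onComplete.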

4. Resource Cleanup

// Proper cleanup pattern
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
    publisher.subscribe(subscriber);
    // ... publish items
} // Auto-close called
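One caveat with this pattern is that close() returns without waiting for subscribers to finish processing buffered items. A possible way to wait, sketched below with illustrative names, is the consume method of SubmissionPublisher, which returns a CompletableFuture that completes once onComplete has been signalled.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class CleanupSketch {

    static List<String> publishAndWait() {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // consume() subscribes a simple consumer and returns a future that
        // completes when the publisher signals onComplete
        CompletableFuture<Void> done = publisher.consume(received::add);
        try (publisher) {
            publisher.submit("a");
            publisher.submit("b");
            publisher.submit("c");
        } // close() runs here and does not wait for delivery
        done.join(); // wait until the consumer has seen every item
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndWait()); // [a, b, c]
    }
}
```

Waiting on the future replaces the fixed Thread.sleep calls used in the demos above, which only guess at how long delivery takes.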

Real-World Use Cases

  1. Data Streaming - Process real-time data feeds
  2. Microservices Communication - Async service-to-service communication
  3. File Processing - Stream large files with backpressure
  4. WebSocket Messages - Handle real-time web messages
  5. Database Query Results - Stream large result sets

Conclusion

The Flow API provides a standardized way to implement reactive streams in Java:

  • Publisher emits data streams
  • Subscriber consumes data with backpressure control
  • Subscription manages the data flow between them
  • Processor enables stream transformation

Key Benefits:

  • Asynchronous Processing - Non-blocking data flow
  • Backpressure Management - Prevents resource exhaustion
  • Composability - Easy to chain and transform streams
  • Standardization - Same semantics as the Reactive Streams specification, so other reactive libraries can interoperate through adapters

The Flow API is particularly valuable for building responsive, resilient, and elastic applications that need to handle streams of data efficiently.


Note: While SubmissionPublisher provides a convenient implementation, for production systems consider using established reactive libraries like Project Reactor or RxJava that build upon these concepts with additional features and optimizations.
