Reactive Programming with Reactor in Java: Mastering Asynchronous Data Streams

Reactive Programming is a paradigm for building non-blocking, asynchronous, and event-driven applications that can handle backpressure efficiently. Project Reactor is a fully non-blocking foundation with backpressure support that powers Spring WebFlux.


1. Understanding Reactive Programming

Core Principles:

  • Asynchronous & Non-blocking: No threads blocked waiting for responses
  • Event-driven: React to events as they occur
  • Backpressure: Consumers control the rate of data flow
  • Functional style: Declarative composition of data streams

Reactive Streams Specification:

  • Publisher: Source of data
  • Subscriber: Consumer of data
  • Subscription: Control link between Publisher and Subscriber
  • Processor: Both Publisher and Subscriber

2. Project Reactor Setup

Maven Dependencies

<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.0</version>
</dependency>
<!-- For testing -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.5.0</version>
<scope>test</scope>
</dependency>
</dependencies>

Gradle Configuration

dependencies {
implementation 'io.projectreactor:reactor-core:3.5.0'
testImplementation 'io.projectreactor:reactor-test:3.5.0'
}

3. Core Types: Mono and Flux

Mono: 0-1 Element

import reactor.core.publisher.Mono;
public class MonoExamples {
// Creating Mono instances
public void createMonos() {
// Empty Mono
Mono<String> emptyMono = Mono.empty();
// Mono with a value
Mono<String> valueMono = Mono.just("Hello, Reactor!");
// Mono from Callable
Mono<String> callableMono = Mono.fromCallable(() -> {
// Simulate expensive computation
Thread.sleep(100);
return "Computed Value";
});
// Mono from Supplier
Mono<String> supplierMono = Mono.fromSupplier(() -> "Supplied Value");
// Mono that errors
Mono<String> errorMono = Mono.error(new RuntimeException("Something went wrong"));
}
// Transforming Mono
public void transformMonos() {
Mono<String> original = Mono.just("hello");
// Map transformation
Mono<String> upperCase = original.map(String::toUpperCase);
// FlatMap for asynchronous operations
Mono<String> flatMapped = original.flatMap(s -> 
Mono.just(s + " transformed asynchronously"));
// Filter
Mono<String> filtered = original.filter(s -> s.length() > 3);
}
}

Flux: 0-N Elements

import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Arrays;
public class FluxExamples {
// Creating Flux instances
public void createFlux() {
// From values
Flux<String> fromValues = Flux.just("A", "B", "C");
// From collections
Flux<String> fromList = Flux.fromIterable(Arrays.asList("X", "Y", "Z"));
// From arrays
Flux<Integer> fromArray = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});
// Range
Flux<Integer> range = Flux.range(1, 10); // 1 to 10
// Interval (async stream)
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
// Generate (stateful)
Flux<Integer> generated = Flux.generate(
() -> 0, // initial state
(state, sink) -> {
sink.next(state);
if (state == 10) sink.complete();
return state + 1;
}
);
}
// Transforming Flux
public void transformFlux() {
Flux<Integer> numbers = Flux.range(1, 5);
// Map
Flux<String> strings = numbers.map(n -> "Number: " + n);
// Filter
Flux<Integer> evens = numbers.filter(n -> n % 2 == 0);
// FlatMap
Flux<String> expanded = numbers.flatMap(n -> 
Flux.just(n + "a", n + "b", n + "c"));
// Buffer
Flux<List<Integer>> buffered = numbers.buffer(2); // [1,2], [3,4], [5]
// Window
Flux<Flux<Integer>> windowed = numbers.window(2); // Flux of Fluxes
}
}

4. Combining Reactive Streams

public class CombiningStreams {
public void combineExamples() {
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("X", "Y", "Z");
Mono<String> mono1 = Mono.just("Hello");
// Merge (interleave elements)
Flux<String> merged = Flux.merge(flux1, flux2);
// Output: A, X, B, Y, C, Z (order not guaranteed)
// Concat (preserve order)
Flux<String> concatenated = Flux.concat(flux1, flux2);
// Output: A, B, C, X, Y, Z
// Zip (pair elements)
Flux<String> zipped = Flux.zip(flux1, flux2, 
(f1, f2) -> f1 + f2);
// Output: AX, BY, CZ
// Combine latest
Flux<Long> interval1 = Flux.interval(Duration.ofMillis(500));
Flux<Long> interval2 = Flux.interval(Duration.ofMillis(700));
Flux<String> combined = Flux.combineLatest(interval1, interval2,
(i1, i2) -> i1 + "-" + i2);
}
public void errorHandling() {
Flux<String> fluxWithErrors = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error!")))
.concatWith(Mono.just("C")); // This won't be reached
// Error handling strategies
Flux<String> recovered = fluxWithErrors
.onErrorReturn("Fallback"); // A, B, Fallback
Flux<String> resumed = fluxWithErrors
.onErrorResume(e -> Flux.just("X", "Y", "Z")); // A, B, X, Y, Z
Flux<String> retried = fluxWithErrors
.retry(3); // Retry up to 3 times
}
}

5. Backpressure Handling

public class BackpressureExamples {
public void handleBackpressure() {
Flux<Integer> fastProducer = Flux.range(1, 1000)
.delayElements(Duration.ofMillis(10));
// Different backpressure strategies
fastProducer
.onBackpressureBuffer(50) // Buffer up to 50 elements
.subscribe(
System.out::println,
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed"),
subscription -> subscription.request(10) // Request 10 initially
);
fastProducer
.onBackpressureDrop(dropped -> 
System.out.println("Dropped: " + dropped))
.subscribe();
fastProducer
.onBackpressureLatest() // Keep only the latest element
.subscribe();
}
public void controlledSubscription() {
Flux.range(1, 100)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Request first 5 elements
request(5);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
if (value % 5 == 0) {
// Request 5 more every 5 elements
request(5);
}
}
});
}
}

6. Schedulers and Threading Model

public class SchedulerExamples {
public void schedulingOperations() {
Flux<Integer> flux = Flux.range(1, 5);
// Execute on different schedulers
flux.publishOn(Schedulers.parallel())
.map(n -> {
System.out.println("Map on: " + Thread.currentThread().getName());
return n * 2;
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
// Common Schedulers:
// - Schedulers.immediate(): Current thread
// - Schedulers.single(): Single reusable thread
// - Schedulers.elastic(): Elastic thread pool (deprecated)
// - Schedulers.boundedElastic(): Bounded elastic pool
// - Schedulers.parallel(): Fixed pool for CPU-intensive work
// - Schedulers.fromExecutor(executor): Custom executor
}
public void parallelProcessing() {
Flux.range(1, 10)
.parallel(4) // Process in 4 parallel rails
.runOn(Schedulers.parallel())
.map(n -> n * n)
.sequential() // Convert back to regular Flux
.subscribe(System.out::println);
}
}

7. Real-World Examples

Example 1: Reactive REST Client

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactiveRestClient {
private final WebClient webClient;
public ReactiveRestClient() {
this.webClient = WebClient.builder()
.baseUrl("https://jsonplaceholder.typicode.com")
.build();
}
public Mono<User> getUserById(int id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class)
.timeout(Duration.ofSeconds(5))
.onErrorResume(e -> {
System.err.println("Error fetching user " + id + ": " + e.getMessage());
return Mono.just(new User()); // Fallback
});
}
public Flux<User> getAllUsers() {
return webClient.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class)
.delayElements(Duration.ofMillis(100)) // Rate limiting
.filter(user -> user.getId() > 5);
}
public Flux<User> getUsersWithRetry(List<Integer> userIds) {
return Flux.fromIterable(userIds)
.flatMap(this::getUserById)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
}
class User {
private int id;
private String name;
private String email;
// Constructors, getters, setters
public User() {}
public User(int id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
// Getters and setters...
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
}

Example 2: Reactive Data Processing Pipeline

public class DataProcessingPipeline {
public Flux<String> processDataStream(Flux<String> inputStream) {
return inputStream
.window(Duration.ofSeconds(1)) // Group by time windows
.flatMap(window -> window
.filter(line -> !line.trim().isEmpty())
.map(String::toUpperCase)
.collectList() // Collect window into List
.flatMapMany(list -> processBatch(list))
)
.onErrorContinue((error, element) -> 
System.err.println("Error processing: " + element + ", Error: " + error))
.doOnNext(processed -> 
System.out.println("Processed: " + processed))
.doOnComplete(() -> 
System.out.println("Processing completed"));
}
private Flux<String> processBatch(List<String> batch) {
return Flux.fromIterable(batch)
.publishOn(Schedulers.parallel())
.flatMap(this::asyncProcessing, 10); // Concurrency of 10
}
private Mono<String> asyncProcessing(String data) {
return Mono.fromCallable(() -> {
// Simulate async processing
Thread.sleep(100);
return "PROCESSED: " + data;
})
.subscribeOn(Schedulers.boundedElastic());
}
// Example usage
public static void main(String[] args) throws InterruptedException {
DataProcessingPipeline pipeline = new DataProcessingPipeline();
// Simulate incoming data stream
Flux<String> dataStream = Flux.interval(Duration.ofMillis(200))
.map(i -> "data-" + i)
.take(50); // Limit for demo
pipeline.processDataStream(dataStream)
.subscribe();
// Keep main thread alive
Thread.sleep(10000);
}
}

8. Testing Reactive Streams

import reactor.test.StepVerifier;
import java.time.Duration;
public class ReactorTesting {
public void testMono() {
Mono<String> mono = Mono.just("hello")
.map(String::toUpperCase)
.delayElement(Duration.ofMillis(100));
StepVerifier.create(mono)
.expectNext("HELLO")
.verifyComplete();
}
public void testFlux() {
Flux<Integer> flux = Flux.range(1, 5)
.filter(n -> n % 2 == 0)
.map(n -> n * 2);
StepVerifier.create(flux)
.expectNext(4)
.expectNext(8)
.verifyComplete();
}
public void testWithVirtualTime() {
StepVerifier.withVirtualTime(() -> 
Flux.interval(Duration.ofDays(1))
.take(2)
)
.expectSubscription()
.thenAwait(Duration.ofDays(2)) // Fast-forward 2 days
.expectNext(0L, 1L)
.verifyComplete();
}
public void testErrorScenarios() {
Flux<String> fluxWithError = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error!")));
StepVerifier.create(fluxWithError)
.expectNext("A", "B")
.verifyError(RuntimeException.class);
}
}

9. Best Practices and Patterns

Common Patterns:

public class ReactorPatterns {
// Cold vs Hot publishers
public void coldVsHot() {
// Cold: New subscription starts data flow from beginning
Flux<Integer> cold = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("New subscription"));
// Hot: Shares data among subscribers
ConnectableFlux<Integer> hot = Flux.range(1, 3)
.delayElements(Duration.ofSeconds(1))
.publish(); // Convert to hot
hot.connect(); // Start emitting regardless of subscribers
}
// Caching results
public void caching() {
Mono<String> expensiveOperation = Mono.fromCallable(() -> {
System.out.println("Computing...");
Thread.sleep(1000);
return "Expensive Result";
})
.cache(Duration.ofMinutes(5)); // Cache for 5 minutes
}
// Resource management
public void resourceManagement() {
Mono<String> resourceMono = Mono.using(
() -> new ExpensiveResource(), // Resource supplier
resource -> Mono.just(resource.getData()), // Resource usage
resource -> resource.cleanup() // Resource cleanup
);
}
// Timeout handling
public void timeoutHandling() {
Mono<String> slowMono = Mono.just("data")
.delayElement(Duration.ofSeconds(5));
slowMono
.timeout(Duration.ofSeconds(2))
.onErrorReturn("Fallback due to timeout")
.subscribe(System.out::println);
}
}
class ExpensiveResource implements AutoCloseable {
public String getData() { return "Data"; }
public void cleanup() { System.out.println("Cleaning up resource"); }
@Override public void close() { cleanup(); }
}

10. Performance Considerations

public class PerformanceTips {
// Avoid blocking operations in reactive chains
public void avoidBlocking() {
// ❌ Bad - blocks the thread
// Mono.fromCallable(() -> blockingHttpCall())
// ✅ Good - use dedicated scheduler for blocking operations
Mono.fromCallable(() -> blockingHttpCall())
.subscribeOn(Schedulers.boundedElastic());
}
// Use appropriate concurrency
public void controlConcurrency() {
Flux.range(1, 1000)
.flatMap(i -> asyncCall(i), 10) // Limit to 10 concurrent calls
.subscribe();
}
// Buffer appropriately
public void smartBuffering() {
Flux.range(1, 1000)
.buffer(100) // Process in batches of 100
.flatMap(batch -> processBatch(batch), 5) // 5 concurrent batches
.subscribe();
}
private Mono<String> asyncCall(int i) {
return Mono.just("Processed: " + i)
.delayElement(Duration.ofMillis(10));
}
private Mono<List<String>> processBatch(List<Integer> batch) {
return Mono.just(batch.stream()
.map(n -> "Batch: " + n)
.collect(Collectors.toList()));
}
}

Conclusion

Reactor provides a powerful foundation for building reactive applications in Java:

Key Benefits:

  • Non-blocking I/O: Better resource utilization
  • Backpressure: Prevents overwhelming consumers
  • Composable API: Functional, declarative style
  • Rich Operators: Extensive transformation capabilities

When to Use Reactor:

  • High-concurrency applications
  • Microservices with non-blocking communication
  • Real-time data processing
  • Systems requiring backpressure management

Start with simple Mono and Flux operations, master backpressure handling, and leverage Reactor's rich operator ecosystem to build efficient, scalable reactive systems.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper