Project Reactor Mono and Flux in Java

Introduction

Project Reactor is a fully non-blocking reactive programming foundation for the JVM, implementing the Reactive Streams specification. It provides two core types: Mono for 0-1 elements and Flux for 0-N elements, enabling efficient asynchronous data processing with backpressure support.

Core Concepts

Mono: Asynchronous 0-1 Result

public class MonoExamples {
// Creating Mono instances
public void demonstrateMonoCreation() {
// Empty Mono
Mono<String> emptyMono = Mono.empty();
// From value
Mono<String> valueMono = Mono.just("Hello Reactor");
// From Optional
Mono<String> optionalMono = Mono.justOrEmpty(Optional.of("Optional Value"));
// From Supplier (lazy evaluation)
Mono<String> supplierMono = Mono.fromSupplier(() -> {
System.out.println("Supplier executed!");
return "From Supplier";
});
// From Callable
Mono<String> callableMono = Mono.fromCallable(() -> "From Callable");
// From Runnable (completes without value)
Mono<Void> runnableMono = Mono.fromRunnable(() -> 
System.out.println("Task completed"));
// Error Mono
Mono<String> errorMono = Mono.error(new RuntimeException("Something went wrong"));
}
// Transforming Mono values
public void demonstrateMonoTransformations() {
Mono<String> original = Mono.just("reactive programming");
// Map transformation
Mono<String> upperCase = original.map(String::toUpperCase);
// FlatMap for asynchronous transformations
Mono<String> flatMapped = original.flatMap(value -> 
Mono.just(value + " with Reactor"));
// Filter values
Mono<String> filtered = original.filter(value -> value.length() > 5);
// Default value if empty
Mono<String> withDefault = original.defaultIfEmpty("Default Value");
// Switch to another Mono if empty
Mono<String> switchIfEmpty = original
.switchIfEmpty(Mono.just("Fallback Value"));
}
}

Flux: Asynchronous Sequence

public class FluxExamples {

    /** Walks through the common factory methods for constructing a Flux. */
    public void demonstrateFluxCreation() {
        // A fixed set of values.
        Flux<String> literals = Flux.just("A", "B", "C", "D");

        // Backed by an existing array.
        Flux<String> arrayBacked = Flux.fromArray(new String[]{"X", "Y", "Z"});

        // Backed by any Iterable.
        Flux<String> iterableBacked = Flux.fromIterable(
                Arrays.asList("Java", "Reactor", "Reactive"));

        // Ten consecutive integers: 1 through 10.
        Flux<Integer> oneToTen = Flux.range(1, 10);

        // Backed by a java.util.stream.Stream (single-use, like the stream).
        Flux<String> streamBacked = Flux.fromStream(
                Stream.of("Stream", "Elements"));

        // Stateful synchronous generation, one signal per round: emits 0..10.
        Flux<Integer> counted = Flux.generate(
                () -> 0,                      // initial state
                (state, sink) -> {
                    sink.next(state);
                    if (state == 10) sink.complete();
                    return state + 1;         // next round's state
                });

        // Bridge API: the sink can be fed from any (possibly async) callback.
        Flux<String> bridged = Flux.create(sink -> {
            sink.next("First");
            sink.next("Second");
            sink.next("Third");
            sink.complete();
        });
    }

    /** Walks through the basic sequence-transformation operators on Flux. */
    public void demonstrateFluxTransformations() {
        Flux<Integer> source = Flux.range(1, 10);

        // Synchronous one-to-one mapping.
        Flux<Integer> squares = source.map(n -> n * n);

        // Keeps only elements matching the predicate.
        Flux<Integer> evens = source.filter(n -> n % 2 == 0);

        // Asynchronous mapping: each element expands into a publisher.
        Flux<String> labelled = source.flatMap(n ->
                Mono.just("Number: " + n));

        // Emits only the first three elements, then cancels upstream.
        Flux<Integer> head = source.take(3);

        // Discards the first three elements and emits the rest.
        Flux<Integer> tail = source.skip(3);

        // Suppresses duplicate elements.
        Flux<Integer> unique = Flux.just(1, 2, 2, 3, 3, 3).distinct();
    }
}

Combining and Coordinating Reactive Streams

Combining Multiple Publishers

public class CombiningExamples {

    /** Shows the core operators for joining multiple publishers. */
    public void demonstrateCombiningOperations() {
        Mono<String> greeting = Mono.just("Hello");
        Mono<String> subject = Mono.just("World");
        Flux<String> letters = Flux.just("A", "B", "C");
        Flux<String> symbols = Flux.just("X", "Y", "Z");

        // merge: subscribes to all sources eagerly; elements interleave by arrival.
        Flux<String> interleaved = Flux.merge(letters, symbols);

        // concat: subscribes to the second source only after the first completes.
        Flux<String> sequential = Flux.concat(letters, symbols);

        // zip: pairs the n-th element of each source through the combinator.
        Flux<String> pairwise = Flux.zip(letters, symbols,
                (a, b) -> a + "-" + b);

        // combineLatest: re-emits whenever either source produces a new value.
        Flux<String> freshest = Flux.combineLatest(
                letters,
                symbols,
                (a, b) -> a + b);

        // Prepends another publisher's elements to the sequence.
        Flux<String> prefixed = letters.startWith(greeting);

        // Waits for the Mono to complete, then plays the Flux.
        Flux<String> afterwards = greeting.thenMany(letters);
    }

    /** Combines time-based sources with value-based ones. */
    public void complexCombinations() {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        Flux<String> messages = Flux.just("Msg1", "Msg2", "Msg3");

        // One message per tick; completes when the shorter source (messages) does.
        Flux<String> paced = Flux.zip(ticks, messages)
                .map(tuple -> tuple.getT2() + " at " + tuple.getT1() + "s");

        // Two infinite sources ticking at different rates, merged by arrival.
        Flux<String> quick = Flux.interval(Duration.ofMillis(500))
                .map(i -> "Fast-" + i);
        Flux<String> leisurely = Flux.interval(Duration.ofSeconds(2))
                .map(i -> "Slow-" + i);
        Flux<String> blended = Flux.merge(quick, leisurely);
    }
}

Error Handling and Recovery

Comprehensive Error Handling

public class ErrorHandlingExamples {
public void demonstrateErrorHandling() {
Flux<String> fluxWithErrors = Flux.just("A", "B", "C")
.concatWith(Mono.error(new RuntimeException("Forced Error")))
.concatWith(Mono.just("D"));  // This won't be reached
// On Error Return - provide fallback value
Flux<String> withFallback = fluxWithErrors
.onErrorReturn("Fallback Value");
// On Error Resume - switch to fallback publisher
Flux<String> withResume = fluxWithErrors
.onErrorResume(error -> Flux.just("X", "Y", "Z"));
// On Error Map - transform the error
Flux<String> withErrorMap = fluxWithErrors
.onErrorMap(original -> new CustomException(original));
// On Error Continue - skip error and continue
Flux<String> withContinue = fluxWithErrors
.onErrorContinue((error, element) -> 
System.err.println("Error processing: " + element));
// Retry mechanism
Flux<String> withRetry = fluxWithErrors
.retry(3)  // Retry up to 3 times
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)));
}
// Timeout handling
public void timeoutHandling() {
Flux<String> slowFlux = Flux.just("A", "B", "C")
.delayElements(Duration.ofSeconds(2));
Flux<String> withTimeout = slowFlux
.timeout(Duration.ofSeconds(1))
.onErrorResume(TimeoutException.class, 
error -> Flux.just("Timeout", "Fallback"));
}
// Custom exception class
static class CustomException extends RuntimeException {
public CustomException(Throwable cause) {
super(cause);
}
}
}

Backpressure Handling

Managing Data Flow

public class BackpressureExamples {

    /** Shows the four built-in overflow strategies. */
    public void demonstrateBackpressure() {
        Flux<Integer> producer = Flux.range(1, 1000)
                .delayElements(Duration.ofMillis(10));

        // Buffer overflowing elements, capped at 100.
        Flux<Integer> buffered = producer
                .onBackpressureBuffer(100);  // Buffer up to 100 items

        // Discard overflowing elements, with a callback per dropped item.
        Flux<Integer> dropping = producer
                .onBackpressureDrop(item ->
                        System.out.println("Dropped: " + item));

        // Keep only the newest element while the consumer is busy.
        Flux<Integer> latestOnly = producer
                .onBackpressureLatest();

        // Fail fast with an overflow error when the consumer can't keep up.
        Flux<Integer> failFast = producer
                .onBackpressureError();
    }

    /** Shows ways to throttle how much data a consumer pulls. */
    public void controlledConsumption() {
        Flux<Long> ticks = Flux.interval(Duration.ofMillis(100));

        // limitRate caps each upstream request; doOnRequest logs the demand.
        Flux<Long> throttled = ticks
                .limitRate(10)  // Request 10 elements at a time
                .doOnRequest(n -> System.out.println("Requested: " + n));

        // sample keeps only the latest element seen per time window.
        Flux<Long> sampled = ticks.sample(Duration.ofSeconds(1));

        // window splits the sequence into inner publishers of 5 elements each.
        Flux<Flux<Long>> batched = ticks.window(5);  // Groups of 5
    }
}

Real-World Use Cases

1. REST API with WebFlux

@RestController
public class ReactiveUserController {

    private final ReactiveUserRepository userRepository;
    // FIX: getUserWithProfile() referenced profileRepository, but the original
    // never declared or injected it — the class did not compile.
    private final ReactiveProfileRepository profileRepository;

    public ReactiveUserController(ReactiveUserRepository userRepository,
                                  ReactiveProfileRepository profileRepository) {
        this.userRepository = userRepository;
        this.profileRepository = profileRepository;
    }

    /** Streams every user, simulating per-element processing latency. */
    @GetMapping("/users")
    public Flux<User> getAllUsers() {
        return userRepository.findAll()
                .delayElements(Duration.ofMillis(100))  // Simulate processing
                .doOnNext(user -> System.out.println("Processing: " + user.getName()));
    }

    /** Looks up one user; an absent id becomes a UserNotFoundException signal. */
    @GetMapping("/users/{id}")
    public Mono<User> getUserById(@PathVariable String id) {
        return userRepository.findById(id)
                .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
    }

    /** Persists a new user and logs the stored id on success. */
    @PostMapping("/users")
    public Mono<User> createUser(@RequestBody User user) {
        return userRepository.save(user)
                .doOnSuccess(saved ->
                        System.out.println("User created: " + saved.getId()));
    }

    /** Fetches the user and their profile concurrently, then combines them. */
    @GetMapping("/users/{id}/with-profile")
    public Mono<UserProfile> getUserWithProfile(@PathVariable String id) {
        Mono<User> userMono = userRepository.findById(id);
        Mono<Profile> profileMono = profileRepository.findByUserId(id);
        // zip subscribes to both eagerly; completes empty if either source is empty.
        return Mono.zip(userMono, profileMono)
                .map(tuple -> {
                    User user = tuple.getT1();
                    Profile profile = tuple.getT2();
                    return new UserProfile(user, profile);
                });
    }
}
// Custom exception
/** Signals that no user exists for the requested identifier. */
class UserNotFoundException extends RuntimeException {
    /** @param userId the identifier that failed to resolve to a user */
    public UserNotFoundException(String userId) {
        super("User not found: " + userId);
    }
}

2. Database Operations

@Repository
public class ReactiveUserRepository {

    private final R2dbcEntityTemplate template;

    public ReactiveUserRepository(R2dbcEntityTemplate template) {
        this.template = template;
    }

    /** Streams every user flagged active; degrades to empty on database errors. */
    public Flux<User> findAllActiveUsers() {
        return template.select(User.class)
                .from("users")
                .matching(Query.query(
                        Criteria.where("active").is(true)
                ))
                .all()
                .onErrorResume(error -> {
                    // Best-effort read: log and emit nothing rather than fail the stream.
                    System.err.println("Database error: " + error.getMessage());
                    return Flux.empty();
                });
    }

    /** Finds a single user by id; completes empty when no row matches. */
    public Mono<User> findById(String id) {
        return template.select(User.class)
                .from("users")
                .matching(Query.query(
                        Criteria.where("id").is(id)
                ))
                .one();
    }

    /** Inserts one user and emits the persisted entity. */
    public Mono<User> save(User user) {
        return template.insert(User.class)
                .into("users")
                .using(user);
    }

    /**
     * Inserts a stream of users, bounding concurrency in batches of 100.
     * FIX: the original chained {@code .all(users)} and {@code template.insertAll(batch)},
     * neither of which exists on R2dbcEntityTemplate's insert spec; each element
     * now flows through the working single-row {@link #save(User)} path.
     */
    public Flux<User> saveAll(Flux<User> users) {
        return users
                .buffer(100)  // Process in batches of 100
                .flatMap(batch -> Flux.fromIterable(batch).flatMap(this::save));
    }
}

3. Event Processing Pipeline

public class EventProcessingPipeline {
// Entry point: groups raw events into 1-second windows, validates/enriches/
// processes each event asynchronously, then persists results in batches of 10.
public Flux<ProcessedEvent> processEventStream(Flux<RawEvent> eventStream) {
return eventStream
.window(Duration.ofSeconds(1))  // Group events by time window
.flatMap(window ->
window
.filter(this::isValidEvent)
.map(this::enrichEvent)
.flatMap(this::processEventAsync)
// Skip the failing event and keep the window alive instead of erroring the stream.
.onErrorContinue((error, event) ->
logError(error, event))
)
.buffer(10)  // Batch processing
.flatMap(this::persistProcessedEvents);
}
// Guard: rejects null events, missing timestamps, and empty type strings.
// NOTE(review): getType() itself is not null-checked — a null type would NPE
// here; confirm upstream guarantees a non-null type.
private boolean isValidEvent(RawEvent event) {
return event != null && 
event.getTimestamp() != null && 
!event.getType().isEmpty();
}
// Wraps a RawEvent into an EnrichedEvent, stamping the current wall-clock
// millis and attaching metadata via addMetadata (defined elsewhere in the project).
private EnrichedEvent enrichEvent(RawEvent event) {
return new EnrichedEvent(
event.getId(),
event.getType(),
event.getTimestamp(),
System.currentTimeMillis(),
addMetadata(event)
);
}
// Runs heavyProcessing off the event loop on the bounded-elastic scheduler,
// bounds each attempt to 30s, and converts any failure into an error-result event.
private Mono<ProcessedEvent> processEventAsync(EnrichedEvent event) {
return Mono.fromCallable(() -> heavyProcessing(event))
.subscribeOn(Schedulers.boundedElastic())  // Offload blocking work
.timeout(Duration.ofSeconds(30))
.onErrorResume(error -> 
Mono.just(createErrorResult(event, error)));
}
// Persists a batch element-by-element, retrying failures with exponential
// backoff (3 attempts, starting at 1s).
private Flux<ProcessedEvent> persistProcessedEvents(
List<ProcessedEvent> events) {
return Flux.fromIterable(events)
.flatMap(this::saveToDatabase)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
// Best-effort logging hook used by onErrorContinue for dropped events.
private void logError(Throwable error, Object event) {
System.err.println("Error processing event: " + event);
error.printStackTrace();
}
}

Testing Reactive Streams

Comprehensive Testing

@ExtendWith(SpringExtension.class)
public class ReactiveStreamTests {
@Test
public void testMonoOperations() {
StepVerifier.create(Mono.just("Hello"))
.expectNext("Hello")
.verifyComplete();
StepVerifier.create(Mono.error(new RuntimeException("Error")))
.expectError(RuntimeException.class)
.verify();
}
@Test
public void testFluxOperations() {
StepVerifier.create(Flux.just(1, 2, 3, 4, 5))
.expectNext(1, 2, 3, 4, 5)
.verifyComplete();
StepVerifier.create(Flux.range(1, 5).filter(n -> n % 2 == 0))
.expectNext(2, 4)
.verifyComplete();
}
@Test
public void testWithVirtualTime() {
StepVerifier.withVirtualTime(() -> 
Flux.interval(Duration.ofDays(1))
.take(2)
)
.thenAwait(Duration.ofDays(2))
.expectNext(0L, 1L)
.verifyComplete();
}
@Test
public void testErrorScenarios() {
StepVerifier.create(
Flux.just(1, 2, 3)
.concatWith(Mono.error(new RuntimeException("Error")))
)
.expectNext(1, 2, 3)
.expectError(RuntimeException.class)
.verify();
}
@Test
public void testContextPropagation() {
Mono<String> mono = Mono.deferContextual(ctx -> 
Mono.just("Hello " + ctx.get("user"))
).contextWrite(Context.of("user", "Alice"));
StepVerifier.create(mono)
.expectNext("Hello Alice")
.verifyComplete();
}
}

Schedulers and Thread Management

Controlling Execution Context

public class SchedulerExamples {

    /** Shows the built-in scheduler types and how publishOn moves work between them. */
    public void demonstrateSchedulers() {
        Flux<Integer> flux = Flux.range(1, 10);

        // Runs on the subscribing thread — no scheduling at all.
        Flux<Integer> immediate = flux.subscribeOn(Schedulers.immediate());
        // One reusable worker thread.
        Flux<Integer> single = flux.subscribeOn(Schedulers.single());
        // Fixed pool sized to CPU cores — for fast, non-blocking work.
        Flux<Integer> parallel = flux.subscribeOn(Schedulers.parallel());
        // Elastic but capped pool — intended for blocking tasks.
        Flux<Integer> boundedElastic = flux.subscribeOn(Schedulers.boundedElastic());
        // Wraps an arbitrary ExecutorService.
        // NOTE(review): in real code create and shut down the executor once —
        // this inline pool is never shut down and leaks its threads.
        Flux<Integer> fromExecutor = flux.subscribeOn(
                Schedulers.fromExecutor(Executors.newFixedThreadPool(5)));

        // publishOn switches the executing thread for everything downstream of it.
        Flux<String> processed = Flux.range(1, 100)
                .publishOn(Schedulers.parallel())
                .map(n -> "Processing " + n + " on " + Thread.currentThread().getName())
                .publishOn(Schedulers.single())
                .map(str -> str + " -> Final on " + Thread.currentThread().getName());
    }

    /**
     * Custom scheduler bean definition.
     * FIX: declared static — the original non-static inner @Configuration class
     * carries a hidden reference to the enclosing SchedulerExamples instance and
     * cannot be instantiated by Spring on its own.
     */
    @Configuration
    public static class SchedulerConfig {
        @Bean
        public Scheduler customScheduler() {
            return Schedulers.newBoundedElastic(
                    10,                      // max threads
                    100,                     // max queued tasks
                    "custom-scheduler"       // thread name prefix
            );
        }
    }

    /** Offloads a blocking call so it never runs on a reactive event-loop thread. */
    public Mono<String> handleBlockingOperation() {
        return Mono.fromCallable(() -> {
                    // Simulate blocking I/O
                    Thread.sleep(1000);
                    return "Blocking Result";
                })
                .subscribeOn(Schedulers.boundedElastic())  // Offload blocking work
                .timeout(Duration.ofSeconds(2));
    }
}

Best Practices and Patterns

Production-Ready Patterns

public class ReactorBestPractices {

    /**
     * 1. Proper error handling.
     * FIX: retryWhen now precedes onErrorResume — in the original order the
     * resume operator had already replaced every error with an empty sequence,
     * so the retry could never observe a failure.
     */
    public Flux<String> robustDataProcessing(Flux<String> input) {
        return input
                .doOnNext(item -> System.out.println("Processing: " + item))
                .flatMap(this::processItemSafely)
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
                .onErrorResume(error -> {
                    // Last resort after retries are exhausted: log and emit nothing.
                    logError(error);
                    return Flux.empty();
                })
                .doOnComplete(() -> System.out.println("Processing completed"));
    }

    /** Wraps a possibly-failing computation; a failure becomes an empty result. */
    private Mono<String> processItemSafely(String item) {
        return Mono.fromCallable(() -> expensiveProcessing(item))
                .onErrorResume(error -> {
                    System.err.println("Failed to process: " + item);
                    return Mono.empty();
                });
    }

    /** 2. Resource cleanup: ties the resource's lifetime to the sequence via Flux.using. */
    public Flux<String> withResourceCleanup() {
        return Flux.using(
                () -> new FileInputStream("data.txt"),  // resource supplier
                inputStream -> Flux.fromStream(
                        new BufferedReader(new InputStreamReader(inputStream))
                                .lines()),
                inputStream -> {                        // cleanup on terminate/cancel
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        System.err.println("Error closing stream: " + e.getMessage());
                    }
                }
        );
    }

    /** 3. Monitoring and metrics via side-effect hooks. */
    public Flux<String> withMonitoring(Flux<String> input) {
        AtomicLong counter = new AtomicLong();
        return input
                .doOnNext(item -> counter.incrementAndGet())
                .doOnComplete(() ->
                        System.out.println("Processed " + counter.get() + " items"))
                .doOnError(error ->
                        System.err.println("Failed after " + counter.get() + " items"));
    }

    /** 4. Backpressure-aware processing with bounded buffering and logged demand. */
    public Flux<String> backpressureAware(Flux<String> fastProducer) {
        return fastProducer
                .onBackpressureBuffer(1000,
                        dropped -> System.out.println("Buffer overflow, dropping: " + dropped))
                .limitRate(100)  // Control request rate
                .doOnRequest(n ->
                        System.out.println("Downstream requested: " + n + " elements"));
    }

    /** Shared error-logging hook. */
    private void logError(Throwable error) {
        System.err.println("Error occurred: " + error.getMessage());
    }

    /** Stand-in for an expensive transformation. */
    private String expensiveProcessing(String item) {
        // Simulate expensive operation
        return item.toUpperCase();
    }
}

Conclusion

Project Reactor's Mono and Flux provide a powerful foundation for building reactive applications in Java. Key takeaways:

  • Mono represents single-valued or empty asynchronous computations
  • Flux handles streams of 0 to N elements with backpressure support
  • Operators enable functional transformation of data streams
  • Schedulers control threading and execution context
  • Error handling and backpressure management are built-in
  • Testing is supported through StepVerifier for comprehensive verification

By mastering these concepts, developers can build highly scalable, resilient, and responsive applications that efficiently handle asynchronous data streams while maintaining clean, declarative code.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper