Backpressure Handling in Reactive Streams in Java

Introduction to Backpressure

Backpressure is a crucial concept in reactive programming that deals with the problem of controlling the data flow between a fast producer and a slow consumer. It prevents overwhelming the consumer with more data than it can handle, thus avoiding memory issues and system instability.

Reactive Streams Specification

Core Interfaces

The Reactive Streams specification defines four main interfaces:

// Publisher: produces a stream of data items. Per the Reactive Streams spec it
// may emit items to a Subscriber only in response to demand signalled through
// the Subscription it hands to that Subscriber.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
// Subscriber: consumes data items. onSubscribe is invoked first with the
// Subscription used to signal demand; onNext delivers items; the stream then
// terminates with exactly one of onError or onComplete.
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
// Subscription: the control channel between one Publisher and one Subscriber.
// request(n) signals demand for up to n more items (this is the backpressure
// mechanism); cancel() asks the Publisher to stop emitting and clean up.
public interface Subscription {
void request(long n);
void cancel();
}
// Processor: a processing stage that is both a Subscriber (of T) and a
// Publisher (of R); used to build transformation pipelines between stages.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Backpressure Strategies

1. Pull-Based Backpressure

import org.reactivestreams.*;
import java.util.concurrent.atomic.*;
/**
 * Demonstrates pull-based backpressure: the subscriber explicitly requests
 * items and the publisher produces only as much as was requested.
 */
public class PullBackpressureExample {

    /** Publisher that hands each subscriber its own pull-driven subscription. */
    static class SimplePublisher implements Publisher<Integer> {
        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new SimpleSubscription(subscriber));
        }
    }

    /**
     * Subscription that produces integers strictly on demand. Items are only
     * emitted while outstanding demand (tracked in {@code requested}) is
     * positive.
     */
    static class SimpleSubscription implements Subscription {
        private final Subscriber<? super Integer> subscriber;
        // Outstanding demand: signalled via request(n), consumed per onNext.
        private final AtomicLong requested = new AtomicLong(0);
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        // Ensures only one producer thread runs at a time, so onNext calls are
        // serialized as required by Reactive Streams rule 1.3.
        private final AtomicBoolean producing = new AtomicBoolean(false);
        private int currentValue = 0;

        public SimpleSubscription(Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                // Spec rule 3.9: non-positive demand is signalled as onError.
                subscriber.onError(new IllegalArgumentException(
                        "Request must be positive"));
                return;
            }
            // Accumulate demand, capping at Long.MAX_VALUE ("effectively
            // unbounded", spec rule 3.17) instead of silently overflowing.
            requested.accumulateAndGet(n, (current, add) -> {
                long sum = current + add;
                return sum < 0 ? Long.MAX_VALUE : sum;
            });
            produceItems();
        }

        @Override
        public void cancel() {
            cancelled.set(true);
        }

        private void produceItems() {
            // Only one producer thread at a time; a running producer will see
            // any newly added demand on its next loop iteration.
            if (!producing.compareAndSet(false, true)) {
                return;
            }
            // Use a separate thread so request() never blocks the caller.
            new Thread(() -> {
                try {
                    while (!cancelled.get() && requested.get() > 0) {
                        subscriber.onNext(currentValue++);
                        requested.decrementAndGet();
                        try {
                            Thread.sleep(10); // simulate per-item production cost
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    // BUG FIX: the original called subscriber.onComplete() here
                    // whenever demand dropped to zero. This source is open-ended,
                    // so completing on exhausted demand both ended the stream
                    // prematurely and allowed a later request() to spawn a new
                    // producer that emitted onNext after onComplete (a spec
                    // violation). Instead we stop quietly and resume when the
                    // next request() arrives.
                } finally {
                    producing.set(false);
                }
                // Demand may have arrived while we were releasing the flag.
                if (!cancelled.get() && requested.get() > 0) {
                    produceItems();
                }
            }).start();
        }
    }

    /** Subscriber that processes slowly and requests one more item at a time. */
    static class SlowSubscriber implements Subscriber<Integer> {
        private Subscription subscription;
        private final String name;

        public SlowSubscriber(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            System.out.println(name + " subscribed");
            // Request first 3 items
            subscription.request(3);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println(name + " received: " + item);
            // Simulate slow processing
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Request one more item after processing
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println(name + " error: " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(name + " completed");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimplePublisher publisher = new SimplePublisher();
        SlowSubscriber subscriber = new SlowSubscriber("SlowConsumer");
        publisher.subscribe(subscriber);
        // Keep the main thread alive long enough to observe the demo.
        Thread.sleep(5000);
    }
}

2. Buffer-Based Backpressure

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
 * Demonstrates buffer-based backpressure: a bounded queue sits between a fast
 * producer and a slow consumer; the producer slows down when the queue fills.
 */
public class BufferBackpressureExample {

    /** Publisher whose subscriptions buffer produced items and drain on demand. */
    static class BufferedPublisher implements Publisher<String> {
        private final ExecutorService executor = Executors.newFixedThreadPool(2);
        private final int bufferSize;

        public BufferedPublisher(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        @Override
        public void subscribe(Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new BufferedSubscription(subscriber, bufferSize, executor));
        }

        /**
         * Stops the production thread pool. BUG FIX: the original never shut
         * the executor down, so its non-daemon worker threads kept the JVM
         * alive indefinitely after main() returned.
         */
        public void shutdown() {
            executor.shutdownNow();
        }
    }

    /** Subscription backed by a bounded queue between producer and subscriber. */
    static class BufferedSubscription implements Subscription {
        private final Subscriber<? super String> subscriber;
        // Bounded queue: a full queue is what pushes back on the producer.
        private final BlockingQueue<String> buffer;
        private final AtomicLong requested = new AtomicLong(0);
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final ExecutorService executor;
        private volatile boolean producing = false;

        public BufferedSubscription(Subscriber<? super String> subscriber,
                                    int bufferSize, ExecutorService executor) {
            this.subscriber = subscriber;
            this.buffer = new LinkedBlockingQueue<>(bufferSize);
            this.executor = executor;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                subscriber.onError(new IllegalArgumentException(
                        "Request must be positive"));
                return;
            }
            requested.getAndAdd(n);
            drainBuffer();
        }

        @Override
        public void cancel() {
            cancelled.set(true);
            buffer.clear();
        }

        /** Starts the background production loop (idempotent). */
        public void startProducing() {
            if (producing) return;
            producing = true;
            executor.submit(() -> {
                int count = 0;
                while (!cancelled.get()) {
                    try {
                        String item = "Item-" + count++;
                        // Try to add to the bounded buffer with a timeout.
                        boolean added = buffer.offer(item, 100, TimeUnit.MILLISECONDS);
                        if (added) {
                            System.out.println("Produced: " + item +
                                    " (Buffer size: " + buffer.size() + ")");
                            drainBuffer();
                        } else {
                            // Buffer full - apply backpressure
                            System.out.println("Buffer full! Slowing down production...");
                            Thread.sleep(500); // Slow down producer
                        }
                        // Simulate production time
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }

        /**
         * Delivers buffered items while demand remains. BUG FIX: declared
         * synchronized because it is invoked concurrently from the producer
         * task and from the consumer's request() thread; without mutual
         * exclusion two threads could interleave onNext calls (violating
         * Reactive Streams rule 1.3) and both pass the requested > 0 check,
         * over-delivering items.
         */
        private synchronized void drainBuffer() {
            while (!cancelled.get() && requested.get() > 0 && !buffer.isEmpty()) {
                String item = buffer.poll();
                if (item != null) {
                    subscriber.onNext(item);
                    requested.decrementAndGet();
                }
            }
        }
    }

    /** Subscriber with a configurable per-item processing delay. */
    static class BufferedSubscriber implements Subscriber<String> {
        private Subscription subscription;
        private final String name;
        private final int processingDelay;

        public BufferedSubscriber(String name, int processingDelay) {
            this.name = name;
            this.processingDelay = processingDelay;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            System.out.println(name + " subscribed");
            // Request initial batch
            subscription.request(5);
            // Start production (demo-only downcast to reach the producer hook).
            if (subscription instanceof BufferedSubscription) {
                ((BufferedSubscription) subscription).startProducing();
            }
        }

        @Override
        public void onNext(String item) {
            System.out.println(name + " processing: " + item);
            // Simulate variable processing time
            try {
                Thread.sleep(processingDelay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Request next item
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println(name + " error: " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(name + " completed");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BufferedPublisher publisher = new BufferedPublisher(10); // Buffer size 10
        // Fast producer, slow consumer
        BufferedSubscriber slowSubscriber = new BufferedSubscriber("SlowConsumer", 500);
        publisher.subscribe(slowSubscriber);
        Thread.sleep(10000);
        publisher.shutdown(); // BUG FIX: release pool threads so the JVM can exit
    }
}

Project Reactor Backpressure Handling

Reactor Backpressure Operators

import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Walks through Reactor's four built-in overflow strategies: buffer, drop,
 * latest, and error.
 */
public class ReactorBackpressureExamples {

    /** Sleeps for {@code millis} ms, restoring the interrupt flag if interrupted. */
    private static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Example 1: onBackpressureBuffer - hold items while the consumer lags
        System.out.println("=== onBackpressureBuffer ===");
        Flux.range(1, 1000)
                .onBackpressureBuffer(50) // Buffer up to 50 items
                .doOnNext(value -> System.out.println("Produced: " + value))
                .publishOn(Schedulers.single(), 10) // Consumer with prefetch of 10
                .subscribe(value -> {
                    sleepMillis(100); // slow consumer
                    System.out.println("Consumed: " + value);
                });
        Thread.sleep(2000);

        System.out.println("\n=== onBackpressureDrop ===");
        // Example 2: onBackpressureDrop - discard items when downstream is saturated
        AtomicInteger droppedCount = new AtomicInteger();
        Flux.range(1, 100)
                .onBackpressureDrop(dropped -> {
                    droppedCount.incrementAndGet();
                    System.out.println("Dropped: " + dropped);
                })
                .publishOn(Schedulers.single(), 5) // small prefetch
                .subscribe(value -> {
                    sleepMillis(200);
                    System.out.println("Processed: " + value);
                });
        Thread.sleep(5000);
        System.out.println("Total dropped: " + droppedCount.get());

        System.out.println("\n=== onBackpressureLatest ===");
        // Example 3: onBackpressureLatest - keep only the most recent item
        Flux.interval(Duration.ofMillis(10))
                .onBackpressureLatest()
                .publishOn(Schedulers.single(), 1)
                .subscribe(value -> {
                    sleepMillis(100);
                    System.out.println("Latest: " + value);
                });
        Thread.sleep(2000);

        System.out.println("\n=== onBackpressureError ===");
        // Example 4: onBackpressureError - signal an error on overflow
        Flux.range(1, 1000)
                .onBackpressureError()
                .publishOn(Schedulers.single(), 5)
                .subscribe(
                        value -> {
                            sleepMillis(100);
                            System.out.println("Item: " + value);
                        },
                        error -> System.err.println("Error: " + error)
                );
        Thread.sleep(3000);
    }
}

Custom Backpressure Strategies with Reactor

import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;              // BUG FIX: Duration is used below but was never imported
import java.util.concurrent.atomic.*;

/** Three hand-rolled backpressure strategies built on Reactor primitives. */
public class CustomBackpressureStrategies {

    // Strategy 1: Adaptive Request Strategy
    public static class AdaptiveBackpressureStrategy {
        // Illustrative "ideal batch size"; doubles while processing keeps pace.
        private final AtomicLong requestSize = new AtomicLong(1);
        private final AtomicLong processedCount = new AtomicLong(0);
        private final long windowSize;       // items per adaptation window
        private final long maxRequestSize;   // upper bound for requestSize

        public AdaptiveBackpressureStrategy(long windowSize, long maxRequestSize) {
            this.windowSize = windowSize;
            this.maxRequestSize = maxRequestSize;
        }

        /**
         * Creates a stream that produces integers only in response to
         * downstream demand, doubling its internal batch-size estimate every
         * {@code windowSize} items.
         */
        public Flux<Integer> createAdaptiveStream() {
            return Flux.create(sink -> {
                AtomicInteger counter = new AtomicInteger(0);
                AtomicLong currentRequested = new AtomicLong(0);
                sink.onRequest(n -> {
                    currentRequested.addAndGet(n);
                    produceItems(sink, counter, currentRequested);
                });
                // BUG FIX: the original called sink.request(...) here, but
                // FluxSink has no request() method - demand only flows from the
                // subscriber into the onRequest callback above. Production
                // therefore starts when the first downstream request arrives.
            });
        }

        private void produceItems(FluxSink<Integer> sink, AtomicInteger counter,
                                  AtomicLong currentRequested) {
            while (currentRequested.get() > 0 && !sink.isCancelled()) {
                int item = counter.incrementAndGet();
                sink.next(item);
                currentRequested.decrementAndGet();
                processedCount.incrementAndGet();
                // Adaptive logic: widen the batch-size estimate once per window.
                if (processedCount.get() % windowSize == 0) {
                    long currentSize = requestSize.get();
                    if (currentSize < maxRequestSize) {
                        requestSize.set(Math.min(currentSize * 2, maxRequestSize));
                        System.out.println("Increased request size to: " + requestSize.get());
                    }
                }
                // Simulate production time
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    sink.error(e);
                    return;
                }
            }
        }
    }

    // Strategy 2: Time-Based Backpressure
    public static class TimeBasedBackpressureStrategy {
        /**
         * Produces items on a timer while simulating a consumer that needs
         * {@code consumptionRate} per item; more than 100 buffered items
         * overflow and are dropped (and logged).
         */
        public static Flux<String> createTimeBasedStream(Duration productionInterval,
                                                         Duration consumptionRate) {
            return Flux.interval(productionInterval)
                    .onBackpressureBuffer(100,
                            drop -> System.out.println("Buffer full, dropping: " + drop))
                    .map(i -> "Item-" + i)
                    .publishOn(Schedulers.parallel())
                    .doOnNext(item -> {
                        // Simulate processing time based on the consumption rate.
                        try {
                            Thread.sleep(consumptionRate.toMillis());
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
        }
    }

    // Strategy 3: Load-Based Backpressure
    public static class LoadBasedBackpressureStrategy {
        // NOTE(review): both thresholds are stored for API compatibility but the
        // overload check below only samples memory usage - confirm intent.
        private final double maxCpuThreshold;
        private final long maxMemoryThreshold;

        public LoadBasedBackpressureStrategy(double maxCpuThreshold, long maxMemoryThreshold) {
            this.maxCpuThreshold = maxCpuThreshold;
            this.maxMemoryThreshold = maxMemoryThreshold;
        }

        /** Generates integers, pausing production while the system looks overloaded. */
        public Flux<Integer> createLoadAwareStream() {
            return Flux.generate(
                    () -> 0, // Initial state
                    (state, sink) -> {
                        // Check system load before emitting the next value.
                        if (isSystemOverloaded()) {
                            System.out.println("System overloaded, applying backpressure");
                            try {
                                Thread.sleep(1000); // Slow down production
                            } catch (InterruptedException e) {
                                sink.error(e);
                                return state;
                            }
                        }
                        sink.next(state);
                        return state + 1;
                    });
        }

        private boolean isSystemOverloaded() {
            Runtime runtime = Runtime.getRuntime();
            long usedMemory = runtime.totalMemory() - runtime.freeMemory();
            long maxMemory = runtime.maxMemory();
            double memoryUsage = (double) usedMemory / maxMemory;
            // Simple simulation - in real system, you'd use proper metrics
            return memoryUsage > 0.8 || Math.random() < 0.1; // 10% chance of simulated overload
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== Adaptive Backpressure ===");
        AdaptiveBackpressureStrategy adaptiveStrategy =
                new AdaptiveBackpressureStrategy(10, 20);
        adaptiveStrategy.createAdaptiveStream()
                .take(50)
                .subscribe(item -> {
                    // Variable processing time
                    try {
                        Thread.sleep(50 + (long) (Math.random() * 100));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("Processed: " + item);
                });
        Thread.sleep(5000);

        System.out.println("\n=== Time-Based Backpressure ===");
        TimeBasedBackpressureStrategy.createTimeBasedStream(
                        Duration.ofMillis(50), Duration.ofMillis(200))
                .take(20)
                .subscribe(System.out::println);
        Thread.sleep(5000);

        System.out.println("\n=== Load-Based Backpressure ===");
        LoadBasedBackpressureStrategy loadStrategy =
                new LoadBasedBackpressureStrategy(0.8, 1000000000);
        loadStrategy.createLoadAwareStream()
                .take(30)
                .subscribe(item -> System.out.println("Load-aware item: " + item));
        Thread.sleep(5000);
    }
}

RxJava Backpressure Handling

RxJava Backpressure Operators

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demonstrates RxJava 3 Flowable backpressure operators: bounded buffer with
 * an overflow strategy, drop, and latest.
 */
public class RxJavaBackpressureExamples {

    /** Sleeps for {@code millis} ms, restoring the interrupt flag if interrupted. */
    private static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Example 1: Flowable with built-in backpressure support
        System.out.println("=== Flowable with Backpressure ===");
        Flowable.range(1, 1000)
                .onBackpressureBuffer(50, // buffer size
                        () -> System.out.println("Buffer overflow!"),
                        BackpressureOverflowStrategy.ERROR)
                .observeOn(Schedulers.io(), false, 10) // prefetch 10
                .subscribe(
                        value -> {
                            sleepMillis(100);
                            System.out.println("Processed: " + value);
                        },
                        error -> System.err.println("Error: " + error)
                );
        TimeUnit.SECONDS.sleep(3);

        System.out.println("\n=== onBackpressureDrop ===");
        // Example 2: drop strategy - discard items the consumer cannot keep up with
        AtomicInteger dropCount = new AtomicInteger();
        Flowable.interval(10, TimeUnit.MILLISECONDS)
                .onBackpressureDrop(dropped -> {
                    dropCount.incrementAndGet();
                    System.out.println("Dropped: " + dropped);
                })
                .observeOn(Schedulers.computation(), false, 5)
                .subscribe(value -> {
                    sleepMillis(200);
                    System.out.println("Received: " + value);
                });
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Total dropped: " + dropCount.get());

        System.out.println("\n=== onBackpressureLatest ===");
        // Example 3: latest strategy - keep only the most recent item
        Flowable.interval(10, TimeUnit.MILLISECONDS)
                .onBackpressureLatest()
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(value -> {
                    sleepMillis(100);
                    System.out.println("Latest: " + value);
                });
        TimeUnit.SECONDS.sleep(2);
    }
}

Real-World Backpressure Scenarios

1. Database Query Results

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Streams a large JDBC result set reactively: rows are fetched and emitted
 * only as the subscriber signals demand.
 */
public class DatabaseBackpressureExample {

    static class DatabaseService {
        /**
         * Streams the first column of each row of a potentially huge query.
         *
         * @param query     SQL to execute
         * @param fetchSize JDBC fetch-size hint so the driver pages rows
         * @return a Flux emitting one String per row
         */
        public Flux<String> streamLargeQuery(String query, int fetchSize) {
            return Flux.create(sink -> {
                try {
                    Connection conn = DriverManager.getConnection("jdbc:h2:mem:test");
                    Statement stmt = conn.createStatement();
                    stmt.setFetchSize(fetchSize);
                    ResultSet rs = stmt.executeQuery(query);
                    // BUG FIX: the original left an empty finally block and never
                    // closed anything; release the JDBC resources whenever the
                    // stream terminates or is cancelled.
                    sink.onDispose(() -> {
                        try { rs.close(); } catch (SQLException ignored) { /* best effort */ }
                        try { stmt.close(); } catch (SQLException ignored) { /* best effort */ }
                        try { conn.close(); } catch (SQLException ignored) { /* best effort */ }
                    });
                    AtomicLong requested = new AtomicLong(0);
                    sink.onRequest(n -> {
                        requested.addAndGet(n);
                        try {
                            // BUG FIX: the original drained the loop and then
                            // called rs.next() a second time to test for
                            // completion, silently consuming (and losing) one row
                            // whenever the loop had stopped because demand ran out
                            // rather than because the data ended. Here each
                            // rs.next() result is used exactly once.
                            while (requested.get() > 0 && !sink.isCancelled()) {
                                if (!rs.next()) {
                                    sink.complete();
                                    return;
                                }
                                sink.next(rs.getString(1)); // Simplified: first column only
                                requested.decrementAndGet();
                            }
                        } catch (SQLException e) {
                            sink.error(e);
                        }
                    });
                } catch (SQLException e) {
                    sink.error(e);
                }
            });
        }
    }

    public static void main(String[] args) {
        DatabaseService service = new DatabaseService();
        service.streamLargeQuery("SELECT * FROM large_table", 100)
                .onBackpressureBuffer(1000) // Buffer up to 1000 rows
                .publishOn(Schedulers.boundedElastic())
                .subscribe(
                        row -> {
                            // Process each row
                            try {
                                Thread.sleep(10); // Simulate processing time
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            System.out.println("Processed: " + row);
                        },
                        error -> System.err.println("Error: " + error),
                        () -> System.out.println("Query processing completed")
                );
    }
}

2. WebSocket Message Processing

import reactor.core.publisher.*;
import org.springframework.web.reactive.socket.*;
import java.util.concurrent.*;
public class WebSocketBackpressureExample {
static class WebSocketHandler {
private final Flux<String> messageFlux;
private final FluxSink<String> messageSink;
private final AtomicLong pendingMessages = new AtomicLong(0);
private final int maxBufferSize;
public WebSocketHandler(int maxBufferSize) {
this.maxBufferSize = maxBufferSize;
Flux<String> flux = Flux.create(sink -> {
this.messageSink = sink;
sink.onRequest(n -> {
long pending = pendingMessages.addAndGet(-n);
System.out.println("Consumer requested " + n + " messages. Pending: " + pending);
});
});
this.messageFlux = flux.onBackpressureBuffer(maxBufferSize, 
item -> System.out.println("Dropping message due to backpressure: " + item));
}
public void handleMessage(String message) {
long pending = pendingMessages.get();
if (pending >= maxBufferSize) {
System.out.println("Backpressure: Rejecting message - " + message);
return;
}
pendingMessages.incrementAndGet();
messageSink.next(message);
}
public Flux<String> getMessageStream() {
return messageFlux;
}
}
public static void main(String[] args) throws InterruptedException {
WebSocketHandler handler = new WebSocketHandler(100);
// Start processing messages
handler.getMessageStream()
.publishOn(Schedulers.parallel(), 10) // Prefetch 10
.subscribe(message -> {
// Simulate message processing
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Processed: " + message);
});
// Simulate incoming messages
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(() -> {
String message = "Msg-" + System.currentTimeMillis();
handler.handleMessage(message);
}, 0, 50, TimeUnit.MILLISECONDS); // Fast producer
Thread.sleep(5000);
executor.shutdown();
}
}

Monitoring and Metrics for Backpressure

import reactor.core.publisher.*;
import java.util.concurrent.atomic.*;
import java.time.Duration;
public class BackpressureMonitoring {
static class MonitoredFlux<T> {
private final Flux<T> flux;
private final AtomicLong producedCount = new AtomicLong();
private final AtomicLong consumedCount = new AtomicLong();
private final AtomicLong droppedCount = new AtomicLong();
private final AtomicLong bufferSize = new AtomicLong();
public MonitoredFlux(Flux<T> source) {
this.flux = source
.doOnNext(item -> producedCount.incrementAndGet())
.onBackpressureBuffer(100, 
item -> droppedCount.incrementAndGet())
.doOnNext(item -> {
consumedCount.incrementAndGet();
bufferSize.decrementAndGet();
})
.doOnRequest(n -> bufferSize.addAndGet(n));
}
public Flux<T> getFlux() {
return flux;
}
public void printStats() {
System.out.printf(
"Produced: %d, Consumed: %d, Dropped: %d, Buffer: %d, Lag: %d%n",
producedCount.get(), consumedCount.get(), droppedCount.get(),
bufferSize.get(), producedCount.get() - consumedCount.get()
);
}
}
public static void main(String[] args) throws InterruptedException {
MonitoredFlux<Integer> monitoredFlux = new MonitoredFlux<>(
Flux.interval(Duration.ofMillis(10))
.map(Long::intValue)
.take(1000)
);
// Start statistics reporting
Flux.interval(Duration.ofSeconds(1))
.subscribe(tick -> monitoredFlux.printStats());
// Slow consumer
monitoredFlux.getFlux()
.publishOn(Schedulers.single(), 5)
.subscribe(item -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread.sleep(10000);
}
}

Best Practices for Backpressure Handling

1. Choose the Right Strategy

/** Maps broad workload categories to a recommended Reactor backpressure operator. */
public class BackpressureStrategyGuide {

    /** Workload categories, each with a suggested overflow strategy. */
    public enum BackpressureScenario {
        REAL_TIME_DATA,      // Use DROP or LATEST
        BATCH_PROCESSING,    // Use BUFFER with large size
        CRITICAL_DATA,       // Use BUFFER with ERROR strategy
        RESOURCE_CONSTRAINED // Use adaptive backpressure
    }

    /**
     * Decorates the given source with the backpressure operator appropriate
     * for the scenario; unknown scenarios fall back to an unbounded buffer.
     */
    public static <T> Flux<T> applyStrategy(Flux<T> source,
                                            BackpressureScenario scenario) {
        if (scenario == null) {
            // Mirrors the NullPointerException a switch on a null enum throws.
            throw new NullPointerException("scenario");
        }
        if (scenario == BackpressureScenario.REAL_TIME_DATA) {
            // Stale data is worthless: keep only the most recent item.
            return source.onBackpressureLatest();
        }
        if (scenario == BackpressureScenario.BATCH_PROCESSING) {
            // Throughput matters more than latency: buffer generously.
            return source.onBackpressureBuffer(10_000);
        }
        if (scenario == BackpressureScenario.CRITICAL_DATA) {
            // Losing data is unacceptable: fail loudly on overflow.
            return source.onBackpressureBuffer(1000,
                    BufferOverflowStrategy.ERROR);
        }
        if (scenario == BackpressureScenario.RESOURCE_CONSTRAINED) {
            // Shed load once the small buffer fills, logging what is dropped.
            return source.onBackpressureBuffer(100,
                    item -> System.out.println("Dropping under pressure: " + item));
        }
        return source.onBackpressureBuffer();
    }
}

2. Proper Resource Cleanup

import reactor.core.publisher.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;   // BUG FIX: AtomicBoolean lives in the atomic subpackage, not covered by concurrent.*
import java.time.Duration;              // BUG FIX: Duration is used below but was never imported

/**
 * Shows Flux.using: a stream tied to an external resource that is acquired at
 * subscription and reliably released on completion, error, or cancellation.
 */
public class ResourceCleanupExample {

    /** Builds a stream whose lifetime is bound to an ExpensiveResource. */
    public static Flux<String> createResourceBoundStream() {
        return Flux.using(
                // Resource supplier - runs once per subscription.
                () -> {
                    System.out.println("Acquiring resource...");
                    return new ExpensiveResource();
                },
                // Stream generator - derives the item stream from the resource.
                resource -> Flux.interval(Duration.ofMillis(50))
                        .map(i -> resource.getData(i))
                        .onBackpressureBuffer(100)
                        .doOnCancel(() -> System.out.println("Stream cancelled")),
                // Resource cleanup - runs on terminate or cancel.
                resource -> {
                    System.out.println("Cleaning up resource...");
                    resource.cleanup();
                }
        );
    }

    /** Toy resource that refuses to serve data after cleanup. */
    static class ExpensiveResource {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public String getData(long index) {
            if (closed.get()) {
                throw new IllegalStateException("Resource closed");
            }
            return "Data-" + index;
        }

        public void cleanup() {
            closed.set(true);
            System.out.println("Resource cleaned up");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        createResourceBoundStream()
                .take(10) // Automatically cleans up after 10 items
                .subscribe(System.out::println);
        Thread.sleep(1000);
    }
}

Conclusion

Backpressure handling is essential for building robust reactive systems. Key takeaways:

  1. Understand Your Use Case: Choose the right backpressure strategy based on data criticality and system constraints
  2. Monitor and Measure: Implement monitoring to understand backpressure patterns
  3. Use Framework Support: Leverage built-in backpressure operators in Reactor/RxJava
  4. Consider Resource Limits: Always bound buffers and manage resources properly
  5. Test Under Load: Ensure your backpressure strategies work under production-like conditions

Proper backpressure handling ensures that your reactive applications remain responsive and stable even under heavy load conditions.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper