RxJava for Reactive Android in Java

Table of Contents

  1. Introduction to RxJava
  2. RxJava Basics and Setup
  3. Observables and Observers
  4. Operators and Transformation
  5. Schedulers and Threading
  6. Android-specific RxJava
  7. Networking with RxJava
  8. Database Operations
  9. Error Handling
  10. Testing RxJava
  11. Best Practices

Introduction to RxJava

RxJava is a Java implementation of Reactive Extensions (ReactiveX), a library for composing asynchronous and event-based programs using observable sequences. On Android it is typically used to run work such as network or database calls off the main thread and deliver the results back to it.

Key Benefits for Android:

  • Simplified async operations: Handle background tasks elegantly
  • Functional programming: Use operators to transform, combine, and manipulate data
  • Error handling: Comprehensive error handling mechanisms
  • Thread management: Easy switching between threads
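  • Memory leak prevention: Every subscription returns a Disposable that can be disposed in lifecycle callbacks such as onDestroy() (RxAndroid itself only adds Android-specific schedulers, not lifecycle awareness)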

RxJava Basics and Setup

1. Gradle Dependencies

// build.gradle (Module: app)
dependencies {
// RxJava 3
implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
// RxBinding for Android Views
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-core:4.0.0'
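implementation 'com.jakewharton.rxbinding4:rxbinding-appcompat:4.0.0'
// Required for the RxRecyclerView bindings used later in this guide
implementation 'com.jakewharton.rxbinding4:rxbinding-recyclerview:4.0.0'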
// RxJava Adapter for Retrofit
implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'
// RxPermissions for runtime permissions (published on JitPack, so the jitpack.io repository must be declared)
implementation 'com.github.tbruyelle:rxpermissions:0.12'
// Testing
testImplementation 'io.reactivex.rxjava3:rxjava:3.1.8'
testImplementation 'junit:junit:4.13.2'
androidTestImplementation 'androidx.test.ext:junit:1.1.5'
}

2. Basic RxJava Components

// RxJavaBasicExample.java
public class RxJavaBasicExample {
// 1. Observable - The data source that emits items
// 2. Observer - The consumer that receives items
// 3. Operators - Methods that transform, filter, or combine Observables
// 4. Schedulers - Control which thread operations occur on
public static void main(String[] args) {
basicObservableExample();
operatorsExample();
threadingExample();
}
private static void basicObservableExample() {
System.out.println("=== Basic Observable Example ===");
// Create an Observable that emits a sequence of integers
Observable<Integer> observable = Observable.create(emitter -> {
try {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
// Create an Observer to consume the emitted items
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Received: " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("Error: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
};
// Subscribe the observer to the observable
observable.subscribe(observer);
}
private static void operatorsExample() {
System.out.println("\n=== Operators Example ===");
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter(number -> number % 2 == 0) // Filter even numbers
.map(number -> number * number)    // Square each number
.take(3)                          // Take only first 3
.subscribe(
result -> System.out.println("Processed: " + result),
error -> System.out.println("Error: " + error),
() -> System.out.println("Operation completed")
);
}
private static void threadingExample() {
System.out.println("\n=== Threading Example ===");
Observable.just("Hello", "World")
.subscribeOn(Schedulers.io())          // Execute on IO thread
.observeOn(Schedulers.computation())   // Observe on computation thread
.doOnNext(item -> System.out.println("Processing on: " + Thread.currentThread().getName()))
.observeOn(Schedulers.newThread())     // Switch to new thread
.subscribe(
item -> System.out.println("Received on: " + Thread.currentThread().getName() + " - " + item)
);
// Sleep to allow async operations to complete
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
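
The four components above fit together in a simple way: each operator returns a new stage that wraps the downstream consumer, and nothing runs until someone subscribes. The following is a minimal sketch of that idea in plain Java, not RxJava's actual implementation. MiniObservable, Source, and evenSquares are hypothetical names invented for this illustration, and the model leaves out errors, completion, and disposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical teaching model of a push-based pipeline; this is not RxJava's Observable. */
final class MiniObservable<T> {

    /** Invoked once per subscriber; pushes items into the supplied consumer. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    private final Source<T> source;

    private MiniObservable(Source<T> source) {
        this.source = source;
    }

    /** Creates a source that replays a fixed set of items to every subscriber. */
    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<T>(onNext -> {
            for (T item : items) {
                onNext.accept(item);
            }
        });
    }

    /** Wraps the downstream consumer so that it receives transformed items. */
    <R> MiniObservable<R> map(Function<T, R> mapper) {
        return new MiniObservable<R>(onNext ->
                source.subscribe(item -> onNext.accept(mapper.apply(item))));
    }

    /** Wraps the downstream consumer so that it only receives matching items. */
    MiniObservable<T> filter(Predicate<T> predicate) {
        return new MiniObservable<T>(onNext ->
                source.subscribe(item -> {
                    if (predicate.test(item)) {
                        onNext.accept(item);
                    }
                }));
    }

    /** Nothing is emitted until this method is called. */
    void subscribe(Consumer<T> onNext) {
        source.subscribe(onNext);
    }

    static List<Integer> evenSquares() {
        List<Integer> received = new ArrayList<>();
        MiniObservable.just(1, 2, 3, 4, 5, 6)
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(evenSquares()); // prints [4, 16, 36]
    }
}
```

The real library adds the onError and onComplete signals, Disposable handling, and schedulers on top of this basic wrapping scheme.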

Observables and Observers

1. Different Types of Observables

// ObservableTypesExample.java
public class ObservableTypesExample {
public void demonstrateObservableTypes() {
createObservable();
justObservable();
fromObservable();
rangeObservable();
intervalObservable();
deferObservable();
}
private void createObservable() {
System.out.println("=== Observable.create() ===");
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("First item");
emitter.onNext("Second item");
emitter.onNext("Third item");
emitter.onComplete();
});
observable.subscribe(
item -> System.out.println("Received: " + item),
error -> System.out.println("Error: " + error),
() -> System.out.println("Completed")
);
}
private void justObservable() {
System.out.println("\n=== Observable.just() ===");
// Emits fixed set of items
Observable.just("Apple", "Banana", "Cherry")
.subscribe(item -> System.out.println("Fruit: " + item));
}
private void fromObservable() {
System.out.println("\n=== Observable.fromXXX() ===");
// From Iterable
List<String> list = Arrays.asList("One", "Two", "Three");
Observable.fromIterable(list)
.subscribe(item -> System.out.println("From list: " + item));
// From Array
String[] array = {"A", "B", "C"};
Observable.fromArray(array)
.subscribe(item -> System.out.println("From array: " + item));
// From Callable
Observable.fromCallable(() -> "Result from callable")
.subscribe(item -> System.out.println("From callable: " + item));
}
private void rangeObservable() {
System.out.println("\n=== Observable.range() ===");
Observable.range(1, 5) // Start from 1, emit 5 numbers
.subscribe(number -> System.out.println("Number: " + number));
}
private void intervalObservable() {
System.out.println("\n=== Observable.interval() ===");
// Emits numbers at fixed time intervals
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
.take(5) // Take only 5 emissions
.subscribe(
tick -> System.out.println("Tick: " + tick),
error -> System.out.println("Error: " + error),
() -> System.out.println("Interval completed")
);
// In a real Android app, you'd dispose in onDestroy()
try {
Thread.sleep(6000); // Wait for completion
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
}
private void deferObservable() {
System.out.println("\n=== Observable.defer() ===");
// Defer creation until subscription
Observable<String> deferred = Observable.defer(() -> 
Observable.just("Current time: " + System.currentTimeMillis())
);
// Each subscription gets fresh data
deferred.subscribe(time -> System.out.println("First: " + time));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
deferred.subscribe(time -> System.out.println("Second: " + time));
}
// Special Observable types
public void specialObservables() {
System.out.println("\n=== Special Observables ===");
// Single - emits exactly one item or error
Single.just("Single result")
.subscribe(
result -> System.out.println("Single: " + result),
error -> System.out.println("Single error: " + error)
);
// Maybe - signals exactly one of: a single item, completion without an item, or an error
Maybe.just("Maybe result")
.subscribe(
result -> System.out.println("Maybe: " + result),
error -> System.out.println("Maybe error: " + error),
() -> System.out.println("Maybe completed")
);
// Completable - emits only completion or error signal
Completable.complete()
.subscribe(
() -> System.out.println("Completable completed"),
error -> System.out.println("Completable error: " + error)
);
}
}
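
The difference between Observable.just() and Observable.defer() comes down to when the value is computed: just() captures it once while the chain is being assembled, whereas defer() recomputes it for each subscriber. Below is a small plain-Java sketch of that timing difference using Supplier. EagerVsDeferred is a hypothetical class name, and the counter stands in for System.currentTimeMillis() so that the result is deterministic.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical illustration of assembly-time versus subscription-time evaluation. */
final class EagerVsDeferred {

    static int[] demo() {
        AtomicInteger clock = new AtomicInteger(); // stands in for System.currentTimeMillis()

        // just()-style: the value is read once, before anyone subscribes
        int snapshot = clock.incrementAndGet();
        Supplier<Integer> eager = () -> snapshot;

        // defer()-style: the value is read again for every subscriber
        Supplier<Integer> deferred = clock::incrementAndGet;

        return new int[] { eager.get(), eager.get(), deferred.get(), deferred.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [1, 1, 2, 3]
    }
}
```

Both "eager" reads return the same snapshot, while each "deferred" read produces a fresh value, which mirrors why the two subscribers in deferObservable() see different timestamps.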

2. Observer Patterns and Disposables

// ObserverPatternsExample.java
public class ObserverPatternsExample {
private CompositeDisposable compositeDisposable = new CompositeDisposable();
public void demonstrateObserverPatterns() {
basicObserver();
resourceManagement();
hotVsColdObservables();
backpressureExample();
}
private void basicObserver() {
System.out.println("=== Basic Observer Patterns ===");
// Using lambda expressions for cleaner code
Disposable disposable = Observable.range(1, 3)
.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
compositeDisposable.add(disposable);
}
private void resourceManagement() {
System.out.println("\n=== Resource Management ===");
// Using Disposable for resource cleanup
Disposable disposable1 = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Tick 1: " + tick));
Disposable disposable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(tick -> System.out.println("Tick 2: " + tick));
// Add to composite disposable for easy cleanup
compositeDisposable.addAll(disposable1, disposable2);
// Later, dispose all at once
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Disposing all subscriptions...");
compositeDisposable.clear();
}
private void hotVsColdObservables() {
System.out.println("\n=== Hot vs Cold Observables ===");
// Cold Observable - data produced for each subscriber
Observable<String> coldObservable = Observable.defer(() -> 
Observable.just("Data created at: " + System.currentTimeMillis())
);
System.out.println("Cold Observable:");
coldObservable.subscribe(data -> System.out.println("Subscriber 1: " + data));
coldObservable.subscribe(data -> System.out.println("Subscriber 2: " + data));
// Hot Observable - data produced independently of subscribers
PublishSubject<String> hotObservable = PublishSubject.create();
System.out.println("\nHot Observable:");
hotObservable.subscribe(data -> System.out.println("Subscriber A: " + data));
// Emit data
hotObservable.onNext("First emission");
hotObservable.onNext("Second emission");
// New subscriber joins late
hotObservable.subscribe(data -> System.out.println("Subscriber B: " + data));
hotObservable.onNext("Third emission");
}
private void backpressureExample() {
System.out.println("\n=== Backpressure Handling ===");
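// Flowable adds backpressure support on top of the Observable-style API
Flowable.range(1, 1000)
// range() already honors downstream demand, so this buffer stays empty here;
// onBackpressureBuffer() matters for sources that cannot slow down, such as interval()
.onBackpressureBuffer()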
.observeOn(Schedulers.computation())
.subscribe(
item -> {
try {
Thread.sleep(10); // Simulate slow consumer
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Processed: " + item);
},
error -> System.err.println("Error: " + error)
);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void cleanup() {
compositeDisposable.dispose();
}
}
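
The hot-source behavior shown in hotVsColdObservables() can be modeled in a few lines: a hot source keeps a list of current subscribers and forwards each item only to those present at that moment. This is a simplified sketch, not PublishSubject's real implementation. MiniSubject is a hypothetical name, and the model ignores thread safety, errors, and completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a PublishSubject-like hot source. */
final class MiniSubject<T> {

    private final List<Consumer<T>> subscribers = new ArrayList<>();

    /** Registers a subscriber and returns a handle that unregisters it again. */
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    /** Forwards the item to whoever is subscribed right now; nothing is replayed later. */
    void onNext(T item) {
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(item);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniSubject<String> subject = new MiniSubject<>();

        Runnable disposeA = subject.subscribe(item -> log.add("A:" + item));
        subject.onNext("first");

        subject.subscribe(item -> log.add("B:" + item)); // joins late, misses "first"
        subject.onNext("second");

        disposeA.run(); // A stops receiving items
        subject.onNext("third");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A:first, A:second, B:second, B:third]
    }
}
```

The returned Runnable plays the role of a Disposable here: running it removes the subscriber, which is the same cleanup that CompositeDisposable performs for a whole group of subscriptions.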

Operators and Transformation

1. Common RxJava Operators

// RxJavaOperatorsExample.java
public class RxJavaOperatorsExample {
public void demonstrateOperators() {
transformationOperators();
filteringOperators();
combiningOperators();
utilityOperators();
errorHandlingOperators();
conditionalOperators();
}
private void transformationOperators() {
System.out.println("=== Transformation Operators ===");
// map - transform each item
Observable.just(1, 2, 3)
.map(number -> number * 2)
.subscribe(result -> System.out.println("Map: " + result));
// flatMap - transform each item into an Observable
Observable.just("Hello", "World")
.flatMap(word -> Observable.fromArray(word.split("")))
.subscribe(letter -> System.out.println("FlatMap: " + letter));
// concatMap - like flatMap but preserves order
Observable.just(1, 2, 3)
.concatMap(number -> Observable.just(number * 10, number * 100))
.subscribe(result -> System.out.println("ConcatMap: " + result));
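// switchMap - subscribes to the newest inner Observable and disposes the previous one;
// the source emits 1, 2, 3 back to back, so only the timer for 3 survives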
Observable.just(1, 2, 3)
.switchMap(number -> Observable.timer(100 * number, TimeUnit.MILLISECONDS)
.map(tick -> "SwitchMap: " + number))
.subscribe(result -> System.out.println(result));
// buffer - group items into lists
Observable.range(1, 10)
.buffer(3)
.subscribe(batch -> System.out.println("Buffer: " + batch));
}
private void filteringOperators() {
System.out.println("\n=== Filtering Operators ===");
// filter - emit only items that pass a predicate
Observable.range(1, 10)
.filter(number -> number % 2 == 0)
.subscribe(even -> System.out.println("Filter: " + even));
// take - take only first n items
Observable.interval(1, TimeUnit.SECONDS)
.take(3)
.subscribe(item -> System.out.println("Take: " + item));
// skip - skip first n items
Observable.range(1, 5)
.skip(2)
.subscribe(item -> System.out.println("Skip: " + item));
// distinct - remove duplicates
Observable.just(1, 2, 2, 3, 3, 3, 4)
.distinct()
.subscribe(unique -> System.out.println("Distinct: " + unique));
// debounce - emit only after a quiet period
Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(100);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(100);
emitter.onNext("D");
emitter.onComplete();
})
.debounce(200, TimeUnit.MILLISECONDS) // Wait for 200ms of silence
.subscribe(item -> System.out.println("Debounce: " + item));
}
private void combiningOperators() {
System.out.println("\n=== Combining Operators ===");
Observable<String> first = Observable.just("A", "B", "C");
Observable<String> second = Observable.just("1", "2", "3");
// merge - combine multiple Observables
Observable.merge(first, second)
.subscribe(item -> System.out.println("Merge: " + item));
// concat - combine sequentially
Observable.concat(first, second)
.subscribe(item -> System.out.println("Concat: " + item));
// zip - combine item-by-item
Observable.zip(first, second, (f, s) -> f + s)
.subscribe(combined -> System.out.println("Zip: " + combined));
// combineLatest - combine latest from each
Observable.combineLatest(
Observable.interval(1, TimeUnit.SECONDS).map(tick -> "Tick_" + tick),
Observable.interval(2, TimeUnit.SECONDS).map(tick -> "Tock_" + tick),
(tick, tock) -> tick + " | " + tock
)
.take(5)
.subscribe(combined -> System.out.println("CombineLatest: " + combined));
}
private void utilityOperators() {
System.out.println("\n=== Utility Operators ===");
// doOnNext - side effect for each emission
Observable.just(1, 2, 3)
.doOnNext(item -> System.out.println("About to emit: " + item))
.doOnSubscribe(d -> System.out.println("Subscribed"))
.doOnComplete(() -> System.out.println("Completed"))
.subscribe(item -> System.out.println("Emitted: " + item));
// delay - delay emissions
Observable.just("Delayed")
.delay(1, TimeUnit.SECONDS)
.subscribe(item -> System.out.println("Delay: " + item));
// timeout - error if no emission in time
Observable.timer(2, TimeUnit.SECONDS)
.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("Timeout success"),
error -> System.out.println("Timeout error: " + error)
);
// retry - resubscribe on error
AtomicInteger attempts = new AtomicInteger();
Observable.create(emitter -> {
if (attempts.incrementAndGet() < 3) {
emitter.onError(new RuntimeException("Simulated error"));
} else {
emitter.onNext("Success after retries");
emitter.onComplete();
}
})
.retry(2) // Resubscribe up to 2 times (3 attempts in total)
.subscribe(
result -> System.out.println("Retry: " + result),
error -> System.out.println("Retry failed: " + error)
);
}
private void errorHandlingOperators() {
System.out.println("\n=== Error Handling Operators ===");
// onErrorReturn - return default value on error
Observable.error(new RuntimeException("Error!"))
.onErrorReturn(throwable -> "Default Value")
.subscribe(
item -> System.out.println("onErrorReturn: " + item),
error -> System.out.println("This should not be called")
);
// onErrorResumeNext - switch to another Observable on error
Observable.error(new RuntimeException("Error!"))
.onErrorResumeNext(throwable -> Observable.just("Recovered", "Values"))
.subscribe(item -> System.out.println("onErrorResumeNext: " + item));
// onErrorReturnItem - simpler version
Observable.error(new RuntimeException("Error!"))
.onErrorReturnItem("Fallback Item")
.subscribe(item -> System.out.println("onErrorReturnItem: " + item));
}
private void conditionalOperators() {
System.out.println("\n=== Conditional Operators ===");
// all - check if all items meet condition
Observable.just(2, 4, 6, 8)
.all(number -> number % 2 == 0)
.subscribe(result -> System.out.println("All even: " + result));
// any - check if any item meets condition
Observable.just(1, 3, 5, 8)
.any(number -> number % 2 == 0)
.subscribe(result -> System.out.println("Any even: " + result));
// contains - check if sequence contains item
Observable.just("Apple", "Banana", "Cherry")
.contains("Banana")
.subscribe(result -> System.out.println("Contains Banana: " + result));
// defaultIfEmpty - emit default if empty
Observable.empty()
.defaultIfEmpty("Default Value")
.subscribe(item -> System.out.println("defaultIfEmpty: " + item));
// sequenceEqual - check if two Observables emit same sequence
Observable<Integer> first = Observable.just(1, 2, 3);
Observable<Integer> second = Observable.just(1, 2, 3);
Observable.sequenceEqual(first, second)
.subscribe(equal -> System.out.println("Sequences equal: " + equal));
}
}
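
The debounce example in filteringOperators() is easier to follow with the timeline written out. The sketch below replays the same arrival times (A at 0 ms, B at 100 ms, C at 500 ms, D at 600 ms) against a 200 ms quiet period. It is a simplified offline model, not RxJava's implementation: DebounceTimeline is a hypothetical name, it assumes the source completes right after the last item (as in the original snippet, which flushes the pending item), and it treats a gap exactly equal to the quiet period as long enough.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical offline model of debounce over items with known arrival times. */
final class DebounceTimeline {

    /**
     * @param items   the emitted items, in arrival order
     * @param timesMs arrival time of each item in milliseconds, ascending
     * @param quietMs required silence after an item before it is let through
     */
    static List<String> debounce(String[] items, long[] timesMs, long quietMs) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            boolean isLast = i == items.length - 1;
            // The last item is flushed when the source completes;
            // earlier items survive only if the next one arrives late enough.
            if (isLast || timesMs[i + 1] - timesMs[i] >= quietMs) {
                emitted.add(items[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] items = {"A", "B", "C", "D"};
        long[] timesMs = {0, 100, 500, 600};
        System.out.println(debounce(items, timesMs, 200)); // prints [B, D]
    }
}
```

A is replaced by B after 100 ms, B survives the 400 ms gap, C is replaced by D after 100 ms, and D is released on completion.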

Schedulers and Threading

1. Scheduler Management

// SchedulerExample.java
public class SchedulerExample {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateSchedulers() {
basicSchedulers();
observeOnVsSubscribeOn();
androidSchedulers();
customSchedulers();
}
private void basicSchedulers() {
System.out.println("=== Basic Schedulers ===");
// Schedulers.io() - For I/O bound work
Disposable ioWork = Observable.fromCallable(() -> {
System.out.println("IO work on: " + Thread.currentThread().getName());
Thread.sleep(1000); // Simulate I/O operation
return "IO Result";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(result -> 
System.out.println("IO result on: " + Thread.currentThread().getName() + " - " + result)
);
// Schedulers.computation() - For CPU-intensive work
Disposable computationWork = Observable.range(1, 5)
.subscribeOn(Schedulers.computation())
.map(number -> {
System.out.println("Computation on: " + Thread.currentThread().getName());
return number * number;
})
.observeOn(Schedulers.newThread())
.subscribe(result -> 
System.out.println("Computation result on: " + Thread.currentThread().getName() + " - " + result)
);
disposables.addAll(ioWork, computationWork);
}
private void observeOnVsSubscribeOn() {
System.out.println("\n=== observeOn vs subscribeOn ===");
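// The explicit <Integer> is required: without it the chain is inferred as
// Observable<Object> and item * 10 below does not compile
Disposable example = Observable.<Integer>create(emitter -> {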
System.out.println("Emit on: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
})
.subscribeOn(Schedulers.io()) // Affects upstream operations
.observeOn(Schedulers.computation()) // Affects downstream operations
.map(item -> {
System.out.println("Map on: " + Thread.currentThread().getName());
return item * 10;
})
.observeOn(Schedulers.single())
.subscribe(item -> 
System.out.println("Receive on: " + Thread.currentThread().getName() + " - " + item)
);
disposables.add(example);
}
private void androidSchedulers() {
System.out.println("\n=== Android Schedulers ===");
// Note: In Android, you'd use AndroidSchedulers.mainThread()
// from RxAndroid library
// Simulating Android main thread with Schedulers.single()
Disposable androidExample = Observable.just("Network Result")
.subscribeOn(Schedulers.io()) // Network call on IO thread
.observeOn(Schedulers.single()) // Simulate Android main thread
.doOnNext(result -> 
System.out.println("Updating UI on: " + Thread.currentThread().getName())
)
.subscribe(result -> 
System.out.println("UI updated with: " + result)
);
disposables.add(androidExample);
}
private void customSchedulers() {
System.out.println("\n=== Custom Schedulers ===");
// Create custom scheduler from executor
ExecutorService customExecutor = Executors.newFixedThreadPool(3);
Scheduler customScheduler = Schedulers.from(customExecutor);
Disposable customWork = Observable.range(1, 6)
.subscribeOn(customScheduler)
.map(item -> {
System.out.println("Custom scheduler: " + Thread.currentThread().getName());
return item;
})
.subscribe();
disposables.add(customWork);
// Shutdown executor when done
disposables.add(Disposable.fromRunnable(() -> {
customExecutor.shutdown();
System.out.println("Custom executor shutdown");
}));
}
public void cleanup() {
disposables.dispose();
}
}
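
For readers coming from java.util.concurrent, the subscribeOn/observeOn split has a rough analogue in CompletableFuture: the executor passed to supplyAsync decides where the source work runs, and the executor passed to thenApplyAsync decides where the next stage runs. The sketch below is only an analogy under that assumption, not how RxJava schedules work internally. ThreadHopSketch and the thread names "io-worker" and "cpu-worker" are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical JDK-only analogy for subscribeOn (source thread) and observeOn (downstream thread). */
final class ThreadHopSketch {

    /** Returns {thread that produced the value, thread that processed it}. */
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService cpu = Executors.newSingleThreadExecutor(r -> new Thread(r, "cpu-worker"));
        try {
            return CompletableFuture
                    // Comparable to subscribeOn: chooses where the source does its work
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Comparable to observeOn: moves everything after this point to another thread
                    .thenApplyAsync(
                            sourceThread -> new String[] {sourceThread, Thread.currentThread().getName()},
                            cpu)
                    .join();
        } finally {
            io.shutdown();
            cpu.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [io-worker, cpu-worker]
    }
}
```

One difference worth keeping in mind: in RxJava the position of subscribeOn in the chain does not matter for the source, while every observeOn affects only the operators written after it.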

2. Threading Best Practices

// ThreadingBestPractices.java
public class ThreadingBestPractices {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateBestPractices() {
avoidBlockingOperations();
properSchedulerUsage();
memoryLeakPrevention();
errorHandlingInThreads();
}
private void avoidBlockingOperations() {
System.out.println("=== Avoid Blocking Operations ===");
// ❌ Bad: Blocking operation on computation scheduler
Disposable badExample = Observable.just("data")
.subscribeOn(Schedulers.computation())
.map(data -> {
try {
Thread.sleep(2000); // Blocking operation!
} catch (InterruptedException e) {
e.printStackTrace();
}
return data.toUpperCase();
})
.subscribe(result -> System.out.println("Bad result: " + result));
// ✅ Good: Use proper scheduler for blocking operations
Disposable goodExample = Observable.just("data")
.subscribeOn(Schedulers.io()) // Use IO scheduler for blocking ops
.map(data -> {
try {
Thread.sleep(2000); // This is okay on IO scheduler
} catch (InterruptedException e) {
e.printStackTrace();
}
return data.toUpperCase();
})
.observeOn(Schedulers.computation()) // Switch for computation
.subscribe(result -> System.out.println("Good result: " + result));
disposables.addAll(badExample, goodExample);
}
private void properSchedulerUsage() {
System.out.println("\n=== Proper Scheduler Usage ===");
Disposable example = Observable.range(1, 100)
.subscribeOn(Schedulers.io()) // Heavy initial work on IO
.filter(number -> number % 2 == 0)
.observeOn(Schedulers.computation()) // Computation for CPU work
.map(number -> number * number)
.buffer(10)
.observeOn(Schedulers.single()) // Single thread for final processing
.subscribe(
batch -> System.out.println("Batch on " + Thread.currentThread().getName() + ": " + batch),
error -> System.err.println("Error: " + error),
() -> System.out.println("Processing completed")
);
disposables.add(example);
}
private void memoryLeakPrevention() {
System.out.println("\n=== Memory Leak Prevention ===");
// Using CompositeDisposable for easy cleanup
CompositeDisposable localDisposables = new CompositeDisposable();
// Multiple subscriptions
Disposable sub1 = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Sub1: " + tick));
Disposable sub2 = Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(tick -> System.out.println("Sub2: " + tick));
localDisposables.addAll(sub1, sub2);
// Simulate activity/fragment destruction
System.out.println("Cleaning up local disposables...");
localDisposables.dispose();
}
private void errorHandlingInThreads() {
System.out.println("\n=== Error Handling in Threads ===");
Disposable example = Observable.create(emitter -> {
// This runs on IO scheduler
if (Math.random() > 0.5) {
emitter.onError(new RuntimeException("Random error!"));
} else {
emitter.onNext("Success");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(
result -> System.out.println("Success: " + result),
error -> System.err.println("Error handled on: " + 
Thread.currentThread().getName() + " - " + error.getMessage())
);
disposables.add(example);
}
public void cleanup() {
disposables.dispose();
}
}
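
The reason blocking calls do not belong on Schedulers.computation() is that a CPU-sized pool has only a handful of threads, so every blocked thread delays all queued work behind it. The sketch below demonstrates the effect with a plain fixed-size executor rather than an RxJava scheduler. BoundedPoolStarvation is a hypothetical name, and the pool size of 2 is an assumption standing in for a two-core device.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demonstration of queued work starving behind blocked pool threads. */
final class BoundedPoolStarvation {

    /** Returns {tasks started while both workers were blocked, tasks started in total}. */
    static int[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // assumption: a 2-core device
        AtomicInteger started = new AtomicInteger();
        CountDownLatch bothWorkersBusy = new CountDownLatch(2);
        CountDownLatch unblock = new CountDownLatch(1);

        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                started.incrementAndGet();
                bothWorkersBusy.countDown();
                awaitQuietly(unblock); // simulated blocking call
            });
        }

        awaitQuietly(bothWorkersBusy);
        int startedWhileBlocked = started.get(); // the other two tasks are still queued

        unblock.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {startedWhileBlocked, started.get()};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 4]
    }
}
```

Only two of the four tasks can start while the workers are blocked. Schedulers.io() avoids this by growing its thread pool on demand, which is why it is the better home for blocking calls.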

Android-specific RxJava

1. RxJava in Android Activities

// MainActivity.java
public class MainActivity extends AppCompatActivity {
private CompositeDisposable disposables = new CompositeDisposable();
private TextView resultTextView;
private ProgressBar progressBar;
private EditText searchEditText;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
initializeViews();
setupClickListeners();
setupSearchListener();
loadInitialData();
}
private void initializeViews() {
resultTextView = findViewById(R.id.resultTextView);
progressBar = findViewById(R.id.progressBar);
searchEditText = findViewById(R.id.searchEditText);
}
private void setupClickListeners() {
Button loadButton = findViewById(R.id.loadButton);
Button parallelButton = findViewById(R.id.parallelButton);
// Single network call
disposables.add(
RxView.clicks(loadButton)
.throttleFirst(1, TimeUnit.SECONDS) // Prevent multiple clicks
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(click -> showLoading(true))
.observeOn(Schedulers.io())
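// Recover inside the inner Single: an error that reaches the outer chain
// would terminate the click subscription and the button would stop responding
.flatMapSingle(click -> performNetworkCall()
.onErrorReturn(error -> "failed: " + error.getMessage()))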
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> {
showLoading(false);
resultTextView.setText("Result: " + result);
},
error -> {
showLoading(false);
resultTextView.setText("Error: " + error.getMessage());
}
)
);
// Parallel network calls
disposables.add(
RxView.clicks(parallelButton)
.throttleFirst(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(click -> showLoading(true))
.flatMap(click -> Observable.merge(
// subscribeOn gives each call its own IO thread; without it the
// three blocking calls would run one after another
performNetworkCall("Call 1").subscribeOn(Schedulers.io()).toObservable(),
performNetworkCall("Call 2").subscribeOn(Schedulers.io()).toObservable(),
performNetworkCall("Call 3").subscribeOn(Schedulers.io()).toObservable()
)
.observeOn(AndroidSchedulers.mainThread())
// Fires once per batch; the outer click stream itself never completes
.doOnComplete(() -> {
showLoading(false);
resultTextView.append("\nAll parallel calls completed");
}))
.subscribe(
result -> resultTextView.append("\nParallel: " + result),
error -> {
showLoading(false);
resultTextView.setText("Parallel Error: " + error.getMessage());
}
)
);
}
private void setupSearchListener() {
// Real-time search with debounce
disposables.add(
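RxTextView.textChanges(searchEditText)
// Snapshot the text first: the emitted CharSequence is owned by the TextView
// and may change before a delayed operator reads it
.map(CharSequence::toString)
.debounce(300, TimeUnit.MILLISECONDS) // Wait for user to stop typing
.filter(text -> text.length() > 2) // Only search if more than 2 chars
.distinctUntilChanged() // Only if text changed
.switchMapSingle(query -> performSearch(query))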
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
results -> resultTextView.setText("Search results: " + results),
error -> resultTextView.setText("Search error: " + error.getMessage())
)
);
}
private void loadInitialData() {
// Load data when activity starts
disposables.add(
performNetworkCall("Initial Data")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> resultTextView.setText("Initial: " + result),
error -> resultTextView.setText("Initial load error: " + error.getMessage())
)
);
}
private Single<String> performNetworkCall() {
return performNetworkCall("Default");
}
private Single<String> performNetworkCall(String callName) {
return Single.fromCallable(() -> {
// Simulate network call
Thread.sleep(2000);
if (Math.random() > 0.8) {
throw new RuntimeException(callName + " failed!");
}
return callName + " completed at " + System.currentTimeMillis();
});
}
private Single<String> performSearch(String query) {
return Single.fromCallable(() -> {
// Simulate search API call
Thread.sleep(500);
return "Results for '" + query + "': " + (int)(Math.random() * 100);
}).subscribeOn(Schedulers.io());
}
private void showLoading(boolean show) {
progressBar.setVisibility(show ? View.VISIBLE : View.GONE);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.dispose(); // Prevent memory leaks
}
}
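
The throttleFirst(1, TimeUnit.SECONDS) calls above guard against double taps: the first click passes, and further clicks are ignored until the window has elapsed. The sketch below replays a list of click times against that rule. It is an offline model rather than RxJava's implementation: ThrottleFirstTimeline is a hypothetical name, and it assumes a click arriving exactly when the window ends is let through.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical offline model of throttleFirst over clicks with known timestamps. */
final class ThrottleFirstTimeline {

    /**
     * @param clickTimesMs click timestamps in milliseconds, ascending
     * @param windowMs     how long further clicks are ignored after one is accepted
     */
    static List<Long> throttleFirst(long[] clickTimesMs, long windowMs) {
        List<Long> accepted = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE; // no window is open before the first click
        for (long time : clickTimesMs) {
            if (time >= windowEnd) {
                accepted.add(time);
                windowEnd = time + windowMs; // open a new window starting at this click
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        long[] clicks = {0, 300, 900, 1200, 2500};
        System.out.println(throttleFirst(clicks, 1000)); // prints [0, 1200, 2500]
    }
}
```

Unlike debounce, which waits for silence and emits the last item, throttleFirst reacts immediately to the first item, which is why it suits buttons while debounce suits search fields.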

2. RxBinding for Android Views

// RxBindingExample.java
public class RxBindingExample {
private CompositeDisposable disposables = new CompositeDisposable();
public void setupViewBindings(Activity activity) {
setupButtonClicks(activity);
setupTextChanges(activity);
setupCheckboxChanges(activity);
setupRecyclerViewScroll(activity);
setupMultipleViews(activity);
}
private void setupButtonClicks(Activity activity) {
Button button = activity.findViewById(R.id.someButton);
disposables.add(
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS) // Prevent double clicks
.observeOn(AndroidSchedulers.mainThread())
.subscribe(click -> {
// Handle button click
showToast(activity, "Button clicked!");
})
);
// Long clicks
disposables.add(
RxView.longClicks(button)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(click -> {
showToast(activity, "Button long clicked!");
})
);
}
private void setupTextChanges(Activity activity) {
EditText editText = activity.findViewById(R.id.someEditText);
disposables.add(
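RxTextView.textChanges(editText)
// Snapshot the text first: the emitted CharSequence is owned by the TextView
// and may change before a delayed operator reads it
.map(CharSequence::toString)
.debounce(300, TimeUnit.MILLISECONDS) // Wait for typing to stop
.filter(text -> text.length() > 0) // Ignore empty text
.distinctUntilChanged() // Only emit when text actually changes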
.observeOn(AndroidSchedulers.mainThread())
.subscribe(text -> {
// Handle text changes
updateSearchResults(activity, text.toString());
})
);
// Focus changes
disposables.add(
RxView.focusChanges(editText)
.skip(1) // Skip initial emission
.observeOn(AndroidSchedulers.mainThread())
.subscribe(hasFocus -> {
if (hasFocus) {
// EditText gained focus
showToast(activity, "EditText focused");
} else {
// EditText lost focus
showToast(activity, "EditText lost focus");
}
})
);
}
private void setupCheckboxChanges(Activity activity) {
CheckBox checkBox = activity.findViewById(R.id.someCheckBox);
disposables.add(
RxCompoundButton.checkedChanges(checkBox)
.skip(1) // Skip initial value
.observeOn(AndroidSchedulers.mainThread())
.subscribe(isChecked -> {
// Handle checkbox state change
String state = isChecked ? "checked" : "unchecked";
showToast(activity, "Checkbox " + state);
})
);
}
private void setupRecyclerViewScroll(Activity activity) {
RecyclerView recyclerView = activity.findViewById(R.id.someRecyclerView);
disposables.add(
RxRecyclerView.scrollStateChanges(recyclerView)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(state -> {
switch (state) {
case RecyclerView.SCROLL_STATE_DRAGGING:
// User is dragging
break;
case RecyclerView.SCROLL_STATE_SETTLING:
// Scroll is settling
break;
case RecyclerView.SCROLL_STATE_IDLE:
// Scroll is idle
checkLoadMore(activity);
break;
}
})
);
// Scroll events with more details
disposables.add(
RxRecyclerView.scrollEvents(recyclerView)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(event -> {
// Handle scroll events with more details
// RecyclerViewScrollEvent is a Kotlin data class in RxBinding 4, so Java uses its getters
int dx = event.getDx();
int dy = event.getDy();
// Process scroll data...
})
);
}
private void setupMultipleViews(Activity activity) {
Button button1 = activity.findViewById(R.id.button1);
Button button2 = activity.findViewById(R.id.button2);
Button button3 = activity.findViewById(R.id.button3);
// Combine multiple button clicks
// RxBinding 3 and later are written in Kotlin, so clicks() emits kotlin.Unit (import kotlin.Unit)
Observable<Unit> button1Clicks = RxView.clicks(button1);
Observable<Unit> button2Clicks = RxView.clicks(button2);
Observable<Unit> button3Clicks = RxView.clicks(button3);
disposables.add(
Observable.merge(button1Clicks, button2Clicks, button3Clicks)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(click -> {
// Handle any button click
showToast(activity, "A button was clicked!");
})
);
// Combine specific conditions
EditText emailEditText = activity.findViewById(R.id.emailEditText);
EditText passwordEditText = activity.findViewById(R.id.passwordEditText);
Button submitButton = activity.findViewById(R.id.submitButton);
disposables.add(
Observable.combineLatest(
RxTextView.textChanges(emailEditText),
RxTextView.textChanges(passwordEditText),
(email, password) -> {
// Validate form
boolean emailValid = isValidEmail(email.toString());
boolean passwordValid = password.length() >= 6;
return emailValid && passwordValid;
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(isValid -> {
// Enable/disable submit button based on validation
submitButton.setEnabled(isValid);
})
);
}
private void updateSearchResults(Activity activity, String query) {
// Implement search logic
TextView resultsView = activity.findViewById(R.id.resultsTextView);
resultsView.setText("Searching for: " + query);
// You would typically make an API call here
}
private void checkLoadMore(Activity activity) {
// Implement load more logic for RecyclerView
RecyclerView recyclerView = activity.findViewById(R.id.someRecyclerView);
LinearLayoutManager layoutManager = (LinearLayoutManager) recyclerView.getLayoutManager();
if (layoutManager != null) {
int visibleItemCount = layoutManager.getChildCount();
int totalItemCount = layoutManager.getItemCount();
int firstVisibleItemPosition = layoutManager.findFirstVisibleItemPosition();
if ((visibleItemCount + firstVisibleItemPosition) >= totalItemCount
&& firstVisibleItemPosition >= 0) {
// Load more data
loadMoreData(activity);
}
}
}
private void loadMoreData(Activity activity) {
// Implement load more data logic
showToast(activity, "Loading more data...");
}
private boolean isValidEmail(String email) {
return email != null && android.util.Patterns.EMAIL_ADDRESS.matcher(email).matches();
}
private void showToast(Activity activity, String message) {
Toast.makeText(activity, message, Toast.LENGTH_SHORT).show();
}
public void cleanup() {
disposables.dispose();
}
}
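
The end-of-list condition in checkLoadMore is plain arithmetic, so it can be pulled out and unit-tested without a RecyclerView. The sketch below assumes a hypothetical helper class, LoadMorePolicy, that is not part of the original example; it uses only the JDK.

```java
// LoadMorePolicy.java (hypothetical helper, not part of the original example)
final class LoadMorePolicy {
    private LoadMorePolicy() {}

    /**
     * Returns true when the last visible item has reached the end of the list.
     * A firstVisibleItemPosition of -1 means nothing is laid out yet, so no load is triggered.
     */
    static boolean shouldLoadMore(int visibleItemCount,
                                  int firstVisibleItemPosition,
                                  int totalItemCount) {
        return firstVisibleItemPosition >= 0
                && visibleItemCount + firstVisibleItemPosition >= totalItemCount;
    }
}
```

checkLoadMore could then pass layoutManager.getChildCount(), layoutManager.findFirstVisibleItemPosition(), and layoutManager.getItemCount() to this method and call loadMoreData only when it returns true.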

Networking with RxJava

1. Retrofit with RxJava

// ApiService.java
public interface ApiService {
@GET("users/{userId}")
Single<User> getUser(@Path("userId") String userId);
@GET("users")
Single<List<User>> getUsers();
@POST("users")
Single<User> createUser(@Body User user);
@GET("posts")
Single<List<Post>> getUserPosts(@Query("userId") String userId);
@GET("users/{userId}/posts")
Single<List<Post>> getUserPostsWithPath(@Path("userId") String userId);
@Multipart
@POST("upload")
Single<UploadResponse> uploadFile(
@Part MultipartBody.Part file,
@Part("description") RequestBody description
);
@Streaming
@GET("files/{fileName}")
Single<ResponseBody> downloadFile(@Path("fileName") String fileName);
}
// User.java
public class User {
private String id;
private String name;
private String email;
private String avatar;
private List<Post> posts; // Populated by UserRepository after fetching the user's posts
// Constructors, getters, setters
public User() {}
public User(String id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
// Getters and setters...
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getAvatar() { return avatar; }
public void setAvatar(String avatar) { this.avatar = avatar; }
public List<Post> getPosts() { return posts; }
public void setPosts(List<Post> posts) { this.posts = posts; }
}
// Post.java
public class Post {
private String id;
private String userId;
private String title;
private String body;
// Constructors, getters, setters...
public Post() {}
public Post(String id, String userId, String title, String body) {
this.id = id;
this.userId = userId;
this.title = title;
this.body = body;
}
// Getters and setters...
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public String getBody() { return body; }
public void setBody(String body) { this.body = body; }
}

2. Network Repository Pattern

// UserRepository.java
public class UserRepository {
private final ApiService apiService;
private final CompositeDisposable disposables = new CompositeDisposable();
public UserRepository(ApiService apiService) {
this.apiService = apiService;
}
// Basic CRUD operations
public Single<User> getUser(String userId) {
return apiService.getUser(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Single<List<User>> getUsers() {
return apiService.getUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Single<User> createUser(User user) {
return apiService.createUser(user)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Complex operations
public Single<User> getUserWithPosts(String userId) {
return apiService.getUser(userId)
.flatMap(user -> 
apiService.getUserPosts(userId)
.map(posts -> {
user.setPosts(posts); // Assuming User has posts field
return user;
})
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Parallel operations
public Single<List<User>> getUsersWithPosts() {
return apiService.getUsers()
.flatMapObservable(Observable::fromIterable)
.flatMapSingle(user -> 
apiService.getUserPosts(user.getId())
.map(posts -> {
user.setPosts(posts);
return user;
})
.onErrorReturnItem(user) // Return user even if posts fail
)
.toList()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
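// Batch operations (uses createUser because ApiService declares no update endpoint); a failed request does not abort the rest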
public Completable updateMultipleUsers(List<User> users) {
return Observable.fromIterable(users)
.flatMapCompletable(user -> 
apiService.createUser(user)
.ignoreElement() // Convert Single to Completable
.onErrorComplete() // Continue on error
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Cached network calls: emit the cached user if there is one, otherwise hit the network
public Observable<User> getUserWithCache(String userId) {
return Observable.concat(
getCachedUser(userId).toObservable(),
apiService.getUser(userId)
.doOnSuccess(this::cacheUser)
.toObservable()
)
.firstElement() // concat subscribes in order, so a cache hit skips the network call
.toObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
private Maybe<User> getCachedUser(String userId) {
// Implement cache retrieval. A miss must complete empty rather than error:
// an error here would terminate concat() before the network call runs.
return Maybe.empty(); // Simplified: always a cache miss
}
private void cacheUser(User user) {
// Implement cache storage
}
// Error handling with retry and exponential backoff
public Single<User> getUserWithRetry(String userId) {
return apiService.getUser(userId)
// Single.retryWhen exposes the errors as a Flowable, so zip and delay with Flowable sources
.retryWhen(errors -> 
errors.zipWith(Flowable.range(1, 3), (error, retryCount) -> {
if (retryCount < 3 && isNetworkError(error)) {
return retryCount;
}
throw Exceptions.propagate(error); // Third failure or non-network error: give up
})
.flatMap(retryCount -> 
Flowable.timer((long) Math.pow(2, retryCount), TimeUnit.SECONDS) // Wait 2s, then 4s
)
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
private boolean isNetworkError(Throwable error) {
// SocketTimeoutException and ConnectException both extend IOException
return error instanceof IOException;
}
public void cleanup() {
disposables.dispose();
}
}
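
The retry decision and the backoff delay used in getUserWithRetry can be written as two small pure functions, which makes the schedule easy to check in isolation. This is a minimal sketch using only the JDK; the class name RetryBackoff and the MAX_ATTEMPTS constant are assumptions made for illustration.

```java
import java.io.IOException;

// RetryBackoff.java (hypothetical helper mirroring the rules in getUserWithRetry)
final class RetryBackoff {
    static final int MAX_ATTEMPTS = 3;

    private RetryBackoff() {}

    /** Retry only network failures, and only while attempts remain. */
    static boolean shouldRetry(int failedAttempt, Throwable error) {
        // SocketTimeoutException and ConnectException are IOException subclasses
        return failedAttempt < MAX_ATTEMPTS && error instanceof IOException;
    }

    /** Exponential backoff: 2^failedAttempt seconds (2s after the first failure, 4s after the second). */
    static long delaySeconds(int failedAttempt) {
        return 1L << failedAttempt;
    }
}
```

With these rules a request is attempted at most three times, and the waits before the second and third attempts are 2 and 4 seconds.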

3. Network ViewModel

// UserViewModel.java
public class UserViewModel extends ViewModel {
private final UserRepository userRepository;
private final CompositeDisposable disposables = new CompositeDisposable();
// LiveData for UI observation
private final MutableLiveData<List<User>> usersLiveData = new MutableLiveData<>();
private final MutableLiveData<User> userLiveData = new MutableLiveData<>();
private final MutableLiveData<Boolean> loadingLiveData = new MutableLiveData<>();
private final MutableLiveData<String> errorLiveData = new MutableLiveData<>();
public UserViewModel(UserRepository userRepository) {
this.userRepository = userRepository;
}
public void loadUsers() {
loadingLiveData.setValue(true);
disposables.add(
userRepository.getUsers()
.subscribe(
users -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(users);
errorLiveData.setValue(null);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Failed to load users: " + error.getMessage());
}
)
);
}
public void loadUser(String userId) {
loadingLiveData.setValue(true);
disposables.add(
userRepository.getUserWithPosts(userId)
.subscribe(
user -> {
loadingLiveData.setValue(false);
userLiveData.setValue(user);
errorLiveData.setValue(null);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Failed to load user: " + error.getMessage());
}
)
);
}
public void createUser(User user) {
loadingLiveData.setValue(true);
disposables.add(
userRepository.createUser(user)
.subscribe(
createdUser -> {
loadingLiveData.setValue(false);
// Update users list or navigate
loadUsers(); // Refresh the list
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Failed to create user: " + error.getMessage());
}
)
);
}
public void searchUsers(String query) {
if (query == null || query.trim().isEmpty()) {
loadUsers();
return;
}
loadingLiveData.setValue(true);
disposables.add(
userRepository.getUsers()
.flatMapObservable(Observable::fromIterable)
.filter(user -> 
user.getName().toLowerCase().contains(query.toLowerCase()) ||
user.getEmail().toLowerCase().contains(query.toLowerCase())
)
.toList()
.subscribe(
filteredUsers -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(filteredUsers);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Search failed: " + error.getMessage());
}
)
);
}
// LiveData getters
public LiveData<List<User>> getUsersLiveData() {
return usersLiveData;
}
public LiveData<User> getUserLiveData() {
return userLiveData;
}
public LiveData<Boolean> getLoadingLiveData() {
return loadingLiveData;
}
public LiveData<String> getErrorLiveData() {
return errorLiveData;
}
@Override
protected void onCleared() {
super.onCleared();
disposables.dispose();
}
}
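
The filter inside searchUsers calls toLowerCase() on the name and email directly, which throws if the API returns a user with a missing field. One possible null-safe version of the same matching rule is sketched below; UserSearchMatcher is a hypothetical name, and using Locale.ROOT for case folding is an assumption rather than something the original code does.

```java
import java.util.Locale;

// UserSearchMatcher.java (hypothetical helper, not part of the original example)
final class UserSearchMatcher {
    private UserSearchMatcher() {}

    /** Case-insensitive "contains" match against name or email; a blank query matches everything. */
    static boolean matches(String name, String email, String query) {
        if (query == null || query.trim().isEmpty()) {
            return true; // Mirrors searchUsers, which falls back to loading all users
        }
        String needle = query.toLowerCase(Locale.ROOT);
        return containsIgnoreCase(name, needle) || containsIgnoreCase(email, needle);
    }

    private static boolean containsIgnoreCase(String field, String lowerCaseNeedle) {
        return field != null && field.toLowerCase(Locale.ROOT).contains(lowerCaseNeedle);
    }
}
```

The ViewModel's filter lambda could then become user -> UserSearchMatcher.matches(user.getName(), user.getEmail(), query).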

Database Operations

1. Room Database with RxJava

// UserDao.java
@Dao
public interface UserDao {
@Insert
Completable insert(User user);
@Insert
Completable insertAll(List<User> users);
@Update
Completable update(User user);
@Delete
Completable delete(User user);
@Query("SELECT * FROM users")
Flowable<List<User>> getAllUsers();
@Query("SELECT * FROM users WHERE id = :userId")
Maybe<User> getUserById(String userId);
@Query("SELECT * FROM users WHERE name LIKE :searchQuery")
Flowable<List<User>> searchUsers(String searchQuery);
@Query("SELECT * FROM users ORDER BY name ASC")
Flowable<List<User>> getUsersSortedByName();
@Query("DELETE FROM users")
Completable deleteAllUsers();
}
// AppDatabase.java
@Database(entities = {User.class, Post.class}, version = 1)
public abstract class AppDatabase extends RoomDatabase {
public abstract UserDao userDao();
public abstract PostDao postDao();
}
// LocalUserRepository.java
public class LocalUserRepository {
private final UserDao userDao;
private final CompositeDisposable disposables = new CompositeDisposable();
public LocalUserRepository(UserDao userDao) {
this.userDao = userDao;
}
// Basic operations
public Completable insertUser(User user) {
return userDao.insert(user)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Completable insertUsers(List<User> users) {
return userDao.insertAll(users)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Flowable<List<User>> getAllUsers() {
return userDao.getAllUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Maybe<User> getUserById(String userId) {
return userDao.getUserById(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Complex operations
public Completable saveUserWithPosts(User user, List<Post> posts) {
return Completable.mergeArray(
userDao.insert(user),
// Assuming you have a PostDao with insertAll method
Completable.fromAction(() -> {
// Save posts logic
})
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Flowable<List<User>> searchUsers(String query) {
return userDao.searchUsers("%" + query + "%")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onErrorReturnItem(Collections.emptyList()); // Return empty list on error
}
// Sync operations
public Completable syncUsers(List<User> users) {
return userDao.deleteAllUsers()
.andThen(userDao.insertAll(users))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public void cleanup() {
disposables.dispose();
}
}
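
LocalUserRepository.searchUsers wraps the raw query in % wildcards, so a user who types % or _ ends up matching far more rows than intended. The sketch below escapes those characters first. It assumes the DAO query is extended to end with ESCAPE '\' (SQLite has no default escape character), and LikePattern is a hypothetical helper name.

```java
// LikePattern.java (hypothetical helper; assumes the SQL uses: LIKE :searchQuery ESCAPE '\')
final class LikePattern {
    private LikePattern() {}

    /** Builds a "contains" pattern with the LIKE wildcards in the user input escaped. */
    static String contains(String query) {
        String escaped = query
                .replace("\\", "\\\\") // Escape the escape character first
                .replace("%", "\\%")
                .replace("_", "\\_");
        return "%" + escaped + "%";
    }
}
```

The repository call would then read userDao.searchUsers(LikePattern.contains(query)).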

Error Handling

1. Comprehensive Error Handling

// ErrorHandlingExample.java
public class ErrorHandlingExample {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateErrorHandling() {
basicErrorHandling();
retryStrategies();
errorRecovery();
globalErrorHandler();
}
private void basicErrorHandling() {
System.out.println("=== Basic Error Handling ===");
Observable<String> errorObservable = Observable.create(emitter -> {
emitter.onNext("First item");
emitter.onError(new RuntimeException("Something went wrong!"));
});
disposables.add(
errorObservable
.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error handled: " + error.getMessage()),
() -> System.out.println("Completed") // This won't be called
)
);
}
private void retryStrategies() {
System.out.println("\n=== Retry Strategies ===");
AtomicInteger attempts = new AtomicInteger();
Observable<String> flakyObservable = Observable.create(emitter -> {
int attempt = attempts.incrementAndGet();
if (attempt < 3) {
emitter.onError(new RuntimeException("Failed on attempt " + attempt));
} else {
emitter.onNext("Success on attempt " + attempt);
emitter.onComplete();
}
});
// Simple retry
disposables.add(
flakyObservable
.retry(2) // Retry up to 2 times
.subscribe(
result -> System.out.println("Simple retry: " + result),
error -> System.err.println("Simple retry failed: " + error.getMessage())
)
);
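// Retry with delay
attempts.set(0); // Reset the counter so this subscription fails twice again before succeeding
disposables.add(
flakyObservable
.retryWhen(errors -> 
// Four counter values: the first three allow a retry, the fourth rethrows the error.
// With range(1, 3) the counter never exceeds 3, so the error would never be rethrown.
errors.zipWith(Observable.range(1, 4), (error, retryCount) -> {
if (retryCount > 3) {
throw Exceptions.propagate(error);
}
return retryCount;
})
.flatMap(retryCount -> 
Observable.timer(retryCount, TimeUnit.SECONDS) // Wait 1s, 2s, then 3s
)
)
.subscribe(
result -> System.out.println("Delayed retry: " + result),
error -> System.err.println("Delayed retry failed: " + error.getMessage())
)
);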
}
private void errorRecovery() {
System.out.println("\n=== Error Recovery ===");
Observable<String> unreliableObservable = Observable.create(emitter -> {
if (Math.random() > 0.5) {
emitter.onNext("Success");
emitter.onComplete();
} else {
emitter.onError(new RuntimeException("Random failure"));
}
});
// Return default value on error
disposables.add(
unreliableObservable
.onErrorReturnItem("Default Value")
.subscribe(result -> System.out.println("Error return: " + result))
);
// Switch to fallback Observable
disposables.add(
unreliableObservable
.onErrorResumeNext(throwable -> 
Observable.just("Fallback", "Values")
)
.subscribe(result -> System.out.println("Error resume: " + result))
);
// Continue with the next item: recover inside flatMap so one failure does not end the outer stream
Observable<String> source = Observable.just("First", "Second", "Third")
.flatMap(item -> {
if ("Second".equals(item)) {
// RxJava 3 renamed the ObservableSource overload of onErrorResumeNext to onErrorResumeWith
return Observable.<String>error(new RuntimeException("Failed on second"))
.onErrorResumeWith(Observable.empty()); // Drop only the failed item
}
return Observable.just(item);
});
disposables.add(
source
.subscribe(
item -> System.out.println("Continue: " + item), // Receives "First", then "Third"
error -> System.err.println("This shouldn't be called"),
() -> System.out.println("Completed despite errors")
)
);
}
private void globalErrorHandler() {
System.out.println("\n=== Global Error Handler ===");
RxJavaPlugins.setErrorHandler(throwable -> {
// Global error handler for uncaught exceptions
System.err.println("Global error handler: " + throwable.getMessage());
// You could log to Crashlytics, show notification, etc.
});
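// With no onError consumer, RxJava wraps this error in OnErrorNotImplementedException and
// routes it to the global handler above instead of crashing the app.
// Install the handler once at startup (for example in Application.onCreate()).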
Observable.error(new RuntimeException("Unhandled error"))
.subscribe();
}
public void cleanup() {
disposables.dispose();
}
}
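
It is easy to misread retry(2) as "two attempts in total". It means two additional attempts after the first failure, so three in all. The following JDK-only sketch reproduces that counting rule with a plain loop. SimpleRetry and its methods are hypothetical names, and the loop is an illustration of the semantics, not how RxJava implements the operator.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// SimpleRetry.java (hypothetical illustration of retry-count semantics)
final class SimpleRetry {
    private SimpleRetry() {}

    /** Runs the task once, then up to maxRetries more times while it keeps throwing. */
    static <T> T getWithRetry(Supplier<T> task, int maxRetries) {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (retriesLeft == 0) {
                    throw e; // Out of retries: surface the last failure
                }
                retriesLeft--;
            }
        }
    }

    /** A task that fails the given number of times and then succeeds, like flakyObservable. */
    static Supplier<String> flakyTask(int failuresBeforeSuccess) {
        AtomicInteger attempts = new AtomicInteger();
        return () -> {
            int attempt = attempts.incrementAndGet();
            if (attempt <= failuresBeforeSuccess) {
                throw new RuntimeException("Failed on attempt " + attempt);
            }
            return "Success on attempt " + attempt;
        };
    }
}
```

A task that fails twice succeeds with getWithRetry(flakyTask(2), 2) on its third attempt, while getWithRetry(flakyTask(2), 1) gives up after the second failure.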

Testing RxJava

1. RxJava Testing

// RxJavaTestExample.java
public class RxJavaTestExample {
private TestScheduler testScheduler;
@Before
public void setup() {
testScheduler = new TestScheduler();
}
@Test
public void testBasicObservable() {
// Given
Observable<String> observable = Observable.just("Hello", "World");
TestObserver<String> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertValues("Hello", "World");
testObserver.assertComplete();
testObserver.assertNoErrors();
}
@Test
public void testErrorHandling() {
// Given
Observable<String> observable = Observable.error(new RuntimeException("Test error"));
TestObserver<String> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertError(RuntimeException.class);
testObserver.assertError(error -> "Test error".equals(error.getMessage())); // assertErrorMessage was removed in RxJava 3
}
@Test
public void testTimeBasedObservable() {
// Given
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.take(3);
TestObserver<Long> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then - no values yet
testObserver.assertNoValues();
// Advance time
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testObserver.assertValues(0L, 1L);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValues(0L, 1L, 2L);
testObserver.assertComplete();
}
@Test
public void testAsyncOperation() {
// Given
Observable<String> observable = Observable.fromCallable(() -> {
Thread.sleep(1000); // Simulate async work
return "Result";
}).subscribeOn(Schedulers.trampoline()); // Use trampoline for testing
TestObserver<String> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertValue("Result");
testObserver.assertComplete();
}
@Test
public void testOperatorChain() {
// Given
Observable<Integer> observable = Observable.range(1, 5)
.filter(number -> number % 2 == 0)
.map(number -> number * 10);
TestObserver<Integer> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertValues(20, 40);
testObserver.assertComplete();
}
}
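
testTimeBasedObservable relies on virtual time: nothing runs until the test moves the clock forward. The idea can be sketched with a small priority queue of timestamped tasks. VirtualClock below is a hypothetical, heavily simplified illustration of the concept and does not reflect how TestScheduler is actually implemented.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// VirtualClock.java (hypothetical illustration of virtual time, not RxJava's TestScheduler)
final class VirtualClock {
    private static final class Task {
        final long dueMillis;
        final long sequence;
        final Runnable action;

        Task(long dueMillis, long sequence, Runnable action) {
            this.dueMillis = dueMillis;
            this.sequence = sequence;
            this.action = action;
        }
    }

    // Ordered by due time; the sequence number keeps scheduling order for equal due times
    private final PriorityQueue<Task> queue = new PriorityQueue<>(
            Comparator.comparingLong((Task t) -> t.dueMillis)
                    .thenComparingLong(t -> t.sequence));
    private long nowMillis;
    private long nextSequence;

    long now() {
        return nowMillis;
    }

    /** Queues an action to run once virtual time has advanced by delayMillis. */
    void schedule(long delayMillis, Runnable action) {
        queue.add(new Task(nowMillis + delayMillis, nextSequence++, action));
    }

    /** Moves virtual time forward and runs every task that becomes due, in order. */
    void advanceBy(long millis) {
        long target = nowMillis + millis;
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Task task = queue.poll();
            nowMillis = task.dueMillis;
            task.action.run();
        }
        nowMillis = target;
    }
}
```

Scheduling three tasks at 1, 2, and 3 seconds and calling advanceBy(2000) runs the first two immediately, with no real waiting, which is the same pattern the test above uses with advanceTimeBy.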

Best Practices

1. RxJava Best Practices

// RxJavaBestPractices.java
public class RxJavaBestPractices {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateBestPractices() {
useCompositeDisposable();
avoidMemoryLeaks();
properErrorHandling();
useAppropriateSchedulers();
avoidOverusingRxJava();
useDisposableMaybe();
}
private void useCompositeDisposable() {
System.out.println("=== Use CompositeDisposable ===");
CompositeDisposable localDisposables = new CompositeDisposable();
// Add multiple disposables
localDisposables.add(
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Tick: " + tick))
);
localDisposables.add(
Observable.just("Data")
.subscribe(data -> System.out.println("Data: " + data))
);
// Clean up all at once
localDisposables.dispose();
}
private void avoidMemoryLeaks() {
System.out.println("\n=== Avoid Memory Leaks ===");
// ❌ Bad: Never disposing subscriptions
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Leaky: " + tick));
// ✅ Good: Proper disposal
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Safe: " + tick));
disposables.add(disposable);
}
private void properErrorHandling() {
System.out.println("\n=== Proper Error Handling ===");
// ❌ Bad: Ignoring errors
Observable.error(new RuntimeException("Error"))
.subscribe(
item -> System.out.println("Received: " + item)
// Missing error handler!
);
// ✅ Good: Always handle errors
Observable.error(new RuntimeException("Error"))
.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error handled: " + error.getMessage())
);
// Even better: Use onErrorReturn or onErrorResumeNext
Observable.error(new RuntimeException("Error"))
.onErrorReturnItem("Fallback")
.subscribe(item -> System.out.println("Result: " + item));
}
private void useAppropriateSchedulers() {
System.out.println("\n=== Use Appropriate Schedulers ===");
// I/O operations
disposables.add(
Observable.fromCallable(() -> {
// Database or file operation
return "I/O Result";
})
.subscribeOn(Schedulers.io())
.subscribe()
);
// Computation operations
disposables.add(
Observable.range(1, 1000)
.subscribeOn(Schedulers.computation())
.map(number -> number * number) // CPU-intensive
.subscribe()
);
// UI updates (in Android)
disposables.add(
Observable.just("Data")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
// Update UI
System.out.println("UI update: " + data);
})
);
}
private void avoidOverusingRxJava() {
System.out.println("\n=== Avoid Overusing RxJava ===");
// ❌ Bad: Using RxJava for simple synchronous operations
Observable.just("Hello")
.map(String::toUpperCase)
.subscribe(result -> System.out.println("Overkill: " + result));
// ✅ Good: Use regular Java for simple operations
String result = "Hello".toUpperCase();
System.out.println("Simple: " + result);
// Use RxJava when you need its benefits:
// - Asynchronous operations
// - Complex data transformations
// - Combining multiple data sources
// - Error handling across async boundaries
}
private void useDisposableMaybe() {
System.out.println("\n=== Use Disposable Maybe ===");
// Maybe models an operation that ends with exactly one of: a value, no value, or an error
Maybe<String> maybeOperation = Maybe.create(emitter -> {
if (Math.random() > 0.5) {
emitter.onSuccess("Success");
} else {
emitter.onComplete(); // No result, but no error either
}
});
disposables.add(
maybeOperation
.subscribe(
result -> System.out.println("Maybe success: " + result),
error -> System.err.println("Maybe error: " + error),
() -> System.out.println("Maybe completed without result")
)
);
}
// Additional best practices
public void additionalBestPractices() {
useMeaningfulVariableNames();
avoidNestedSubscriptions();
useOperatorsEffectively();
testYourRxCode();
}
private void useMeaningfulVariableNames() {
// ❌ Bad: Unclear variable names
Observable<String> o = Observable.just("data");
Disposable d = o.subscribe();
// ✅ Good: Descriptive names
Observable<String> userDataStream = Observable.just("user data");
Disposable userSubscription = userDataStream.subscribe();
disposables.add(userSubscription);
}
private void avoidNestedSubscriptions() {
// ❌ Bad: Nested subscriptions (callback hell in RxJava)
Observable.just("outer")
.subscribe(outer -> {
Observable.just("inner")
.subscribe(inner -> System.out.println(outer + " - " + inner));
});
// ✅ Good: Use flatMap or other operators
Observable.just("outer")
.flatMap(outer -> 
Observable.just("inner")
.map(inner -> outer + " - " + inner)
)
.subscribe(result -> System.out.println(result));
}
private void useOperatorsEffectively() {
// Chain operators for better readability
disposables.add(
Observable.range(1, 100)
.filter(number -> number % 2 == 0)    // Filter even numbers
.map(number -> number * 2)            // Double them
.take(10)                            // Take first 10
.reduce(0, Integer::sum)             // Sum them up
.subscribe(total -> System.out.println("Total: " + total))
);
}
private void testYourRxCode() {
// Always test your RxJava code, especially:
// - Async operations
// - Error handling
// - Operator chains
// - Threading behavior
}
public void cleanup() {
disposables.dispose();
}
}
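
In the spirit of avoidOverusingRxJava, the pipeline in useOperatorsEffectively is fully synchronous, so the same calculation can also be written with the JDK's stream API. This is a sketch for comparison only; PlainJavaPipeline is a hypothetical class name.

```java
import java.util.stream.IntStream;

// PlainJavaPipeline.java (hypothetical comparison with the RxJava chain above)
final class PlainJavaPipeline {
    private PlainJavaPipeline() {}

    /** Doubles the even numbers in 1..100, keeps the first ten results, and sums them. */
    static int sumOfFirstTenDoubledEvens() {
        return IntStream.rangeClosed(1, 100)
                .filter(number -> number % 2 == 0) // Keep even numbers
                .map(number -> number * 2)         // Double them
                .limit(10)                         // Take the first 10
                .sum();                            // 4 + 8 + ... + 40
    }
}
```

Both versions produce 220. The RxJava form earns its keep once the source is asynchronous or has to be combined with other streams.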

Conclusion

RxJava is a powerful tool for Android development that can greatly simplify asynchronous programming and data stream handling. Key takeaways:

  1. Use appropriate schedulers for different types of work
  2. Always handle errors and use proper error recovery strategies
  3. Prevent memory leaks with CompositeDisposable and proper lifecycle management
  4. Choose the right reactive type (Observable, Flowable, Single, Maybe, Completable) for your use case
  5. Use operators effectively to transform, filter, and combine data streams
  6. Test your RxJava code thoroughly, especially async operations
  7. Avoid overusing RxJava for simple synchronous operations
  8. Follow reactive programming principles for clean, maintainable code

By following these best practices and patterns, you can build robust, responsive Android applications that handle complex asynchronous operations with ease.
