Table of Contents
- Introduction to RxJava
- RxJava Basics and Setup
- Observables and Observers
- Operators and Transformation
- Schedulers and Threading
- Android-specific RxJava
- Networking with RxJava
- Database Operations
- Error Handling
- Testing RxJava
- Best Practices
Introduction to RxJava
RxJava is a Java implementation of Reactive Extensions for composing asynchronous and event-based programs using observable sequences. It greatly simplifies asynchronous programming in Android applications.
Key Benefits for Android:
- Simplified async operations: Handle background tasks elegantly
- Functional programming: Use operators to transform, combine, and manipulate data
- Error handling: Comprehensive error handling mechanisms
- Thread management: Easy switching between threads
- Memory leak prevention: Built-in lifecycle awareness with RxAndroid
RxJava Basics and Setup
1. Gradle Dependencies
// build.gradle (Module: app)
dependencies {
// RxJava 3
implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
// RxBinding for Android Views
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-core:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-appcompat:4.0.0'
// RxJava Adapter for Retrofit
implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'
// RxPermissions for runtime permissions
implementation 'com.github.tbruyelle:rxpermissions:0.12'
// Testing
testImplementation 'io.reactivex.rxjava3:rxjava:3.1.8'
testImplementation 'junit:junit:4.13.2'
androidTestImplementation 'androidx.test.ext:junit:1.1.5'
}
2. Basic RxJava Components
// RxJavaBasicExample.java
public class RxJavaBasicExample {
// 1. Observable - The data source that emits items
// 2. Observer - The consumer that receives items
// 3. Operators - Methods that transform, filter, or combine Observables
// 4. Schedulers - Control which thread operations occur on
public static void main(String[] args) {
basicObservableExample();
operatorsExample();
threadingExample();
}
private static void basicObservableExample() {
System.out.println("=== Basic Observable Example ===");
// Create an Observable that emits a sequence of integers
Observable<Integer> observable = Observable.create(emitter -> {
try {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
// Create an Observer to consume the emitted items
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Received: " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("Error: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
};
// Subscribe the observer to the observable
observable.subscribe(observer);
}
private static void operatorsExample() {
System.out.println("\n=== Operators Example ===");
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter(number -> number % 2 == 0) // Filter even numbers
.map(number -> number * number) // Square each number
.take(3) // Take only first 3
.subscribe(
result -> System.out.println("Processed: " + result),
error -> System.out.println("Error: " + error),
() -> System.out.println("Operation completed")
);
}
private static void threadingExample() {
System.out.println("\n=== Threading Example ===");
Observable.just("Hello", "World")
.subscribeOn(Schedulers.io()) // Execute on IO thread
.observeOn(Schedulers.computation()) // Observe on computation thread
.doOnNext(item -> System.out.println("Processing on: " + Thread.currentThread().getName()))
.observeOn(Schedulers.newThread()) // Switch to new thread
.subscribe(
item -> System.out.println("Received on: " + Thread.currentThread().getName() + " - " + item)
);
// Sleep to allow async operations to complete
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Observables and Observers
1. Different Types of Observables
// ObservableTypesExample.java
public class ObservableTypesExample {
public void demonstrateObservableTypes() {
createObservable();
justObservable();
fromObservable();
rangeObservable();
intervalObservable();
deferObservable();
}
private void createObservable() {
System.out.println("=== Observable.create() ===");
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("First item");
emitter.onNext("Second item");
emitter.onNext("Third item");
emitter.onComplete();
});
observable.subscribe(
item -> System.out.println("Received: " + item),
error -> System.out.println("Error: " + error),
() -> System.out.println("Completed")
);
}
private void justObservable() {
System.out.println("\n=== Observable.just() ===");
// Emits fixed set of items
Observable.just("Apple", "Banana", "Cherry")
.subscribe(item -> System.out.println("Fruit: " + item));
}
private void fromObservable() {
System.out.println("\n=== Observable.fromXXX() ===");
// From Iterable
List<String> list = Arrays.asList("One", "Two", "Three");
Observable.fromIterable(list)
.subscribe(item -> System.out.println("From list: " + item));
// From Array
String[] array = {"A", "B", "C"};
Observable.fromArray(array)
.subscribe(item -> System.out.println("From array: " + item));
// From Callable
Observable.fromCallable(() -> "Result from callable")
.subscribe(item -> System.out.println("From callable: " + item));
}
private void rangeObservable() {
System.out.println("\n=== Observable.range() ===");
Observable.range(1, 5) // Start from 1, emit 5 numbers
.subscribe(number -> System.out.println("Number: " + number));
}
private void intervalObservable() {
System.out.println("\n=== Observable.interval() ===");
// Emits numbers at fixed time intervals
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
.take(5) // Take only 5 emissions
.subscribe(
tick -> System.out.println("Tick: " + tick),
error -> System.out.println("Error: " + error),
() -> System.out.println("Interval completed")
);
// In real Android app, you'd dispose in onDestroy()
try {
Thread.sleep(6000); // Wait for completion
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
}
private void deferObservable() {
System.out.println("\n=== Observable.defer() ===");
// Defer creation until subscription
Observable<String> deferred = Observable.defer(() ->
Observable.just("Current time: " + System.currentTimeMillis())
);
// Each subscription gets fresh data
deferred.subscribe(time -> System.out.println("First: " + time));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
deferred.subscribe(time -> System.out.println("Second: " + time));
}
// Special Observable types
public void specialObservables() {
System.out.println("\n=== Special Observables ===");
// Single - emits exactly one item or error
Single.just("Single result")
.subscribe(
result -> System.out.println("Single: " + result),
error -> System.out.println("Single error: " + error)
);
// Maybe - emits 0 or 1 item, optionally followed by completion
Maybe.just("Maybe result")
.subscribe(
result -> System.out.println("Maybe: " + result),
error -> System.out.println("Maybe error: " + error),
() -> System.out.println("Maybe completed")
);
// Completable - emits only completion or error signal
Completable.complete()
.subscribe(
() -> System.out.println("Completable completed"),
error -> System.out.println("Completable error: " + error)
);
}
}
2. Observer Patterns and Disposables
// ObserverPatternsExample.java
public class ObserverPatternsExample {
private CompositeDisposable compositeDisposable = new CompositeDisposable();
public void demonstrateObserverPatterns() {
basicObserver();
resourceManagement();
hotVsColdObservables();
backpressureExample();
}
private void basicObserver() {
System.out.println("=== Basic Observer Patterns ===");
// Using lambda expressions for cleaner code
Disposable disposable = Observable.range(1, 3)
.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
compositeDisposable.add(disposable);
}
private void resourceManagement() {
System.out.println("\n=== Resource Management ===");
// Using Disposable for resource cleanup
Disposable disposable1 = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Tick 1: " + tick));
Disposable disposable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(tick -> System.out.println("Tick 2: " + tick));
// Add to composite disposable for easy cleanup
compositeDisposable.addAll(disposable1, disposable2);
// Later, dispose all at once
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Disposing all subscriptions...");
compositeDisposable.clear();
}
private void hotVsColdObservables() {
System.out.println("\n=== Hot vs Cold Observables ===");
// Cold Observable - data produced for each subscriber
Observable<String> coldObservable = Observable.defer(() ->
Observable.just("Data created at: " + System.currentTimeMillis())
);
System.out.println("Cold Observable:");
coldObservable.subscribe(data -> System.out.println("Subscriber 1: " + data));
coldObservable.subscribe(data -> System.out.println("Subscriber 2: " + data));
// Hot Observable - data produced independently of subscribers
PublishSubject<String> hotObservable = PublishSubject.create();
System.out.println("\nHot Observable:");
hotObservable.subscribe(data -> System.out.println("Subscriber A: " + data));
// Emit data
hotObservable.onNext("First emission");
hotObservable.onNext("Second emission");
// New subscriber joins late
hotObservable.subscribe(data -> System.out.println("Subscriber B: " + data));
hotObservable.onNext("Third emission");
}
private void backpressureExample() {
System.out.println("\n=== Backpressure Handling ===");
// Using Flowable for backpressure support
Flowable.range(1, 1000)
.onBackpressureBuffer() // Handle backpressure by buffering
.observeOn(Schedulers.computation())
.subscribe(
item -> {
try {
Thread.sleep(10); // Simulate slow consumer
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Processed: " + item);
},
error -> System.err.println("Error: " + error)
);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void cleanup() {
compositeDisposable.dispose();
}
}
Operators and Transformation
1. Common RxJava Operators
// RxJavaOperatorsExample.java
public class RxJavaOperatorsExample {
public void demonstrateOperators() {
transformationOperators();
filteringOperators();
combiningOperators();
utilityOperators();
errorHandlingOperators();
conditionalOperators();
}
private void transformationOperators() {
System.out.println("=== Transformation Operators ===");
// map - transform each item
Observable.just(1, 2, 3)
.map(number -> number * 2)
.subscribe(result -> System.out.println("Map: " + result));
// flatMap - transform each item into an Observable
Observable.just("Hello", "World")
.flatMap(word -> Observable.fromArray(word.split("")))
.subscribe(letter -> System.out.println("FlatMap: " + letter));
// concatMap - like flatMap but preserves order
Observable.just(1, 2, 3)
.concatMap(number -> Observable.just(number * 10, number * 100))
.subscribe(result -> System.out.println("ConcatMap: " + result));
// switchMap - cancels previous emissions
Observable.just(1, 2, 3)
.switchMap(number -> Observable.timer(100 * number, TimeUnit.MILLISECONDS)
.map(tick -> "SwitchMap: " + number))
.subscribe(result -> System.out.println(result));
// buffer - group items into lists
Observable.range(1, 10)
.buffer(3)
.subscribe(batch -> System.out.println("Buffer: " + batch));
}
private void filteringOperators() {
System.out.println("\n=== Filtering Operators ===");
// filter - emit only items that pass a predicate
Observable.range(1, 10)
.filter(number -> number % 2 == 0)
.subscribe(even -> System.out.println("Filter: " + even));
// take - take only first n items
Observable.interval(1, TimeUnit.SECONDS)
.take(3)
.subscribe(item -> System.out.println("Take: " + item));
// skip - skip first n items
Observable.range(1, 5)
.skip(2)
.subscribe(item -> System.out.println("Skip: " + item));
// distinct - remove duplicates
Observable.just(1, 2, 2, 3, 3, 3, 4)
.distinct()
.subscribe(unique -> System.out.println("Distinct: " + unique));
// debounce - emit only after a quiet period
Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(100);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(100);
emitter.onNext("D");
emitter.onComplete();
})
.debounce(200, TimeUnit.MILLISECONDS) // Wait for 200ms of silence
.subscribe(item -> System.out.println("Debounce: " + item));
}
private void combiningOperators() {
System.out.println("\n=== Combining Operators ===");
Observable<String> first = Observable.just("A", "B", "C");
Observable<String> second = Observable.just("1", "2", "3");
// merge - combine multiple Observables
Observable.merge(first, second)
.subscribe(item -> System.out.println("Merge: " + item));
// concat - combine sequentially
Observable.concat(first, second)
.subscribe(item -> System.out.println("Concat: " + item));
// zip - combine item-by-item
Observable.zip(first, second, (f, s) -> f + s)
.subscribe(combined -> System.out.println("Zip: " + combined));
// combineLatest - combine latest from each
Observable.combineLatest(
Observable.interval(1, TimeUnit.SECONDS).map(tick -> "Tick_" + tick),
Observable.interval(2, TimeUnit.SECONDS).map(tick -> "Tock_" + tick),
(tick, tock) -> tick + " | " + tock
)
.take(5)
.subscribe(combined -> System.out.println("CombineLatest: " + combined));
}
private void utilityOperators() {
System.out.println("\n=== Utility Operators ===");
// doOnNext - side effect for each emission
Observable.just(1, 2, 3)
.doOnNext(item -> System.out.println("About to emit: " + item))
.doOnSubscribe(d -> System.out.println("Subscribed"))
.doOnComplete(() -> System.out.println("Completed"))
.subscribe(item -> System.out.println("Emitted: " + item));
// delay - delay emissions
Observable.just("Delayed")
.delay(1, TimeUnit.SECONDS)
.subscribe(item -> System.out.println("Delay: " + item));
// timeout - error if no emission in time
Observable.timer(2, TimeUnit.SECONDS)
.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("Timeout success"),
error -> System.out.println("Timeout error: " + error)
);
// retry - resubscribe on error
AtomicInteger attempts = new AtomicInteger();
Observable.create(emitter -> {
if (attempts.incrementAndGet() < 3) {
emitter.onError(new RuntimeException("Simulated error"));
} else {
emitter.onNext("Success after retries");
emitter.onComplete();
}
})
.retry(2) // Retry up to 2 times
.subscribe(
result -> System.out.println("Retry: " + result),
error -> System.out.println("Retry failed: " + error)
);
}
private void errorHandlingOperators() {
System.out.println("\n=== Error Handling Operators ===");
// onErrorReturn - return default value on error
Observable.error(new RuntimeException("Error!"))
.onErrorReturn(throwable -> "Default Value")
.subscribe(
item -> System.out.println("onErrorReturn: " + item),
error -> System.out.println("This should not be called")
);
// onErrorResumeNext - switch to another Observable on error
Observable.error(new RuntimeException("Error!"))
.onErrorResumeNext(throwable -> Observable.just("Recovered", "Values"))
.subscribe(item -> System.out.println("onErrorResumeNext: " + item));
// onErrorReturnItem - simpler version
Observable.error(new RuntimeException("Error!"))
.onErrorReturnItem("Fallback Item")
.subscribe(item -> System.out.println("onErrorReturnItem: " + item));
}
private void conditionalOperators() {
System.out.println("\n=== Conditional Operators ===");
// all - check if all items meet condition
Observable.just(2, 4, 6, 8)
.all(number -> number % 2 == 0)
.subscribe(result -> System.out.println("All even: " + result));
// any - check if any item meets condition
Observable.just(1, 3, 5, 8)
.any(number -> number % 2 == 0)
.subscribe(result -> System.out.println("Any even: " + result));
// contains - check if sequence contains item
Observable.just("Apple", "Banana", "Cherry")
.contains("Banana")
.subscribe(result -> System.out.println("Contains Banana: " + result));
// defaultIfEmpty - emit default if empty
Observable.empty()
.defaultIfEmpty("Default Value")
.subscribe(item -> System.out.println("defaultIfEmpty: " + item));
// sequenceEqual - check if two Observables emit same sequence
Observable<Integer> first = Observable.just(1, 2, 3);
Observable<Integer> second = Observable.just(1, 2, 3);
Observable.sequenceEqual(first, second)
.subscribe(equal -> System.out.println("Sequences equal: " + equal));
}
}
Schedulers and Threading
1. Scheduler Management
// SchedulerExample.java
public class SchedulerExample {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateSchedulers() {
basicSchedulers();
observeOnVsSubscribeOn();
androidSchedulers();
customSchedulers();
}
private void basicSchedulers() {
System.out.println("=== Basic Schedulers ===");
// Schedulers.io() - For I/O bound work
Disposable ioWork = Observable.fromCallable(() -> {
System.out.println("IO work on: " + Thread.currentThread().getName());
Thread.sleep(1000); // Simulate I/O operation
return "IO Result";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(result ->
System.out.println("IO result on: " + Thread.currentThread().getName() + " - " + result)
);
// Schedulers.computation() - For CPU-intensive work
Disposable computationWork = Observable.range(1, 5)
.subscribeOn(Schedulers.computation())
.map(number -> {
System.out.println("Computation on: " + Thread.currentThread().getName());
return number * number;
})
.observeOn(Schedulers.newThread())
.subscribe(result ->
System.out.println("Computation result on: " + Thread.currentThread().getName() + " - " + result)
);
disposables.addAll(ioWork, computationWork);
}
private void observeOnVsSubscribeOn() {
System.out.println("\n=== observeOn vs subscribeOn ===");
Disposable example = Observable.create(emitter -> {
System.out.println("Emit on: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
})
.subscribeOn(Schedulers.io()) // Affects upstream operations
.observeOn(Schedulers.computation()) // Affects downstream operations
.map(item -> {
System.out.println("Map on: " + Thread.currentThread().getName());
return item * 10;
})
.observeOn(Schedulers.single())
.subscribe(item ->
System.out.println("Receive on: " + Thread.currentThread().getName() + " - " + item)
);
disposables.add(example);
}
private void androidSchedulers() {
System.out.println("\n=== Android Schedulers ===");
// Note: In Android, you'd use AndroidSchedulers.mainThread()
// from RxAndroid library
// Simulating Android main thread with Schedulers.single()
Disposable androidExample = Observable.just("Network Result")
.subscribeOn(Schedulers.io()) // Network call on IO thread
.observeOn(Schedulers.single()) // Simulate Android main thread
.doOnNext(result ->
System.out.println("Updating UI on: " + Thread.currentThread().getName())
)
.subscribe(result ->
System.out.println("UI updated with: " + result)
);
disposables.add(androidExample);
}
private void customSchedulers() {
System.out.println("\n=== Custom Schedulers ===");
// Create custom scheduler from executor
ExecutorService customExecutor = Executors.newFixedThreadPool(3);
Scheduler customScheduler = Schedulers.from(customExecutor);
Disposable customWork = Observable.range(1, 6)
.subscribeOn(customScheduler)
.map(item -> {
System.out.println("Custom scheduler: " + Thread.currentThread().getName());
return item;
})
.subscribe();
disposables.add(customWork);
// Shutdown executor when done
disposables.add(Disposable.fromRunnable(() -> {
customExecutor.shutdown();
System.out.println("Custom executor shutdown");
}));
}
public void cleanup() {
disposables.dispose();
}
}
2. Threading Best Practices
// ThreadingBestPractices.java
public class ThreadingBestPractices {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateBestPractices() {
avoidBlockingOperations();
properSchedulerUsage();
memoryLeakPrevention();
errorHandlingInThreads();
}
private void avoidBlockingOperations() {
System.out.println("=== Avoid Blocking Operations ===");
// ❌ Bad: Blocking operation on computation scheduler
Disposable badExample = Observable.just("data")
.subscribeOn(Schedulers.computation())
.map(data -> {
try {
Thread.sleep(2000); // Blocking operation!
} catch (InterruptedException e) {
e.printStackTrace();
}
return data.toUpperCase();
})
.subscribe(result -> System.out.println("Bad result: " + result));
// ✅ Good: Use proper scheduler for blocking operations
Disposable goodExample = Observable.just("data")
.subscribeOn(Schedulers.io()) // Use IO scheduler for blocking ops
.map(data -> {
try {
Thread.sleep(2000); // This is okay on IO scheduler
} catch (InterruptedException e) {
e.printStackTrace();
}
return data.toUpperCase();
})
.observeOn(Schedulers.computation()) // Switch for computation
.subscribe(result -> System.out.println("Good result: " + result));
disposables.addAll(badExample, goodExample);
}
private void properSchedulerUsage() {
System.out.println("\n=== Proper Scheduler Usage ===");
Disposable example = Observable.range(1, 100)
.subscribeOn(Schedulers.io()) // Heavy initial work on IO
.filter(number -> number % 2 == 0)
.observeOn(Schedulers.computation()) // Computation for CPU work
.map(number -> number * number)
.buffer(10)
.observeOn(Schedulers.single()) // Single thread for final processing
.subscribe(
batch -> System.out.println("Batch on " + Thread.currentThread().getName() + ": " + batch),
error -> System.err.println("Error: " + error),
() -> System.out.println("Processing completed")
);
disposables.add(example);
}
private void memoryLeakPrevention() {
System.out.println("\n=== Memory Leak Prevention ===");
// Using CompositeDisposable for easy cleanup
CompositeDisposable localDisposables = new CompositeDisposable();
// Multiple subscriptions
Disposable sub1 = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Sub1: " + tick));
Disposable sub2 = Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(tick -> System.out.println("Sub2: " + tick));
localDisposables.addAll(sub1, sub2);
// Simulate activity/fragment destruction
System.out.println("Cleaning up local disposables...");
localDisposables.dispose();
}
private void errorHandlingInThreads() {
System.out.println("\n=== Error Handling in Threads ===");
Disposable example = Observable.create(emitter -> {
// This runs on IO scheduler
if (Math.random() > 0.5) {
emitter.onError(new RuntimeException("Random error!"));
} else {
emitter.onNext("Success");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(
result -> System.out.println("Success: " + result),
error -> System.err.println("Error handled on: " +
Thread.currentThread().getName() + " - " + error.getMessage())
);
disposables.add(example);
}
public void cleanup() {
disposables.dispose();
}
}
Android-specific RxJava
1. RxJava in Android Activities
// MainActivity.java
public class MainActivity extends AppCompatActivity {
private CompositeDisposable disposables = new CompositeDisposable();
private TextView resultTextView;
private ProgressBar progressBar;
private EditText searchEditText;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
initializeViews();
setupClickListeners();
setupSearchListener();
loadInitialData();
}
private void initializeViews() {
resultTextView = findViewById(R.id.resultTextView);
progressBar = findViewById(R.id.progressBar);
searchEditText = findViewById(R.id.searchEditText);
}
private void setupClickListeners() {
Button loadButton = findViewById(R.id.loadButton);
Button parallelButton = findViewById(R.id.parallelButton);
// Single network call
disposables.add(
RxView.clicks(loadButton)
.throttleFirst(1, TimeUnit.SECONDS) // Prevent multiple clicks
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(click -> showLoading(true))
.observeOn(Schedulers.io())
.flatMapSingle(click -> performNetworkCall())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> {
showLoading(false);
resultTextView.setText("Result: " + result);
},
error -> {
showLoading(false);
resultTextView.setText("Error: " + error.getMessage());
}
)
);
// Parallel network calls
disposables.add(
RxView.clicks(parallelButton)
.throttleFirst(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(click -> showLoading(true))
.observeOn(Schedulers.io())
.flatMap(click -> Observable.merge(
performNetworkCall("Call 1").toObservable(),
performNetworkCall("Call 2").toObservable(),
performNetworkCall("Call 3").toObservable()
))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> {
resultTextView.append("\nParallel: " + result);
},
error -> {
showLoading(false);
resultTextView.setText("Parallel Error: " + error.getMessage());
},
() -> {
showLoading(false);
resultTextView.append("\nAll parallel calls completed");
}
)
);
}
private void setupSearchListener() {
// Real-time search with debounce
disposables.add(
RxTextView.textChanges(searchEditText)
.debounce(300, TimeUnit.MILLISECONDS) // Wait for user to stop typing
.filter(text -> text.length() > 2) // Only search if more than 2 chars
.distinctUntilChanged() // Only if text changed
.switchMapSingle(query -> performSearch(query.toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
results -> resultTextView.setText("Search results: " + results),
error -> resultTextView.setText("Search error: " + error.getMessage())
)
);
}
private void loadInitialData() {
// Load data when activity starts
disposables.add(
performNetworkCall("Initial Data")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> resultTextView.setText("Initial: " + result),
error -> resultTextView.setText("Initial load error: " + error.getMessage())
)
);
}
private Single<String> performNetworkCall() {
return performNetworkCall("Default");
}
private Single<String> performNetworkCall(String callName) {
return Single.fromCallable(() -> {
// Simulate network call
Thread.sleep(2000);
if (Math.random() > 0.8) {
throw new RuntimeException(callName + " failed!");
}
return callName + " completed at " + System.currentTimeMillis();
});
}
private Single<String> performSearch(String query) {
return Single.fromCallable(() -> {
// Simulate search API call
Thread.sleep(500);
return "Results for '" + query + "': " + (int)(Math.random() * 100);
}).subscribeOn(Schedulers.io());
}
private void showLoading(boolean show) {
progressBar.setVisibility(show ? View.VISIBLE : View.GONE);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.dispose(); // Prevent memory leaks
}
}
2. RxBinding for Android Views
// RxBindingExample.java
public class RxBindingExample {
private CompositeDisposable disposables = new CompositeDisposable();
public void setupViewBindings(Activity activity) {
setupButtonClicks(activity);
setupTextChanges(activity);
setupCheckboxChanges(activity);
setupRecyclerViewScroll(activity);
setupMultipleViews(activity);
}
private void setupButtonClicks(Activity activity) {
Button button = activity.findViewById(R.id.someButton);
disposables.add(
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS) // Prevent double clicks
.observeOn(AndroidSchedulers.mainThread())
.subscribe(click -> {
// Handle button click
showToast(activity, "Button clicked!");
})
);
// Long clicks
disposables.add(
RxView.longClicks(button)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(click -> {
showToast(activity, "Button long clicked!");
})
);
}
private void setupTextChanges(Activity activity) {
EditText editText = activity.findViewById(R.id.someEditText);
disposables.add(
RxTextView.textChanges(editText)
.debounce(300, TimeUnit.MILLISECONDS) // Wait for typing to stop
.filter(text -> text.length() > 0) // Ignore empty text
.distinctUntilChanged() // Only emit when text actually changes
.observeOn(AndroidSchedulers.mainThread())
.subscribe(text -> {
// Handle text changes
updateSearchResults(activity, text.toString());
})
);
// Focus changes
disposables.add(
RxView.focusChanges(editText)
.skip(1) // Skip initial emission
.observeOn(AndroidSchedulers.mainThread())
.subscribe(hasFocus -> {
if (hasFocus) {
// EditText gained focus
showToast(activity, "EditText focused");
} else {
// EditText lost focus
showToast(activity, "EditText lost focus");
}
})
);
}
private void setupCheckboxChanges(Activity activity) {
CheckBox checkBox = activity.findViewById(R.id.someCheckBox);
disposables.add(
RxCompoundButton.checkedChanges(checkBox)
.skip(1) // Skip initial value
.observeOn(AndroidSchedulers.mainThread())
.subscribe(isChecked -> {
// Handle checkbox state change
String state = isChecked ? "checked" : "unchecked";
showToast(activity, "Checkbox " + state);
})
);
}
private void setupRecyclerViewScroll(Activity activity) {
RecyclerView recyclerView = activity.findViewById(R.id.someRecyclerView);
disposables.add(
RxRecyclerView.scrollStateChanges(recyclerView)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(state -> {
switch (state) {
case RecyclerView.SCROLL_STATE_DRAGGING:
// User is dragging
break;
case RecyclerView.SCROLL_STATE_SETTLING:
// Scroll is settling
break;
case RecyclerView.SCROLL_STATE_IDLE:
// Scroll is idle
checkLoadMore(activity);
break;
}
})
);
// Scroll events with more details
disposables.add(
RxRecyclerView.scrollEvents(recyclerView)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(event -> {
// Handle scroll events with more details
int dx = event.dx();
int dy = event.dy();
// Process scroll data...
})
);
}
private void setupMultipleViews(Activity activity) {
Button button1 = activity.findViewById(R.id.button1);
Button button2 = activity.findViewById(R.id.button2);
Button button3 = activity.findViewById(R.id.button3);
// Combine multiple button clicks
Observable<Object> button1Clicks = RxView.clicks(button1);
Observable<Object> button2Clicks = RxView.clicks(button2);
Observable<Object> button3Clicks = RxView.clicks(button3);
disposables.add(
Observable.merge(button1Clicks, button2Clicks, button3Clicks)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(click -> {
// Handle any button click
showToast(activity, "A button was clicked!");
})
);
// Combine specific conditions
EditText emailEditText = activity.findViewById(R.id.emailEditText);
EditText passwordEditText = activity.findViewById(R.id.passwordEditText);
Button submitButton = activity.findViewById(R.id.submitButton);
disposables.add(
Observable.combineLatest(
RxTextView.textChanges(emailEditText),
RxTextView.textChanges(passwordEditText),
(email, password) -> {
// Validate form
boolean emailValid = isValidEmail(email.toString());
boolean passwordValid = password.length() >= 6;
return emailValid && passwordValid;
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(isValid -> {
// Enable/disable submit button based on validation
submitButton.setEnabled(isValid);
})
);
}
private void updateSearchResults(Activity activity, String query) {
// Implement search logic
TextView resultsView = activity.findViewById(R.id.resultsTextView);
resultsView.setText("Searching for: " + query);
// You would typically make an API call here
}
private void checkLoadMore(Activity activity) {
// Implement load more logic for RecyclerView
RecyclerView recyclerView = activity.findViewById(R.id.someRecyclerView);
LinearLayoutManager layoutManager = (LinearLayoutManager) recyclerView.getLayoutManager();
if (layoutManager != null) {
int visibleItemCount = layoutManager.getChildCount();
int totalItemCount = layoutManager.getItemCount();
int firstVisibleItemPosition = layoutManager.findFirstVisibleItemPosition();
if ((visibleItemCount + firstVisibleItemPosition) >= totalItemCount
&& firstVisibleItemPosition >= 0) {
// Load more data
loadMoreData(activity);
}
}
}
private void loadMoreData(Activity activity) {
// Implement load more data logic
showToast(activity, "Loading more data...");
}
private boolean isValidEmail(String email) {
return email != null && android.util.Patterns.EMAIL_ADDRESS.matcher(email).matches();
}
private void showToast(Activity activity, String message) {
Toast.makeText(activity, message, Toast.LENGTH_SHORT).show();
}
public void cleanup() {
disposables.dispose();
}
}
Networking with RxJava
1. Retrofit with RxJava
// ApiService.java
public interface ApiService {
@GET("users/{userId}")
Single<User> getUser(@Path("userId") String userId);
@GET("users")
Single<List<User>> getUsers();
@POST("users")
Single<User> createUser(@Body User user);
@GET("posts")
Single<List<Post>> getUserPosts(@Query("userId") String userId);
@GET("users/{userId}/posts")
Single<List<Post>> getUserPostsWithPath(@Path("userId") String userId);
@Multipart
@POST("upload")
Single<UploadResponse> uploadFile(
@Part MultipartBody.Part file,
@Part("description") RequestBody description
);
@Streaming
@GET("files/{fileName}")
Single<ResponseBody> downloadFile(@Path("fileName") String fileName);
}
// User.java
public class User {
private String id;
private String name;
private String email;
private String avatar;
// Constructors, getters, setters
public User() {}
public User(String id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
// Getters and setters...
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getAvatar() { return avatar; }
public void setAvatar(String avatar) { this.avatar = avatar; }
}
// Post.java
public class Post {
private String id;
private String userId;
private String title;
private String body;
// Constructors, getters, setters...
public Post() {}
public Post(String id, String userId, String title, String body) {
this.id = id;
this.userId = userId;
this.title = title;
this.body = body;
}
// Getters and setters...
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public String getBody() { return body; }
public void setBody(String body) { this.body = body; }
}
2. Network Repository Pattern
// UserRepository.java
public class UserRepository {
private final ApiService apiService;
private final CompositeDisposable disposables = new CompositeDisposable();
public UserRepository(ApiService apiService) {
this.apiService = apiService;
}
// Basic CRUD operations
public Single<User> getUser(String userId) {
return apiService.getUser(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Single<List<User>> getUsers() {
return apiService.getUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Single<User> createUser(User user) {
return apiService.createUser(user)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Complex operations
public Single<User> getUserWithPosts(String userId) {
return apiService.getUser(userId)
.flatMap(user ->
apiService.getUserPosts(userId)
.map(posts -> {
user.setPosts(posts); // Assuming User has posts field
return user;
})
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Parallel operations
public Single<List<User>> getUsersWithPosts() {
return apiService.getUsers()
.flatMapObservable(Observable::fromIterable)
.flatMapSingle(user ->
apiService.getUserPosts(user.getId())
.map(posts -> {
user.setPosts(posts);
return user;
})
.onErrorReturnItem(user) // Return user even if posts fail
)
.toList()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Batch operations
public Completable updateMultipleUsers(List<User> users) {
return Observable.fromIterable(users)
.flatMapCompletable(user ->
apiService.createUser(user)
.ignoreElement() // Convert Single to Completable
.onErrorComplete() // Continue on error
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Cached network calls
public Observable<User> getUserWithCache(String userId) {
return Observable.concat(
getCachedUser(userId).toObservable(),
apiService.getUser(userId)
.doOnSuccess(this::cacheUser)
.toObservable()
)
.firstElement() // Take first non-empty emission
.toObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
private Single<User> getCachedUser(String userId) {
// Implement cache retrieval
return Single.error(new RuntimeException("Not in cache")); // Simplified
}
private void cacheUser(User user) {
// Implement cache storage
}
// Error handling with retry
public Single<User> getUserWithRetry(String userId) {
return apiService.getUser(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (error, retryCount) -> {
if (retryCount < 3 && isNetworkError(error)) {
return retryCount;
}
throw Exceptions.propagate(error);
})
.flatMap(retryCount ->
Observable.timer((long) Math.pow(2, retryCount), TimeUnit.SECONDS)
)
);
}
private boolean isNetworkError(Throwable error) {
return error instanceof IOException ||
error instanceof SocketTimeoutException ||
error instanceof ConnectException;
}
public void cleanup() {
disposables.dispose();
}
}
3. Network ViewModel
// UserViewModel.java
public class UserViewModel extends ViewModel {
private final UserRepository userRepository;
private final CompositeDisposable disposables = new CompositeDisposable();
// LiveData for UI observation
private final MutableLiveData<List<User>> usersLiveData = new MutableLiveData<>();
private final MutableLiveData<User> userLiveData = new MutableLiveData<>();
private final MutableLiveData<Boolean> loadingLiveData = new MutableLiveData<>();
private final MutableLiveData<String> errorLiveData = new MutableLiveData<>();
public UserViewModel(UserRepository userRepository) {
this.userRepository = userRepository;
}
public void loadUsers() {
loadingLiveData.setValue(true);
disposables.add(
userRepository.getUsers()
.subscribe(
users -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(users);
errorLiveData.setValue(null);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Failed to load users: " + error.getMessage());
}
)
);
}
public void loadUser(String userId) {
loadingLiveData.setValue(true);
disposables.add(
userRepository.getUserWithPosts(userId)
.subscribe(
user -> {
loadingLiveData.setValue(false);
userLiveData.setValue(user);
errorLiveData.setValue(null);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Failed to load user: " + error.getMessage());
}
)
);
}
public void createUser(User user) {
loadingLiveData.setValue(true);
disposables.add(
userRepository.createUser(user)
.subscribe(
createdUser -> {
loadingLiveData.setValue(false);
// Update users list or navigate
loadUsers(); // Refresh the list
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Failed to create user: " + error.getMessage());
}
)
);
}
public void searchUsers(String query) {
if (query == null || query.trim().isEmpty()) {
loadUsers();
return;
}
loadingLiveData.setValue(true);
disposables.add(
userRepository.getUsers()
.flatMapObservable(Observable::fromIterable)
.filter(user ->
user.getName().toLowerCase().contains(query.toLowerCase()) ||
user.getEmail().toLowerCase().contains(query.toLowerCase())
)
.toList()
.subscribe(
filteredUsers -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(filteredUsers);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue("Search failed: " + error.getMessage());
}
)
);
}
// LiveData getters
public LiveData<List<User>> getUsersLiveData() {
return usersLiveData;
}
public LiveData<User> getUserLiveData() {
return userLiveData;
}
public LiveData<Boolean> getLoadingLiveData() {
return loadingLiveData;
}
public LiveData<String> getErrorLiveData() {
return errorLiveData;
}
@Override
protected void onCleared() {
super.onCleared();
disposables.dispose();
}
}
Database Operations
1. Room Database with RxJava
// UserDao.java
@Dao
public interface UserDao {
@Insert
Completable insert(User user);
@Insert
Completable insertAll(List<User> users);
@Update
Completable update(User user);
@Delete
Completable delete(User user);
@Query("SELECT * FROM users")
Flowable<List<User>> getAllUsers();
@Query("SELECT * FROM users WHERE id = :userId")
Maybe<User> getUserById(String userId);
@Query("SELECT * FROM users WHERE name LIKE :searchQuery")
Flowable<List<User>> searchUsers(String searchQuery);
@Query("SELECT * FROM users ORDER BY name ASC")
Flowable<List<User>> getUsersSortedByName();
@Query("DELETE FROM users")
Completable deleteAllUsers();
}
// AppDatabase.java
@Database(entities = {User.class, Post.class}, version = 1)
public abstract class AppDatabase extends RoomDatabase {
public abstract UserDao userDao();
public abstract PostDao postDao();
}
// LocalUserRepository.java
public class LocalUserRepository {
private final UserDao userDao;
private final CompositeDisposable disposables = new CompositeDisposable();
public LocalUserRepository(UserDao userDao) {
this.userDao = userDao;
}
// Basic operations
public Completable insertUser(User user) {
return userDao.insert(user)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Completable insertUsers(List<User> users) {
return userDao.insertAll(users)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Flowable<List<User>> getAllUsers() {
return userDao.getAllUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Maybe<User> getUserById(String userId) {
return userDao.getUserById(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// Complex operations
public Completable saveUserWithPosts(User user, List<Post> posts) {
return Completable.mergeArray(
userDao.insert(user),
// Assuming you have a PostDao with insertAll method
Completable.fromAction(() -> {
// Save posts logic
})
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public Flowable<List<User>> searchUsers(String query) {
return userDao.searchUsers("%" + query + "%")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onErrorReturnItem(Collections.emptyList()); // Return empty list on error
}
// Sync operations
public Completable syncUsers(List<User> users) {
return userDao.deleteAllUsers()
.andThen(userDao.insertAll(users))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public void cleanup() {
disposables.dispose();
}
}
Error Handling
1. Comprehensive Error Handling
// ErrorHandlingExample.java
public class ErrorHandlingExample {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateErrorHandling() {
basicErrorHandling();
retryStrategies();
errorRecovery();
globalErrorHandler();
}
private void basicErrorHandling() {
System.out.println("=== Basic Error Handling ===");
Observable<String> errorObservable = Observable.create(emitter -> {
emitter.onNext("First item");
emitter.onError(new RuntimeException("Something went wrong!"));
});
disposables.add(
errorObservable
.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error handled: " + error.getMessage()),
() -> System.out.println("Completed") // This won't be called
)
);
}
private void retryStrategies() {
System.out.println("\n=== Retry Strategies ===");
AtomicInteger attempts = new AtomicInteger();
Observable<String> flakyObservable = Observable.create(emitter -> {
int attempt = attempts.incrementAndGet();
if (attempt < 3) {
emitter.onError(new RuntimeException("Failed on attempt " + attempt));
} else {
emitter.onNext("Success on attempt " + attempt);
emitter.onComplete();
}
});
// Simple retry
disposables.add(
flakyObservable
.retry(2) // Retry up to 2 times
.subscribe(
result -> System.out.println("Simple retry: " + result),
error -> System.err.println("Simple retry failed: " + error.getMessage())
)
);
// Retry with delay
disposables.add(
flakyObservable
.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (error, retryCount) -> {
if (retryCount > 3) {
throw Exceptions.propagate(error);
}
return retryCount;
})
.flatMap(retryCount ->
Observable.timer(retryCount, TimeUnit.SECONDS)
)
)
.subscribe(
result -> System.out.println("Delayed retry: " + result),
error -> System.err.println("Delayed retry failed: " + error.getMessage())
)
);
}
private void errorRecovery() {
System.out.println("\n=== Error Recovery ===");
Observable<String> unreliableObservable = Observable.create(emitter -> {
if (Math.random() > 0.5) {
emitter.onNext("Success");
emitter.onComplete();
} else {
emitter.onError(new RuntimeException("Random failure"));
}
});
// Return default value on error
disposables.add(
unreliableObservable
.onErrorReturnItem("Default Value")
.subscribe(result -> System.out.println("Error return: " + result))
);
// Switch to fallback Observable
disposables.add(
unreliableObservable
.onErrorResumeNext(throwable ->
Observable.just("Fallback", "Values")
)
.subscribe(result -> System.out.println("Error resume: " + result))
);
// Continue with next item
Observable<String> source = Observable.just("First", "Second", "Third")
.flatMap(item -> {
if ("Second".equals(item)) {
return Observable.error(new RuntimeException("Failed on second"));
}
return Observable.just(item);
});
disposables.add(
source
.onErrorResumeNext(Observable.empty()) // Continue with empty
.subscribe(
item -> System.out.println("Continue: " + item),
error -> System.err.println("This shouldn't be called"),
() -> System.out.println("Completed despite errors")
)
);
}
private void globalErrorHandler() {
System.out.println("\n=== Global Error Handler ===");
RxJavaPlugins.setErrorHandler(throwable -> {
// Global error handler for uncaught exceptions
System.err.println("Global error handler: " + throwable.getMessage());
// You could log to Crashlytics, show notification, etc.
});
// This error won't crash the app due to global handler
Observable.error(new RuntimeException("Unhandled error"))
.subscribe();
}
public void cleanup() {
disposables.dispose();
}
}
Testing RxJava
1. RxJava Testing
// RxJavaTestExample.java
public class RxJavaTestExample {
private TestScheduler testScheduler;
@Before
public void setup() {
testScheduler = new TestScheduler();
}
@Test
public void testBasicObservable() {
// Given
Observable<String> observable = Observable.just("Hello", "World");
TestObserver<String> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertValues("Hello", "World");
testObserver.assertComplete();
testObserver.assertNoErrors();
}
@Test
public void testErrorHandling() {
// Given
Observable<String> observable = Observable.error(new RuntimeException("Test error"));
TestObserver<String> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertError(RuntimeException.class);
testObserver.assertErrorMessage("Test error");
}
@Test
public void testTimeBasedObservable() {
// Given
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.take(3);
TestObserver<Long> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then - no values yet
testObserver.assertNoValues();
// Advance time
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testObserver.assertValues(0L, 1L);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValues(0L, 1L, 2L);
testObserver.assertComplete();
}
@Test
public void testAsyncOperation() {
// Given
Observable<String> observable = Observable.fromCallable(() -> {
Thread.sleep(1000); // Simulate async work
return "Result";
}).subscribeOn(Schedulers.trampoline()); // Use trampoline for testing
TestObserver<String> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertValue("Result");
testObserver.assertComplete();
}
@Test
public void testOperatorChain() {
// Given
Observable<Integer> observable = Observable.range(1, 5)
.filter(number -> number % 2 == 0)
.map(number -> number * 10);
TestObserver<Integer> testObserver = new TestObserver<>();
// When
observable.subscribe(testObserver);
// Then
testObserver.assertValues(20, 40);
testObserver.assertComplete();
}
}
Best Practices
1. RxJava Best Practices
// RxJavaBestPractices.java
public class RxJavaBestPractices {
private CompositeDisposable disposables = new CompositeDisposable();
public void demonstrateBestPractices() {
useCompositeDisposable();
avoidMemoryLeaks();
properErrorHandling();
useAppropriateSchedulers();
avoidOverusingRxJava();
useDisposableMaybe();
}
private void useCompositeDisposable() {
System.out.println("=== Use CompositeDisposable ===");
CompositeDisposable localDisposables = new CompositeDisposable();
// Add multiple disposables
localDisposables.add(
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Tick: " + tick))
);
localDisposables.add(
Observable.just("Data")
.subscribe(data -> System.out.println("Data: " + data))
);
// Clean up all at once
localDisposables.dispose();
}
private void avoidMemoryLeaks() {
System.out.println("\n=== Avoid Memory Leaks ===");
// ❌ Bad: Never disposing subscriptions
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Leaky: " + tick));
// ✅ Good: Proper disposal
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Safe: " + tick));
disposables.add(disposable);
}
private void properErrorHandling() {
System.out.println("\n=== Proper Error Handling ===");
// ❌ Bad: Ignoring errors
Observable.error(new RuntimeException("Error"))
.subscribe(
item -> System.out.println("Received: " + item)
// Missing error handler!
);
// ✅ Good: Always handle errors
Observable.error(new RuntimeException("Error"))
.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error handled: " + error.getMessage())
);
// Even better: Use onErrorReturn or onErrorResumeNext
Observable.error(new RuntimeException("Error"))
.onErrorReturnItem("Fallback")
.subscribe(item -> System.out.println("Result: " + item));
}
private void useAppropriateSchedulers() {
System.out.println("\n=== Use Appropriate Schedulers ===");
// I/O operations
disposables.add(
Observable.fromCallable(() -> {
// Database or file operation
return "I/O Result";
})
.subscribeOn(Schedulers.io())
.subscribe()
);
// Computation operations
disposables.add(
Observable.range(1, 1000)
.subscribeOn(Schedulers.computation())
.map(number -> number * number) // CPU-intensive
.subscribe()
);
// UI updates (in Android)
disposables.add(
Observable.just("Data")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
// Update UI
System.out.println("UI update: " + data);
})
);
}
private void avoidOverusingRxJava() {
System.out.println("\n=== Avoid Overusing RxJava ===");
// ❌ Bad: Using RxJava for simple synchronous operations
Observable.just("Hello")
.map(String::toUpperCase)
.subscribe(result -> System.out.println("Overkill: " + result));
// ✅ Good: Use regular Java for simple operations
String result = "Hello".toUpperCase();
System.out.println("Simple: " + result);
// Use RxJava when you need its benefits:
// - Asynchronous operations
// - Complex data transformations
// - Combining multiple data sources
// - Error handling across async boundaries
}
private void useDisposableMaybe() {
System.out.println("\n=== Use Disposable Maybe ===");
// For operations that might not complete immediately
Maybe<String> maybeOperation = Maybe.create(emitter -> {
if (Math.random() > 0.5) {
emitter.onSuccess("Success");
} else {
emitter.onComplete(); // No result, but no error either
}
});
disposables.add(
maybeOperation
.subscribe(
result -> System.out.println("Maybe success: " + result),
error -> System.err.println("Maybe error: " + error),
() -> System.out.println("Maybe completed without result")
)
);
}
// Additional best practices
public void additionalBestPractices() {
useMeaningfulVariableNames();
avoidNestedSubscriptions();
useOperatorsEffectively();
testYourRxCode();
}
private void useMeaningfulVariableNames() {
// ❌ Bad: Unclear variable names
Observable<String> o = Observable.just("data");
Disposable d = o.subscribe();
// ✅ Good: Descriptive names
Observable<String> userDataStream = Observable.just("user data");
Disposable userSubscription = userDataStream.subscribe();
disposables.add(userSubscription);
}
private void avoidNestedSubscriptions() {
// ❌ Bad: Nested subscriptions (callback hell in RxJava)
Observable.just("outer")
.subscribe(outer -> {
Observable.just("inner")
.subscribe(inner -> System.out.println(outer + " - " + inner));
});
// ✅ Good: Use flatMap or other operators
Observable.just("outer")
.flatMap(outer ->
Observable.just("inner")
.map(inner -> outer + " - " + inner)
)
.subscribe(result -> System.out.println(result));
}
private void useOperatorsEffectively() {
// Chain operators for better readability
disposables.add(
Observable.range(1, 100)
.filter(number -> number % 2 == 0) // Filter even numbers
.map(number -> number * 2) // Double them
.take(10) // Take first 10
.reduce(0, Integer::sum) // Sum them up
.subscribe(total -> System.out.println("Total: " + total))
);
}
private void testYourRxCode() {
// Always test your RxJava code, especially:
// - Async operations
// - Error handling
// - Operator chains
// - Threading behavior
}
public void cleanup() {
disposables.dispose();
}
}
Conclusion
RxJava is a powerful tool for Android development that can greatly simplify asynchronous programming and data stream handling. Key takeaways:
- Use appropriate schedulers for different types of work
- Always handle errors and use proper error recovery strategies
- Prevent memory leaks with CompositeDisposable and proper lifecycle management
- Choose the right Observable type (Single, Maybe, Completable) for your use case
- Use operators effectively to transform, filter, and combine data streams
- Test your RxJava code thoroughly, especially async operations
- Avoid overusing RxJava for simple synchronous operations
- Follow reactive programming principles for clean, maintainable code
By following these best practices and patterns, you can build robust, responsive Android applications that handle complex asynchronous operations with ease.