RxJava: chaining observables

Development issue/problem:

Is it possible to use RxJava to set up something like the following chain?

loginObservable()
    .then( (someData) -> {
        // returns another observable that performs a long operation
        return fetchUserDataObservable(someData);
    }).then( (userData) -> {
        // should be called once the user data is ready (with userData of type T)
        cacheUserData(userData);
    }).then( (userData) -> {
        // should be called after all previous operations have completed
        displayUserData();
    }).doOnError( (error) -> {
        // do something
    });

I find this library very interesting, but I can't figure out how to chain requests when each one depends on the result of the previous one.

How can I solve this problem?

Solution 1:

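Sure, RxJava supports .map, which does this. According to the RxJava wiki, map transforms each item emitted by an Observable by applying a function to it. When the function itself returns another Observable, as fetchUserDataObservable() does, use .flatMap or .switchMap instead.

Basically, it would look like this: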

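// RxJava 1.x style (rx.Subscriber)
loginObservable()
    // switchMap subscribes to the Observable returned for each login result
    .switchMap(someData -> fetchUserDataObservable(someData))
    // cacheUserData() must return the value that should reach onNext()
    .map(userData -> cacheUserData(userData))
    .subscribe(new Subscriber<YourType>() {
        @Override
        public void onCompleted() {
            // the observable stream has completed: no more logins
        }

        @Override
        public void onError(Throwable e) {
            // do something
        }

        @Override
        public void onNext(YourType yourType) {
            displayUserData();
        }
    });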

Solution 2:

This is the top result on Google for "RxJava chain observables", so I'm adding another common case: you don't want to transform the data you receive, but chain it with another action (saving the data to a database, for example). Use .flatMap(). Here's an example:

mDataManager
    .fetchQuotesFromApi(limit)
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    // onErrorResumeNext and Observable.error() propagate the error to the next
    // level, so any error that occurs here is passed to
    // onError() on the UI side.
    .onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
    .flatMap { t: List<Quote> ->
        // Chain the second observable here
        mDataManager.setQuotesToDb(t).subscribe(
            {},
            { e { "setQuotesToDb() Error occurred: ${it.localizedMessage}" } },
            { d { "Ready server set" } }
        )
        Observable.just(t)
    }
    .subscribeBy(
        onNext = {},
        onError = { mvpView?.showError("No internet connection") },
        onComplete = { d { "onComplete(): done with quotes from api" } }
    )

This is RxKotlin2, but the idea is the same with RxJava and RxJava2.

Quick explanation:

  • We try to fetch some data (quotes in this example) from the API using mDataManager.fetchQuotesFromApi().
  • We subscribe to the observable on the .io() scheduler and observe the results on the .ui() scheduler.
  • onErrorResumeNext() makes sure that any error encountered while fetching the data is caught at this point. I want to terminate the whole chain if an error occurs there, so I return Observable.error().
  • .flatMap() is the chaining part. I want to save whatever data I get from the API to my database. I'm not transforming the received data with .map(); I'm just doing something else with the data without transforming it.
  • I subscribe to the last observable in the chain. If an error occurs while fetching the data (the first observable), onErrorResumeNext() handles it, in this case by propagating it to the subscriber's onError().
  • I am well aware that I'm subscribing to the database observable inside flatMap(). Any error raised by that observable will NOT be propagated to the final subscribeBy(), because it is handled by the subscribe() call inside the .flatMap() block.

The code comes from this project, which can be found here: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt.

Solution 3:

Try the scan() operator:

Flowable.fromArray(array).scan(…).subscribe(…).
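
The answer does not say what scan() does, so here is a rough plain-Java model of it, without the RxJava dependency: scan() emits the first item unchanged and then every intermediate result of applying the accumulator to the previous result and the next item. The class name ScanSketch and the helper scan() are made up for this illustration; this is not how RxJava implements the operator, only a sketch of which values it emits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {
    /**
     * Hypothetical helper modelling the values scan(accumulator) emits:
     * the first item as-is, then accumulator(previousResult, nextItem)
     * for every following item.
     */
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T running = null;
        boolean first = true;
        for (T item : items) {
            // Each step depends on the result of the previous step.
            running = first ? item : accumulator.apply(running, item);
            first = false;
            emitted.add(running);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Comparable RxJava call: Flowable.fromArray(1, 2, 3, 4).scan(Integer::sum)
        System.out.println(scan(List.of(1, 2, 3, 4), Integer::sum)); // prints [1, 3, 6, 10]
    }
}
```

Note that scan() combines values from a single stream. If each step has to start a new asynchronous call, as in the question, flatMap() or switchMap() from the other answers is the closer fit.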

Good luck!
