RxJava: enchaînement des observables
Est-il possible de mettre en place quelque chose comme la prochaine chaîne via RxJava:
loginObservable()
.then( (someData) -> {
// returns another Observable<T> with some long operation
return fetchUserDataObservable(someData);
}).then( (userData) -> {
// it should be called when fetching user data completed (with userData of type T)
cacheUserData(userData);
}).then( (userData) -> {
// it should be called after all previous operations completed
displayUserData()
}).doOnError( (error) -> {
//do something
})
j'ai trouvé cette bibliothèque très intéressante, mais je n'arrive pas à trouver comment enchaîner les requêtes où l'une dépend de l'autre précédente.
3 réponses
bien sûr, RxJava supporte .map
ce qui fait ceci. À partir de la RxJava Wiki:
en gros, ce serait:
loginObservable()
.switchMap( someData -> fetchUserDataObservable(someData) )
.map( userData -> cacheUserData(userData) )
.subscribe(new Subscriber<YourResult>() {
@Override
public void onCompleted() {
// observable stream has ended - no more logins possible
}
@Override
public void onError(Throwable e) {
// do something
}
@Override
public void onNext(YourType yourType) {
displayUserData();
}
});
C'est le poste le plus élevé lorsque vous Tapez RxJava de la chaîne d'observables alors je vais juste ajouter un autre cas commun où vous ne voudriez pas transformer les données que vous recevez, mais enchaînez-les avec une autre action (paramétrer les données à une base de données, par exemple). Utilisez .flatmap()
. Voici un exemple
mDataManager.fetchQuotesFromApi(limit)
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
.onErrorResumeNext(Function { Observable.error<List<Quote>>(it) }) //OnErrorResumeNext and Observable.error() would propagate the error to the next level. So, whatever error occurs here, would get passed to onError() on the UI side
.flatMap { t: List<Quote> ->
//Chain observable as such
mDataManager.setQuotesToDb(t).subscribe({}, { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } }, { d { "Done server set" } })
Observable.just(t)
}
.subscribeBy(
onNext = {},
onError = { mvpView?.showError("No internet connection") },
onComplete = { d { "onComplete(): done with fetching quotes from api" } }
)
C'est RxKotlin2, mais l'idée est la même avec RxJava & RxJava2:
explication Rapide:
- nous essayons d'en chercher données (guillemets dans cet exemple) d'une api avec
mDataManager.fetchQuotesFromApi()
- Nous vous abonner observables à faire des trucs sur
.io()
fil et de montrer les résultats sur.ui()
thread. onErrorResumeNext()
s'assure que toute erreur que nous rencontrons en récupérant des données est captée dans cette méthode. Je veux terminer la chaîne entière quand il y a une erreur là-bas, donc je renvoie unObservable.error()
.flatmap()
est la partie chaînage. Je veux être capable de mettre toutes les données que j'obtiens de L'API dans ma base de données. Je ne transforme pas les données que j'ai reçues en utilisant.map()
, je suis tout simplement en faisant autre chose avec les données sans transformation.- je souscris à la dernière chaîne d'observables. Si une erreur se produisait lors de la collecte des données (première observable), elle serait traitée (dans ce cas, propagée à l'abonné
onError()
) aveconErrorResumeNext()
- je suis très conscient que je m'abonne à la DB observable (inside
flatmap()
). Toute erreur qui se produit par cette volonté observable être propagée à la dernièresubscribeBy()
méthodes, puisqu'il est manipulé à l'intérieur de lasubscribe()
méthode à l'intérieur de l'.flatmap()
chaîne.
le code vient de ce projet qui se trouve ici: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt
essayez d'utiliser scan()
Flowable.fromArray(array).scan(...).subscribe(...)