RxJava: enchaînement des observables

Est-il possible de mettre en place quelque chose comme la prochaine chaîne via RxJava:

loginObservable()
   .then( (someData) -> {
      // returns another Observable<T> with some long operation
      return fetchUserDataObservable(someData);

   }).then( (userData) -> {
      // it should be called when fetching user data completed (with userData of type T)
      cacheUserData(userData);

   }).then( (userData) -> {
      // it should be called after all previous operations completed
      displayUserData()

   }).doOnError( (error) -> {
      //do something
   })

j'ai trouvé cette bibliothèque très intéressante, mais je n'arrive pas à trouver comment enchaîner les requêtes où l'une dépend de l'autre précédente.

35
demandé sur Mikhail 2014-11-14 20:41:54

3 réponses

bien sûr, RxJava supporte .map ce qui fait ceci. À partir de la RxJava Wiki:

map

en gros, ce serait:

loginObservable()
   .switchMap( someData -> fetchUserDataObservable(someData) )
   .map( userData -> cacheUserData(userData) )
   .subscribe(new Subscriber<YourResult>() {
        @Override
        public void onCompleted() {
           // observable stream has ended - no more logins possible
        }
        @Override
        public void onError(Throwable e) {
            // do something
        }
        @Override
        public void onNext(YourType yourType) {
            displayUserData();
        }
    });
39
répondu Benjamin Gruenbaum 2015-02-28 20:17:47

C'est le poste le plus élevé lorsque vous Tapez RxJava de la chaîne d'observables alors je vais juste ajouter un autre cas commun où vous ne voudriez pas transformer les données que vous recevez, mais enchaînez-les avec une autre action (paramétrer les données à une base de données, par exemple). Utilisez .flatmap(). Voici un exemple

    mDataManager.fetchQuotesFromApi(limit)
            .subscribeOn(mSchedulerProvider.io())
            .observeOn(mSchedulerProvider.ui())
            .onErrorResumeNext(Function { Observable.error<List<Quote>>(it) }) //OnErrorResumeNext and Observable.error() would propagate the error to the next level. So, whatever error occurs here, would get passed to onError() on the UI side
            .flatMap { t: List<Quote> ->
                //Chain observable as such
                mDataManager.setQuotesToDb(t).subscribe({}, { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } }, { d { "Done server set" } })
                Observable.just(t)
            }
            .subscribeBy(
                    onNext = {},
                    onError = { mvpView?.showError("No internet connection") },
                    onComplete = { d { "onComplete(): done with fetching quotes from api" } }
            )

C'est RxKotlin2, mais l'idée est la même avec RxJava & RxJava2:

explication Rapide:

  • nous essayons d'en chercher données (guillemets dans cet exemple) d'une api avec mDataManager.fetchQuotesFromApi()
  • Nous vous abonner observables à faire des trucs sur .io() fil et de montrer les résultats sur .ui() thread.
  • onErrorResumeNext() s'assure que toute erreur que nous rencontrons en récupérant des données est captée dans cette méthode. Je veux terminer la chaîne entière quand il y a une erreur là-bas, donc je renvoie un Observable.error()
  • .flatmap() est la partie chaînage. Je veux être capable de mettre toutes les données que j'obtiens de L'API dans ma base de données. Je ne transforme pas les données que j'ai reçues en utilisant .map(), je suis tout simplement en faisant autre chose avec les données sans transformation.
  • je souscris à la dernière chaîne d'observables. Si une erreur se produisait lors de la collecte des données (première observable), elle serait traitée (dans ce cas, propagée à l'abonné onError()) avec onErrorResumeNext()
  • je suis très conscient que je m'abonne à la DB observable (inside flatmap()). Toute erreur qui se produit par cette volonté observable être propagée à la dernière subscribeBy() méthodes, puisqu'il est manipulé à l'intérieur de la subscribe() méthode à l'intérieur de l' .flatmap() chaîne.

le code vient de ce projet qui se trouve ici: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt

5
répondu Solidak 2017-09-22 09:41:16

essayez d'utiliser scan()

Flowable.fromArray(array).scan(...).subscribe(...)
0
répondu lingfliu 2017-03-23 01:57:59