Comment puis-je faire en sorte qu'une séquence Observable RxJS attende qu'une autre se termine avant d'émettre?

Dire que j'ai un Observables, comme suit:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

Ensuite, j'ai un deuxième observables:

var two = someOtherObservable.take(1);

Maintenant, je veux m'abonner two, mais je veux faire en sorte que one a terminé avant la balise two abonné est déclenché. Quelle méthode de mise en tampon puis-je utiliser sur two faire attendre le second pour que le premier soit terminé?

je suppose que je suis à la recherche pour mettre en pause two jusqu'à one est complète.

26
demandé sur Stephen 2015-05-29 04:21:18

8 réponses

Un couple de façons que je peux penser de

//Method one
var one = someObservable.take(1);
var two = someOtherObservable.take(1);
one.concat(two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.take(1);
var two = someOtherObservable.take(1).publish();
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
20
répondu paulpdaniels 2015-05-29 02:06:13

si le second observable est chaud, il y a une autre façon faire pause / reprendre:

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

Vous pouvez aussi utiliser la version avec tampon pausableBuffered conserver les données pendant la pause est activé.

4
répondu Anton 2015-12-17 19:13:01

Si vous voulez vous assurer que l'ordre d'exécution est retenu, vous pouvez utiliser flatMap comme dans l'exemple suivant

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

Le résultat:

"1"
"11"
"111"
"finished"
2
répondu Entrodus 2017-10-20 09:32:05

Voici encore une autre possibilité de profiter du sélecteur de résultats switchMap

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });
2
répondu Joe King 2017-11-07 14:51:35

En voici encore un autre, mais je me sens plus simple et intuitif (ou du moins naturel si vous êtes habitué aux promesses), approche. Fondamentalement, vous créez un Observable en utilisant Observable.create() pour envelopper one et two comme un seul Observable. Ceci est très similaire à la façon dont Promise.all() peut fonctionner.

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

Donc, ce qui se passe ici? Tout d'abord, nous créons un nouveau Observable. La fonction transmise à , est passé à l'observateur (construit à partir des paramètres que vous passez à subscribe()), qui est similaire à resolve et reject combinés en un seul objet lors de la création d'une nouvelle Promesse. C'est la façon dont nous faisons de la magie de travail.

onSubscription, nous souscrivons au premier Observable (dans l'exemple ci-dessus, cela s'appelle one). La façon dont nous traitons next et error dépend de vous, mais la valeur par défaut fournie dans mon exemple devrait être appropriée en général. Cependant, quand nous recevons l' complete événement, qui signifie one est maintenant fait, nous pouvons abonnez-vous au prochain Observable; de ce fait, tirez le deuxième Observable après le premier est complet.

l'exemple d'observateur fourni pour le second Observable est assez simple. Fondamentalement, second agit maintenant comme vous vous y attendriez two agir comme dans L'OP. Plus précisément,second émettent de la première et seule la première valeur émise par someOtherObservable (à cause de take(1)) et ensuite complète, en supposant qu'il n'y a pas erreur.

Exemple

Voici une, de travail exemple, vous pouvez copier/coller si vous voulez voir mon exemple dans la vie réelle:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

Si vous regardez la console, l'exemple ci-dessus affichera:

1

6

Fait!

1
répondu c1moore 2018-02-15 03:29:13

skipUntil() avec last()

skipUntil : ignorer les éléments émis jusqu'à ce qu'un autre observable ait émis

dernière: émettre de la dernière valeur à partir d'une séquence

notez que l'observable est passé à skipUntil vient d'émettre quoi que ce soit d'annuler le saut à la corde, qui est pourquoi nous avons besoin de mettre en dernier().

main$.skipUntil(sequence2$.pipe(last()))

Officiel: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil

1
répondu Simon_Weaver 2018-08-29 21:10:10
export function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) =>
        new Observable<T>(observer =>
            signal.pipe(first())
                .subscribe(_ =>
                    source.subscribe(observer)
                )
        );
}

et vous pouvez l'utiliser comme n'importe quel opérateur:

var two = someOtherObservable.pipe(waitFor(one), take(1));

c'est essentiellement un opérateur qui reporte l'abonnement sur la source observable jusqu'à ce que le signal observable émette le premier événement.

0
répondu Andrew 2018-09-18 07:30:58

Vous pouvez utiliser le résultat émis par L'Observable précédent grâce à mergeMap (ou son alias flatMap) opérateur comme ceci:

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))
0
répondu Tktorza 2018-09-18 09:37:12