Comment puis-je faire en sorte qu'une séquence Observable RxJS attende qu'une autre se termine avant d'émettre?
Dire que j'ai un Observables, comme suit:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Ensuite, j'ai un deuxième observables:
var two = someOtherObservable.take(1);
Maintenant, je veux m'abonner two
, mais je veux faire en sorte que one
a terminé avant la balise two
abonné est déclenché. Quelle méthode de mise en tampon puis-je utiliser sur two
faire attendre le second pour que le premier soit terminé?
je suppose que je suis à la recherche pour mettre en pause two
jusqu'à one
est complète.
8 réponses
Un couple de façons que je peux penser de
//Method one
var one = someObservable.take(1);
var two = someOtherObservable.take(1);
one.concat(two).subscribe(function() {/*do something */});
//Method two, if they need to be separate for some reason
var one = someObservable.take(1);
var two = someOtherObservable.take(1).publish();
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
si le second observable est chaud, il y a une autre façon faire pause / reprendre:
var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);
source1.doOnCompleted(function () {
/* resume paused source2 */
pauser.onNext(true);
}).subscribe(function(){
// do something
});
source2.subscribe(function(){
// start to recieve data
});
Vous pouvez aussi utiliser la version avec tampon pausableBuffered conserver les données pendant la pause est activé.
Si vous voulez vous assurer que l'ordre d'exécution est retenu, vous pouvez utiliser flatMap comme dans l'exemple suivant
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));
first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));
Le résultat:
"1"
"11"
"111"
"finished"
Voici encore une autre possibilité de profiter du sélecteur de résultats switchMap
var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
/** Wait for first Observable */
() => one$,
/** Only return the value we're actually interested in */
(value2, value1) => value2
)
.subscribe((value2) => {
/* do something */
});
En voici encore un autre, mais je me sens plus simple et intuitif (ou du moins naturel si vous êtes habitué aux promesses), approche. Fondamentalement, vous créez un Observable en utilisant Observable.create()
pour envelopper one
et two
comme un seul Observable. Ceci est très similaire à la façon dont Promise.all()
peut fonctionner.
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
// observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
});
Donc, ce qui se passe ici? Tout d'abord, nous créons un nouveau Observable. La fonction transmise à , est passé à l'observateur (construit à partir des paramètres que vous passez à subscribe()
), qui est similaire à resolve
et reject
combinés en un seul objet lors de la création d'une nouvelle Promesse. C'est la façon dont nous faisons de la magie de travail.
onSubscription
, nous souscrivons au premier Observable (dans l'exemple ci-dessus, cela s'appelle one
). La façon dont nous traitons next
et error
dépend de vous, mais la valeur par défaut fournie dans mon exemple devrait être appropriée en général. Cependant, quand nous recevons l' complete
événement, qui signifie one
est maintenant fait, nous pouvons abonnez-vous au prochain Observable; de ce fait, tirez le deuxième Observable après le premier est complet.
l'exemple d'observateur fourni pour le second Observable est assez simple. Fondamentalement, second
agit maintenant comme vous vous y attendriez two
agir comme dans L'OP. Plus précisément,second
émettent de la première et seule la première valeur émise par someOtherObservable
(à cause de take(1)
) et ensuite complète, en supposant qu'il n'y a pas erreur.
Exemple
Voici une, de travail exemple, vous pouvez copier/coller si vous voulez voir mon exemple dans la vie réelle:
var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
}).subscribe(
function onNext(value) {
console.log(value);
},
function onError(error) {
console.error(error);
},
function onComplete() {
console.log("Done!");
}
);
Si vous regardez la console, l'exemple ci-dessus affichera:
1
6
Fait!
skipUntil() avec last()
skipUntil : ignorer les éléments émis jusqu'à ce qu'un autre observable ait émis
dernière: émettre de la dernière valeur à partir d'une séquence
notez que l'observable est passé à skipUntil
vient d'émettre quoi que ce soit d'annuler le saut à la corde, qui est pourquoi nous avons besoin de mettre en dernier().
main$.skipUntil(sequence2$.pipe(last()))
Officiel: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
export function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) =>
new Observable<T>(observer =>
signal.pipe(first())
.subscribe(_ =>
source.subscribe(observer)
)
);
}
et vous pouvez l'utiliser comme n'importe quel opérateur:
var two = someOtherObservable.pipe(waitFor(one), take(1));
c'est essentiellement un opérateur qui reporte l'abonnement sur la source observable jusqu'à ce que le signal observable émette le premier événement.
Vous pouvez utiliser le résultat émis par L'Observable précédent grâce à mergeMap (ou son alias flatMap) opérateur comme ceci:
const one = Observable.of('https://api.github.com/users');
const two = (c) => ajax(c);//ajax from Rxjs/dom library
one.mergeMap(two).subscribe(c => console.log(c))