Comment fonctionne l'opérateur RxJS 5 share ()?

ce N'est pas 100% clair pour moi comment l'opérateur RxJs 5 share() fonctionne, voir ici le derniers docs . Jsbin pour la question ici .

si je crée un observable avec une série de 0 à 2, chaque valeur séparée par une seconde:

var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
    console.log('some side effect');
});

et si je crée deux abonnés à cette observable:

source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));

, j'obtiens ceci dans la console:

"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"

je pensais que chaque abonnement souscrirait à la même Observable, mais il ne semble pas être le cas! C'est comme l'acte de souscrire crée un Observable complètement séparé!

mais si l'opérateur share() est ajouté à la source observable:

var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
    console.log('some side effect ...');
})
.share();

puis nous obtenons ceci:

"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"

, ce qui est normal sans le share() .

Whats comment fonctionne l'opérateur share() ? Chaque abonnement crée-t-il une nouvelle chaîne Observable?

22
demandé sur user3743222 2016-02-02 01:34:43

2 réponses

soyez prudent que vous utilisez RxJS v5 alors que votre lien de documentation semble être RxJS v4. Je ne me souviens pas des détails, mais je pense que l'opérateur share a subi quelques changements, en particulier en ce qui concerne l'achèvement et la réinscription, mais ne me croyez pas sur parole.

retour à votre question, comme vous l'avez montré dans votre étude, vos attentes ne correspondent pas à la conception de la bibliothèque. Observables instancient paresseusement leur flux de données, concrètement initier les flux de données lorsqu'un abonné souscrit. Lorsqu'un second abonné souscrit au même observable, un autre nouveau flux de données est lancé comme s'il s'agissait du premier abonné (donc oui, chaque abonnement crée une nouvelle chaîne d'observables comme vous l'avez dit). C'est ce qui est appelé dans la terminologie de RxJS comme un observable à froid et c'est le comportement par défaut pour RxJS observable. Si vous voulez un observable qui envoie ses données aux abonnés qu'il a au moment où les données arrivent, ceci est appelé observable à chaud, et une façon d'obtenir un observable à chaud est d'utiliser l'opérateur share .

Vous pouvez trouver illustré d'abonnement et les flux de données ici : Chaud et Froid observables : il y a "chaud" et "froid" opérateurs? (valable pour RxJS v4, mais la plupart est valable pour v5).

16
répondu user3743222 2017-05-23 12:26:14

part rend l'observable "chaud" si ces 2 conditions sont remplies:

  1. le nombre d'abonnés > 0
  2. et l'observable n'a pas terminé

scénario 1: Nombre d'abonnés > 0 et observable n'est pas terminé avant un nouvel abonnement

var shared  = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 3000);

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds

scénario 2: le nombre d'abonnés est zéro avant un nouvel abonnement. Devient "froid

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer1.unsubscribe(); 
}, 1000);

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2's onNext is called at startTime + 8 seconds
// observer2's onNext is called at startTime + 13 seconds

scénario 3: quand observable a été terminé avant un nouvel abonnement. Devient "froid

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 12000);

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs
11
répondu sbr 2016-11-19 23:04:50