Attendre une opération asynchrone au début de RxJS Observable

j'ai une séquence RxJS consommée de la manière normale...

cependant, dans le handler "onNext" observable, certaines opérations seront effectuées de manière synchrone, mais d'autres nécessitent des callbacks async, qui doivent être attendu avant de traiter l'élément suivant dans la séquence d'entrée.

...peu confus comment faire cela. Des idées? merci!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);
25
demandé sur Brandon 2014-02-19 13:32:59

3 réponses

chaque opération que vous voulez effectuer peut être modélisée comme observable. Même le fonctionnement synchrone peut être modélisé de cette façon. Ensuite, vous pouvez utiliser map pour convertir votre séquence dans une séquence de séquences, puis utilisez concatAll pour aplatir la séquence.

someObservable
    .map(function (item) {
        if (item === "do-something-async") {
            // create an Observable that will do the async action when it is subscribed
            // return Rx.Observable.timer(5000);

            // or maybe an ajax call?  Use `defer` so that the call does not
            // start until concatAll() actually subscribes.
            return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
        }
        else {
            // do something synchronous but model it as an async operation (using Observable.return)
            // Use defer so that the sync operation is not carried out until
            // concatAll() reaches this item.
            return Rx.Observable.defer(function () {
                return Rx.Observable.return(someSyncAction(item));
            });
        }
    })
    .concatAll() // consume each inner observable in sequence
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });

pour répondre à certains de vos commentaires...à un moment donné, vous devez forcer certaines attentes sur le flux de fonctions. Dans la plupart des langues, lorsqu'il s'agit de fonctions qui sont peut-être asynchrones, les signatures de la fonction sont async et la nature async vs sync réelle de la fonction est cachée comme un détail d'implémentation de la fonction. C'est vrai si vous utilisez javaScript promesses, Rx observables, c# Tâches, c++ contrats à Terme, etc. Les fonctions finissent par retourner une promesse / observable/tâche/futur / etc et si la fonction est réellement synchrone, alors l'objet qu'elle retourne est juste déjà terminé.

cela dit, Puisque C'est du JavaScript, vous cheat:

var makeObservable = function (func) {
    return Rx.Observable.defer(function () {
        // execute the function and then examine the returned value.
        // if the returned value is *not* an Rx.Observable, then
        // wrap it using Observable.return
        var result = func();
        return result instanceof Rx.Observable ? result: Rx.Observable.return(result);
    });
}

someObservable
    .map(makeObservable)
    .concatAll()
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });
23
répondu Brandon 2014-02-19 13:49:35

tout d'Abord, déplacez vos opérations asynchrones de subscribe, il n'est pas fait pour des opérations asynchrones.

Ce que vous pouvez utiliser est mergeMap (alias flatMap) ou concatMap. Je les cite tous les deux, parce que concatMap est en fait mergeMapconcurrent paramètre défini à 1. C'est utile, parfois, vous voulez limiter le nombre de requêtes simultanées, mais encore exécuter un couple simultanées.

source.concatMap(item => {
  if (item == 'do-something-async-and-wait-for-completion') {
    return Rx.Observable.timer(5000)
      .mapTo(item)
      .do(e => console.log('okay, we can continue'));
    } else {
      // do something synchronously and keep on going immediately
      return Rx.Observable.of(item)
        .do(e => console.log('ready to go!!!'));
    }
}).subscribe();

je vais aussi montrer comment vous pouvez limiter vos appels. conseil: limite de débit seulement au point où vous en avez réellement besoin, comme lors de l'appel D'une API externe qui n'autorise qu'un certain nombre de requêtes par seconde ou par minute. Sinon il est préférable de limiter le nombre d'opérations simultanées et de laisser le système se déplace à la vitesse maximale.

On commence avec le fragment de code suivant:

const concurrent;
const delay;
source.mergeMap(item =>
  selector(item, delay)
, concurrent)

Ensuite, nous devons choisir les valeurs de concurrent, delay et de mettre en œuvre selector. concurrent et delay sont étroitement liés. Par exemple, si nous voulons exécuter 10 articles par seconde, nous pouvons utiliser concurrent = 10 et delay = 1000 (milliseconde), mais aussi concurrent = 5 et delay = 500 ou concurrent = 4 et delay = 400. Le nombre d'éléments par seconde sera toujours concurrent / (delay / 1000).

maintenant, implémentons selector. Nous avons un couple d'options. Nous pouvons définir un temps d'exécution minimal pour selector, nous pouvons ajouter un retard constant, nous pouvons émettre les résultats dès comme ils sont disponibles, nous pouvons peut émettre le résultat qu'après le délai minimum est passé etc. Il est même possible d'ajouter un timeout en utilisant le timeout les opérateurs. Commodité.

définir l'heure minimale, envoyer le résultat tôt:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .merge(Rx.Observable.timer(delay).ignoreElements())
}

Définir un minimum de temps, envoyer le résultat de la fin:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .zip(Rx.Observable.timer(delay), (item, _))
}

Ajout de l'heure, envoyer le résultat à l'avance:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .concat(Rx.Observable.timer(delay).ignoreElements())
}

Ajout de l'heure, envoyer le résultat de la fin:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .delay(delay)
}
4
répondu Dorus 2016-07-28 17:01:55

un autre exemple simple pour faire des opérations asynchrones manuelles.

soyez conscient que ce n'est pas une bonne pratique réactive ! Si vous voulez seulement attendre 1000ms, utilisez Rx.Observable.la minuterie ou le retard de l'opérateur.

someObservable.flatMap(response => {
  return Rx.Observable.create(observer => {
    setTimeout(() => {
      observer.next('the returned value')
      observer.complete()
    }, 1000)
  })
}).subscribe()

maintenant, remplacez setTimeout par votre fonction async, comme Image.onload ou fileReader.onload ...

0
répondu TeChn4K 2017-06-12 10:35:37