Rx: exécuter les Observables zippés en parallèle?

donc je joue avec RX (really cool), et j'ai converti mon api qui accède à une base de données sqlite sous Android pour retourner les observables.

donc naturellement un des problèmes que j'ai commencé à essayer de résoudre est, "que faire si je veux faire 3 appels API, obtenir les résultats, et puis faire un peu de traitement une fois qu'ils sont tous finis?"

cela m'a pris une heure ou deux, mais j'ai finalement trouvé le Fonctionnalité Zip et il m'aide haut la main:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Super! Donc c'est cool.

donc quand je ferme les 3 observables, ils tournent en série. Et si je veux qu'ils fonctionnent tous en parallèle en même temps pour que j'obtienne les résultats plus rapidement? j'ai joué avec quelques trucs, et j'ai même essayé de lire certains des trucs RX originaux que les gens ont écrit en C#. Je suis sûr qu'il y a une réponse simple. Quelqu'un peut me pointer dans la bonne direction? Quelle est la bonne façon de le faire?

15
demandé sur spierce7 2014-01-18 10:52:45

3 réponses

zip exécutez les observables en parallèle - mais il aussi souscrit pour eux serieusement. Depuis votre getNumberedObservable complète dans la méthode d'Abonnement il donne le impression de courir en série, mais il n'y a en fait aucune limitation de ce type.

vous pouvez soit essayer avec des Observables de longue durée qui survivent à leur logique d'abonnement, comme timer, ou subscribeOn méthode pour s'abonner de façon asynchrone à chaque flux transmis à zip.

17
répondu James World 2014-01-18 18:52:28

Dans RxJava, utilisez toAsync pour transformer une fonction régulière en quelque chose qui s'exécute sur un thread et retourner son résultat dans un observable.

je ne connais pas la syntaxe Java, mais il ressemblerait à quelque chose comme:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

si getNumber accédaient vraiment à une base de données. Lorsque vous appelez getNumberedObservable il renvoie un observable qui s'exécute getNumber sur un fil séparé lorsque vous vous y abonnez.

5
répondu Brandon 2014-01-20 02:39:08

j'essayais de faire la même chose, en lançant plusieurs threads en parallèle en utilisant le zip. J'ai fini l'ouverture d'un nouveau donc, la question et obtenu une réponse. Fondamentalement, vous devez souscrire chaque observable à un nouveau thread, donc si vous voulez exécuter trois observables en parallèle en utilisant le zip, vous devez vous abonner à 3 threads séparés.

2
répondu s-hunter 2017-05-23 12:34:21