Rx: exécuter les Observables zippés en parallèle?
donc je joue avec RX (really cool), et j'ai converti mon api qui accède à une base de données sqlite sous Android pour retourner les observables.
donc naturellement un des problèmes que j'ai commencé à essayer de résoudre est, "que faire si je veux faire 3 appels API, obtenir les résultats, et puis faire un peu de traitement une fois qu'ils sont tous finis?"
cela m'a pris une heure ou deux, mais j'ai finalement trouvé le Fonctionnalité Zip et il m'aide haut la main:
Observable<Integer> one = getNumberedObservable(1);
Observable<Integer> two = getNumberedObservable(2);
Observable<Integer> three = getNumberedObservable(3);
Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
@Override
public Integer call(Integer arg0, Integer arg1, Integer arg2) {
System.out.println("Zip0: " + arg0);
System.out.println("Zip1: " + arg1);
System.out.println("Zip2: " + arg2);
return arg0 + arg1 + arg2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer arg0) {
System.out.println("Zipped Result: " + arg0);
}
});
public static Observable<Integer> getNumberedObservable(final int value) {
return Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> observer) {
observer.onNext(value);
observer.onCompleted();
return Subscriptions.empty();
}
});
}
Super! Donc c'est cool.
donc quand je ferme les 3 observables, ils tournent en série. Et si je veux qu'ils fonctionnent tous en parallèle en même temps pour que j'obtienne les résultats plus rapidement? j'ai joué avec quelques trucs, et j'ai même essayé de lire certains des trucs RX originaux que les gens ont écrit en C#. Je suis sûr qu'il y a une réponse simple. Quelqu'un peut me pointer dans la bonne direction? Quelle est la bonne façon de le faire?
3 réponses
zip
exécutez les observables en parallèle - mais il aussi souscrit pour eux serieusement. Depuis votre getNumberedObservable
complète dans la méthode d'Abonnement il donne le impression de courir en série, mais il n'y a en fait aucune limitation de ce type.
vous pouvez soit essayer avec des Observables de longue durée qui survivent à leur logique d'abonnement, comme timer
, ou subscribeOn
méthode pour s'abonner de façon asynchrone à chaque flux transmis à zip
.
Dans RxJava, utilisez toAsync pour transformer une fonction régulière en quelque chose qui s'exécute sur un thread et retourner son résultat dans un observable.
je ne connais pas la syntaxe Java, mais il ressemblerait à quelque chose comme:
public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
return rx.util.functions.toAsync(new Func<Integer,Integer>() {
@Override
public Integer call(Integer value) { return getNumber(value); }
});
};
si getNumber
accédaient vraiment à une base de données. Lorsque vous appelez getNumberedObservable
il renvoie un observable qui s'exécute getNumber
sur un fil séparé lorsque vous vous y abonnez.
j'essayais de faire la même chose, en lançant plusieurs threads en parallèle en utilisant le zip. J'ai fini l'ouverture d'un nouveau donc, la question et obtenu une réponse. Fondamentalement, vous devez souscrire chaque observable à un nouveau thread, donc si vous voulez exécuter trois observables en parallèle en utilisant le zip, vous devez vous abonner à 3 threads séparés.