Différence entre les flux Java 8 et les observables RxJava

les flux Java 8 sont-ils similaires aux observables RxJava?

Java 8 stream définition:

Les Classes

dans le nouveau paquet java.util.stream fournissent une API Stream à soutenir les opérations de style fonctionnel sur les flux d'éléments.

110
demandé sur Tagir Valeev 2015-05-13 16:54:00

7 réponses

TL;DR : tous les libs de traitement de séquences/flux offrent des API très similaires pour la construction de pipelines. Les différences sont dans L'API pour la manipulation multi-filetage et la composition des pipelines.

RxJava est très différent de Stream. De toutes les choses JDK, le plus proche de rx.Observable est peut-être java.util.flux.Collector Stream + CompletableFuture combo (qui vient à un coût de faire face à la couche supplémentaire monad, I. E. devant gérer la conversion entre Stream<CompletableFuture<T>> et CompletableFuture<Stream<T>> ).

Il y a des différences significatives entre les Observables et Stream:

  • les Flux sont basé sur l'extraction, Observables sont de type push. Cela peut paraître abstrait, mais il a des conséquences importantes qui sont très concrètes.
  • cours d'eau peut être utilisé une seule fois, Observables peuvent être abonné à de nombreuses reprises
  • Stream#parallel() la séquence de divisions en partitions, Observable#subscribeOn() et Observable#observeOn() ne le font pas; il est difficile d'émuler le comportement Stream#parallel() avec Observable, il avait une fois la méthode .parallel() mais cette méthode a causé tellement de confusion que le support .parallel() a été déplacé au dépôt séparé sur github, RxJavaParallel. Plus de détails sont dans une autre réponse .
  • Stream#parallel() ne permet pas de spécifier un pool de thread à utiliser, contrairement à la plupart des méthodes RxJava accepter planificateur optionnel. Depuis tous les instances de flux dans une JVM utilisent le même fork-join pool, l'ajout de .parallel() peut accidentellement affecter le comportement dans un autre module de votre programme
  • les Flux sont en manque de temps liées à des opérations comme Observable#interval() , Observable#window() et beaucoup d'autres; c'est surtout parce que les Flux sont basé sur l'extraction
  • Flux offre ensemble restreint d'opérations en comparaison avec RxJava. E. g. Les flux sont absence d'opérations de coupure ( takeWhile() , takeUntil() ); la solution de contournement en utilisant Stream#anyMatch() est limitée: c'est une opération de terminal, donc vous ne pouvez pas l'utiliser plus d'une fois par flux
  • à partir de JDK 8, Il n'y a pas de flux # zip opération, qui est très utile parfois
  • Flux sont difficiles à construire par vous-même, Observable peut être construit de plusieurs façons différentes EDIT: , Comme indiqué dans les commentaires, il y a des façons de construire des Flux. Cependant, puisqu'il n'y a pas de court-circuit non-terminal, vous ne pouvez pas E. G. génère facilement un flux de lignes dans le fichier (JDK fournit des#lignes de fichiers et#lignes de BufferedReader hors de la boîte cependant, et d'autres scénarios similaires peuvent être gérés en construisant le flux à partir de Iterator).
  • Observable offers resource management facility ( Observable#using() ); vous pouvez envelopper io stream ou mutex avec elle et être sûr que l'utilisateur n'oubliera pas de libérer la ressource - il sera disposition automatique à la fin de l'abonnement; Stream a la méthode onClose(Runnable) , mais vous devez l'appeler manuellement ou via try-with-resources. Par exemple: vous devez garder à l'esprit que les fichiers#lines() doivent être inclus dans le bloc "try-with-resources".
  • Observables sont synchronisés tout le chemin à travers (je n'ai pas réellement vérifier si la même chose est vraie pour les flux). Cela vous épargne de penser si les opérations de base sont thread-safe (la réponse est toujours "oui", sauf s'il y a un bug), mais les frais généraux liés à la concurrence seront là, peu importe si votre code en a besoin ou non.

Round-up: RxJava diffère des cours d'eau de manière significative. Les alternatives réelles de RxJava sont d'autres implémentations de ReactiveStreams , E. G. partie pertinente de Akka.

mise à Jour . Il y a un truc pour utiliser la non-default fork-join pool pour Stream#parallel , voir Personnalisée pool de threads en Java 8 courant parallèle

mise à Jour . Tout ce qui précède est basé sur L'expérience de RxJava 1.x. Maintenant que RxJava 2.x est ici , cette réponse peut être périmée.

117
répondu Kirill Gamazkov 2017-11-30 19:05:27

Java 8 Stream et RxJava semblent assez similaires. Ils ont des opérateurs semblables (filtre, carte, flatMap...) mais ne sont pas construits pour le même usage.

vous pouvez effectuer des tâches asynchrones en utilisant RxJava.

avec Java 8 stream, vous allez parcourir les articles de votre collection.

vous pouvez faire à peu près la même chose dans RxJava (traverser les articles d'une collection), mais, comme RxJava se concentre sur la tâche concurrente,... il utiliser synchronisation, verrouillage,... Ainsi, la même tâche utilisant RxJava peut être plus lente qu'avec Java 8 stream.

RxJava peut être comparé à CompletableFuture , mais qui peut être en mesure de calculer plus d'une seule valeur.

46
répondu dwursteisen 2015-05-13 14:38:01

il y a quelques différences techniques et conceptuelles, par exemple, les flux Java 8 sont des séquences de valeurs à usage unique, basées sur la pull, synchrones, tandis que les Observables RxJava sont ré-observables, basées sur la push-pull adaptative, potentiellement asynchrones. RxJava est destiné à Java 6+ et fonctionne aussi sur Android.

35
répondu akarnokd 2015-05-13 14:51:06

Java 8, les Flux sont basé sur du pull. Vous itérez sur un flux Java 8 consommant chaque élément. Et il pourrait être un ruisseau sans fin.

RXJava Observable est par défaut le pousser fondé. Vous vous abonnez à un Observable et vous recevrez un avis lorsque le prochain article arrive ( onNext ), ou lorsque le flux est terminé ( onCompleted ), ou lorsqu'une erreur s'est produite ( onError ). Parce qu'avec Observable vous recevez onNext , onCompleted , onError événements, vous peut faire quelques fonctions puissantes comme la combinaison de différentes Observable à un nouveau ( zip , merge , concat ). Vous pourriez aussi le mettre en cache, l'étrangler ... Et il utilise plus ou moins la même API dans différents langages (RxJava, RX dans C#, RxJS, ...)

par défaut RxJava est un filetage simple. À moins que vous ne commenciez à utiliser Scheduleders, tout se passera sur le même thread.

28
répondu Bart De Neuter 2017-11-01 13:56:11

Les réponses sont complètes et correctes, mais un exemple clair pour les débutants est absente. Permettez-moi de mettre certains de béton derrière des termes comme "push/pull" et "re-observables". Note : je déteste le terme Observable (c'est un stream pour l'amour du ciel), donc je vais simplement faire référence à J8 vs RX streams.

tenir compte d'une liste si entiers,

digits = [1,2,3,4,5]

un flux J8 est un utilitaire pour modifier la collection. Pour exemple les chiffres pairs peuvent être extraits comme,

evens = digits.stream().filter(x -> x%2)

il s'agit essentiellement de la carte de Python, filtre, réduit , un ajout très agréable (et attendu depuis longtemps) à Java. Mais si les chiffres n'étaient pas stockés à l'avance? Peut-on encore filtrer les même chiffres?

imaginez qu'un processus de thread séparé produit des entiers au hasard ( --- indique le temps)

digits = 12345---6------7--8--9-10--------11--12

En RX, even peut réagir à chaque nouveau chiffre et appliquer le filtre en temps réel

even = -2-4-----6---------8----10------------12

il n'est pas nécessaire de stocker des listes d'entrées et de sorties. Si vous voulez une liste de sortie, aucun problème qui est streamable aussi. En fait, tout est un ruisseau.

evens_stored = even.collect()  

C'est pourquoi des termes comme "apatride" et "fonctionnel" sont plus associés à RX

16
répondu Adam Hughes 2018-05-06 18:08:07

RxJava est également étroitement lié à reactive streams initiative et considère comme une simple mise en œuvre de l'API de flux réactifs (par exemple par rapport à Akka streams implementation ). La principale différence est que les flux réactifs sont conçus pour être en mesure de gérer la contre-pression, mais si vous avez un coup d'oeil à la page Flux réactifs, vous aurez l'idée. Ils décrivent assez bien leurs objectifs et les cours d'eau sont également étroitement lié à le manifeste réactif .

les flux Java 8 sont à peu près l'implémentation d'une collection illimitée, assez similaire au Scala Stream ou au Clojure seq lazy .

4
répondu Niclas Meier 2017-05-23 12:34:36

Java 8 Streams permettent le traitement efficace de collections vraiment grandes, tout en tirant parti des architectures multicore. En revanche, RxJava est mono-threadé par défaut (sans Schedulers). Donc RxJava ne va pas profiter des machines multi-core à moins que vous codez cette logique vous-même.

2
répondu Igor Ganapolsky 2016-03-08 14:00:46