La compression de flux à l'aide JDK8 avec lambda (java.util.flux.Flux.zip)

dans JDK 8 avec lambda b93 il y avait une classe java.util.flux.Flux.zip en b93 qui pourrait être utilisé pour zip flux (ce qui est illustré dans le tutoriel Explorer Java8 Lambdas. Partie 1 par Dhananjay Nene ). Cette fonction:

crée un flux combiné paresseux et séquentiel dont les éléments sont résultat de la combinaison des éléments de deux flux.

cependant dans b98 cela a disparu. En fait, la classe Streams n'est même pas accessible en java.util.flux dans b98 .

cette fonctionnalité a-t-elle été déplacée et, dans l'affirmative, comment dois-je fermer les flux de données de façon concise en utilisant b98?

L'application que j'ai à l'esprit est dans cette implémentation java de Shen , où j'ai remplacé la fonctionnalité zip dans le

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

fonctions avec un code plutôt verbeux (qui n'utilise pas de fonctionnalité de b98).

114
demandé sur Holger 0000-00-00 00:00:00

13 réponses

j'en avais besoin aussi, alors j'ai pris le code source de b93 et je l'ai mis dans une classe" util". J'ai dû le modifier légèrement pour travailler avec L'API actuelle.

pour référence voici le code de travail (prenez-le à vos risques et périls...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping looses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}
61
répondu siki 2016-08-11 14:43:13

zip est l'une des fonctions fournies par le protonpack bibliothèque .

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));
38
répondu Dominic Fox 2014-10-15 14:10:46

si vous avez de la goyave dans votre projet, vous pouvez utiliser les flux .méthode zip (a été ajoutée dans la goyave 21):

renvoie un ruisseau dans lequel chaque élément est le résultat du passage de l'élément correspondant de chacun de streamA et stream à la fonction. Le flux résultant sera seulement aussi longtemps que la plus courte des deux flux d'entrée; si un flux est plus, ses éléments supplémentaires seront ignorés. Le flux résultant est pas efficace splittable. Cela peut nuire aux performances parallèles.

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }
22
répondu ZhekaKozlov 2017-04-16 17:56:39

Compression des deux cours d'eau à l'aide de JDK8 avec lambda ( gist ).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}
20
répondu Karol Król 2015-09-02 20:45:54

comme je ne peux concevoir aucune utilisation de fermeture éclair sur les collections autres que celles indexées (listes) et que je suis un grand fan de simplicité, ce serait ma solution:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObject( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}
10
répondu Rafael 2017-03-14 18:58:09

les méthodes de la classe que vous avez mentionnée ont été déplacées à l'interface Stream elle-même en faveur des méthodes par défaut. Mais il semble que la méthode zip ait été supprimée. Peut-être parce qu'il n'est pas clair ce que le comportement par défaut pour les différents flux de taille devrait être. Mais mettre en œuvre le comportement désiré est simple:

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}
9
répondu Holger 2013-11-15 11:57:55

la bibliothèque Lazy-Seq offre la fonctionnalité zip.

https://github.com/nurkiewicz/LazySeq

cette bibliothèque est fortement inspirée de scala.collection.immutable.Stream et vise à fournir immuable, thread-safe et facile à utiliser la mise en œuvre de séquence paresseuse, éventuellement infinie.

6
répondu Nick Siderakis 2013-10-22 20:33:02
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
1
répondu robby_pelssers 2015-08-07 15:14:03

aol's cyclops-react , à laquelle je contribue, fournit également une fonctionnalité de fermeture éclair, à la fois via un extended Stream implementation , qui met également en œuvre L'interface reactive-streams ReactiveSeq, et via StreamUtils qui offre une grande partie de la même fonctionnalité via des méthodes statiques pour les flux Java standard.

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

il offre également plus généralisée application fondée fermeture à glissière. Par exemple:

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

et même la possibilité de jumeler chaque article d'un flux avec chaque article d'un autre

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")
1
répondu John McClean 2016-02-24 17:07:37

c'est génial. J'ai dû zip deux flux dans une carte avec un flux étant la clé et l'autre étant la valeur

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

sortie: {A=Pomme, B=Banane, C = Carotte}

0
répondu Gnana 2016-02-25 17:17:21

je suggère humblement cette mise en œuvre. Le flux résultant est tronqué au plus court des deux flux entrants.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}
0
répondu Doradus 2017-09-15 01:05:59

si quelqu'un a encore besoin de cela, il y a la fonction StreamEx.zipWith dans streamex bibliothèque:

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
0
répondu const.grigoryev 2018-07-25 16:46:13

en utilisant la dernière bibliothèque Guava (pour la classe Streams ) vous devriez être en mesure de faire

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));
0
répondu Dan Borza 2018-09-10 21:24:22