Pouvez-vous diviser un flux en deux flux?

J'ai un ensemble de données représenté par un flux Java 8:

Stream<T> stream = ...;

Je peux voir comment filtrer pour obtenir un sous-ensemble aléatoire - par exemple

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

Je peux aussi voir comment je pourrais réduire ce flux pour obtenir, par exemple, deux listes représentant deux moitiés aléatoires de l'ensemble de données, puis les transformer en flux. Mais existe-t-il un moyen direct de générer deux flux à partir du flux initial? Quelque chose comme

(heads, tails) = stream.[some kind of split based on filter]

Merci pour tout aperçu.

98
demandé sur Stuart Marks 2013-11-13 01:31:19

9 réponses

Pas exactement. Vous ne pouvez pas obtenir deux Streams d'un; cela n'a pas de sens-comment iriez-vous sur l'un sans avoir besoin de générer l'autre en même temps? Un flux ne peut être exploité qu'une seule fois.

Cependant, si vous souhaitez vider dans une liste ou quelque chose, vous pourriez faire

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
9
répondu Louis Wasserman 2013-11-12 21:38:08

Un collecteur peut être utilisé pour cela.

  • pour deux catégories, utilisez Collectors.partitioningBy() usine.

Cela va créer un Map de Boolean à List, et mettre des éléments dans l'une ou l'autre liste basée sur un Predicate.

Note: puisque le flux doit être consommé entier, cela ne peut pas fonctionner sur des flux infinis. Parce que le flux est consommé de toute façon, cette méthode les place simplement dans des listes au lieu de créer un nouveau flux avec de la mémoire.

Aussi, pas besoin de la itérateur, même pas dans les têtes-seul exemple que vous avez fourni.

Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • pour plus de catégories, utilisez une usine Collectors.groupingBy().
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

Dans le cas où les flux ne sont pas Stream, mais l'un des flux primitifs comme IntStream, alors cette méthode .collect(Collectors) n'est pas disponible. Vous devrez le faire de manière manuelle sans une usine de collecteur. Son implémentation ressemble à ceci:

IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000);

Predicate<Integer> p = x -> r.nextBoolean();
Map<Boolean, List<Integer>> groups = intStream.collect(() -> {
    Map<Boolean, List<Integer>> map = new HashMap<>();
    map.put(false, new ArrayList<>());
    map.put(true, new ArrayList<>());
    return map;
}, (map, x) -> {
    boolean partition = p.test(x);
    List<Integer> list = map.get(partition);
    list.add(x);
}, (map1, map2) -> {
    map1.get(false).addAll(map2.get(false));
    map1.get(true).addAll(map2.get(true));
});

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());

Modifier

Comme indiqué, la 'solution de contournement' ci-dessus n'est pas sûre pour les threads. La conversion en Stream normal avant la collecte est la voie à suivre:

Stream<Integer> stream = intStream.boxed();
201
répondu Mark Jeronimus 2015-07-07 19:30:52

Malheureusement, ce que vous demandez est directement mal vu dans le JavaDoc du flux :

Un flux doit être exploité (en invoquant un intermédiaire ou un terminal fonctionnement du flux) une seule fois. Cela exclut, par exemple, "en fourche" flux, lorsque la même source alimente deux pipelines ou plus, ou plusieurs traversées du même flux.

Vous pouvez contourner cela en utilisant peek ou d'autres méthodes si vous désirez vraiment, ce type de comportement. Dans ce cas, ce que vous devriez faire est au lieu d'essayer de sauvegarder deux flux de la même source de flux d'origine avec un filtre de forking, vous dupliquez votre flux et filtrez chacun des doublons de manière appropriée.

Cependant, vous voudrez peut-être reconsidérer si un Stream est la structure appropriée pour votre cas d'utilisation.

16
répondu Trevor Freeman 2013-11-12 22:27:49

Je suis tombé sur cette question et je pense qu'un flux fourchu a des cas d'utilisation qui pourraient s'avérer valides. J'ai écrit le code ci-dessous en tant que consommateur afin qu'il ne fasse rien mais que vous puissiez l'appliquer aux fonctions et à tout ce que vous pourriez rencontrer.

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Maintenant, votre implémentation de code pourrait être quelque chose comme ceci:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
9
répondu LudgerP 2018-06-27 13:16:05

C'est contre le mécanisme général de Flux. Dites que vous pouvez diviser le flux S0 à Sa et Sb comme vous le vouliez. Effectuer une opération de terminal, par exemple count(), sur Sa va nécessairement "consommer" tous les éléments dans S0. Sb a donc perdu sa source de données.

Auparavant, Stream avait une méthode tee(), je pense, qui dupliquait un flux à deux. Il est enlevé maintenant.

Stream a une méthode peek() cependant, vous pourriez être en mesure de l'utiliser pour répondre à vos besoins.

7
répondu ZhongYu 2013-11-12 21:40:39

Pas exactement, mais vous pourrez peut-être accomplir ce dont vous avez besoin en invoquant Collectors.groupingBy(). vous créez une nouvelle Collection et pouvez ensuite instancier des flux sur cette nouvelle collection.

5
répondu aepurniet 2013-11-13 18:33:28

C'était la moins mauvaise réponse que je pouvais venir.

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

Cela prend un flux d'entiers et les divise à 5. Pour ceux qui sont supérieurs à 5, Il filtre uniquement les nombres pairs et les place dans une liste. Pour le reste, il les rejoint avec |.

Sorties:

 ([6, 8],0|1|2|3|4|5)

Ce n'est pas idéal car il rassemble tout dans des collections intermédiaires brisant le flux (et a trop d'arguments!)

1
répondu Ian Jones 2016-03-17 11:02:41

Je suis tombé sur cette question en cherchant un moyen de filtrer certains éléments d'un flux et de les enregistrer comme des erreurs. Donc, je n'avais pas vraiment besoin de diviser le flux au point d'attacher une action de fin prématurée à un prédicat avec une syntaxe discrète. C'est ce que je suis venu avec:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}
1
répondu Sebastian Hans 2017-06-01 07:50:35

Que diriez-vous de:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));
-2
répondu Matthew 2017-02-16 06:06:17