Java 8 flux, obtenir la tête et la queue

Java 8 a introduit une classe Stream qui ressemble à Stream de Scala, une puissante construction paresseuse utilisant laquelle il est possible de faire quelque chose comme ceci de manière très concise:

def from(n: Int): Stream[Int] = n #:: from(n+1)

def sieve(s: Stream[Int]): Stream[Int] = {
  s.head #:: sieve(s.tail filter (_ % s.head != 0))
}

val primes = sieve(from(2))

primes takeWhile(_ < 1000) print  // prints all primes less than 1000

Je me demandais s'il était possible de le faire en Java 8, alors j'ai écrit quelque chose comme ceci:

IntStream from(int n) {
    return IntStream.iterate(n, m -> m + 1);
}

IntStream sieve(IntStream s) {
    int head = s.findFirst().getAsInt();
    return IntStream.concat(IntStream.of(head), sieve(s.skip(1).filter(n -> n % head != 0)));
}

IntStream primes = sieve(from(2));

Assez simple, mais il produit java.lang.IllegalStateException: stream has already been operated upon or closed parce que findFirst() et skip() sont des opérations de terminal sur Stream qui ne peuvent être effectuées qu'une seule fois.

Je n'ai pas vraiment d'utiliser le flux deux fois puisque tout ce dont j'ai besoin est le premier nombre dans le flux et le reste comme un autre flux, c'est-à-dire équivalent de Stream.head et Stream.tail de Scala. Existe-t-il une méthode en Java 8 Stream que je peux utiliser pour y parvenir?

Merci.

21
demandé sur Stuart Marks 2013-11-06 06:37:41

8 réponses

Même si vous n'aviez pas le problème que vous ne pouvez pas diviser un IntStream, votre code n'a pas fonctionné parce que vous invoquez votre méthode sieve récursivement au lieu de paresseusement. Vous avez donc eu une récursivité infinie avant de pouvoir interroger votre flux résultant pour la première valeur.

Le Fractionnement IntStream s dans une tête et une queue IntStream (qui n'a pas encore consommé) est possible:

PrimitiveIterator.OfInt it = s.iterator();
int head = it.nextInt();
IntStream tail = IntStream.generate(it::next).filter(i -> i % head != 0);

À cet endroit, vous avez besoin d'une construction d'invoquer sieve sur la queue paresseusement. Stream ne le prévoit pas; concat attend les instances de flux existantes comme arguments et vous ne pouvez pas construire un flux appelant sieve paresseusement avec une expression lambda car la création paresseuse fonctionne avec un état mutable uniquement que les expressions lambda ne supportent pas. Si vous n'avez pas d'implémentation de bibliothèque masquant l'état mutable, vous devez utiliser un objet mutable. Mais une fois que vous acceptez l'exigence d'état mutable, la solution peut être encore plus facile que votre première approche:

IntStream primes = from(2).filter(i -> p.test(i)).peek(i -> p = p.and(v -> v % i != 0));

IntPredicate p = x -> true;

IntStream from(int n)
{
  return IntStream.iterate(n, m -> m + 1);
}

Cela créera récursivement un filtre mais dans le fin peu importe si vous créez un arbre de IntPredicate S ou un arbre de IntStream s (comme avec votre approche IntStream.concat si cela a fonctionné). Si vous n'aimez pas le champ d'instance mutable pour le filtre, vous pouvez le cacher dans une classe interne (mais pas dans une expression lambda...).

10
répondu Holger 2016-01-19 14:22:03

Vous pouvez essentiellement l'implémenter comme ceci:

static <T> Tuple2<Optional<T>, Seq<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(it.hasNext() ? Optional.of(it.next()) : Optional.empty(), seq(it));
}

Dans l'exemple ci-dessus, Tuple2 et Seq sont des types empruntés à jOOλ, une bibliothèque que nous avons développée pour les tests d'intégration jOOQ. Si vous ne voulez pas de dépendances supplémentaires, vous pouvez aussi bien les implémenter vous-même:

class Tuple2<T1, T2> {
    final T1 v1;
    final T2 v2;

    Tuple2(T1 v1, T2 v2) {
        this.v1 = v1;
        this.v2 = v2;
    }

    static <T1, T2> Tuple2<T1, T2> tuple(T1 v1, T2 v2) {
        return new Tuple<>(v1, v2);
    }
}

static <T> Tuple2<Optional<T>, Stream<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(
        it.hasNext() ? Optional.of(it.next()) : Optional.empty,
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            it, Spliterator.ORDERED
        ), false)
    );
}
2
répondu Lukas Eder 2014-09-22 14:28:46

La solution ci-dessous ne fait pas de mutations d'état, sauf pour la déconstruction tête/queue du flux.

Le lazyness est obtenu en utilisant IntStream.itérer. La classe Prime est utilisée pour conserver l'état du générateur

    import java.util.PrimitiveIterator;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    public class Prime {
        private final IntStream candidates;
        private final int current;

        private Prime(int current, IntStream candidates)
        {
            this.current = current;
            this.candidates = candidates;
        }

        private Prime next()
        {
            PrimitiveIterator.OfInt it = candidates.filter(n -> n % current != 0).iterator();

            int head = it.next();
            IntStream tail = IntStream.generate(it::next);

            return new Prime(head, tail);
        }

        public static Stream<Integer> stream() {
            IntStream possiblePrimes = IntStream.iterate(3, i -> i + 1);

            return Stream.iterate(new Prime(2, possiblePrimes), Prime::next)
                         .map(p -> p.current);
        }
    }

L'utilisation serait la suivante:

Stream<Integer> first10Primes = Prime.stream().limit(10)
2
répondu vidi 2015-09-17 15:20:19

Mon StreamEx bibliothèque a maintenant headTail() opération qui résout le problème:

public static StreamEx<Integer> sieve(StreamEx<Integer> input) {
    return input.headTail((head, tail) -> 
        sieve(tail.filter(n -> n % head != 0)).prepend(head));
}

La méthode headTail prend un {[14] } qui sera exécuté au plus une fois pendant l'exécution de l'opération de terminal de flux. Cette implémentation est donc paresseuse: elle ne calcule rien tant que la traversée ne démarre pas et ne calcule que les nombres premiers demandés. Le BiFunction reçoit un premier élément de flux head et le flux des éléments rest tail et peut modifier le tail de toute façon qu'il veut. Vous pouvez l'utiliser avec une entrée prédéfinie:

sieve(IntStreamEx.range(2, 1000).boxed()).forEach(System.out::println);

Mais le flux infini fonctionne aussi

sieve(StreamEx.iterate(2, x -> x+1)).takeWhile(x -> x < 1000)
     .forEach(System.out::println);
// Not the primes till 1000, but 1000 first primes
sieve(StreamEx.iterate(2, x -> x+1)).limit(1000).forEach(System.out::println);

Il existe également une solution alternative en utilisant headTail et la concaténation des prédicats:

public static StreamEx<Integer> sieve(StreamEx<Integer> input, IntPredicate isPrime) {
    return input.headTail((head, tail) -> isPrime.test(head) 
            ? sieve(tail, isPrime.and(n -> n % head != 0)).prepend(head)
            : sieve(tail, isPrime));
}

sieve(StreamEx.iterate(2, x -> x+1), i -> true).limit(1000).forEach(System.out::println);

Il est intéressant de comparer les solutions récursives: combien de nombres premiers ils sont capables de générer.

@solution John McClean (StreamUtils)

Les solutions John McClean ne sont pas paresseuses: vous ne pouvez pas les nourrir avec un flux infini. Donc je viens de trouver par essai et erreur la limite supérieure maximale autorisée (17793) (après que StackOverflowError se produise):

public void sieveTest(){
    sieve(IntStream.range(2, 17793).boxed()).forEach(System.out::println);
}

@solution John McClean (Streamable)

public void sieveTest2(){
    sieve(Streamable.range(2, 39990)).forEach(System.out::println);
}

L'augmentation de la limite supérieure au-dessus de 39990 entraîne une erreur StackOverflowError.

@solution frhack(LazySeq)

LazySeq<Integer> ints = integers(2);
LazySeq primes = sieve(ints); // sieve method from @frhack answer
primes.forEach(p -> System.out.println(p));

Résultat: bloqué après un nombre premier = 53327 avec une énorme allocation de tas et une collecte de déchets prenant plus de 90%. Il a fallu plusieurs minutes pour avancer de 53323 à 53327, donc attendre plus semble peu pratique.

@solution vidi

Prime.stream().forEach(System.out::println);

Résultat: StackOverflowError après le nombre premier = 134417.

Ma solution (StreamEx)

sieve(StreamEx.iterate(2, x -> x+1)).forEach(System.out::println);

Résultat: StackOverflowError après le nombre premier = 236167.

@solution frhack(rxjava)

Observable<Integer> primes = Observable.from(()->primesStream.iterator());
primes.forEach((x) -> System.out.println(x.toString()));            

Résultat: StackOverflowError après le nombre premier = 367663.

@solution Holger

IntStream primes=from(2).filter(i->p.test(i)).peek(i->p=p.and(v->v%i!=0));
primes.forEach(System.out::println);

Résultat: StackOverflowError après nombre premier = 368089.

Ma solution (StreamEx avec concaténation de prédicat)

sieve(StreamEx.iterate(2, x -> x+1), i -> true).forEach(System.out::println);

Résultat: StackOverflowError après le nombre premier = 368287.


Donc, trois solutions impliquant la concaténation des prédicats gagnent, car chaque nouvelle condition ajoute seulement 2 trames de pile supplémentaires. Je pense que la différence entre eux est marginale et ne devrait pas être considérée pour définir un gagnant. Cependant, j'aime plus ma première solution StreamEx car elle ressemble plus au code Scala.

2
répondu Tagir Valeev 2016-10-04 03:40:53

Si cela ne vous dérange pas d'utiliser des bibliothèques tierces cyclops-streams, la bibliothèque que j'ai écrite a un certain nombre de solutions potentielles.

La classe StreamUtils a un grand nombre de méthodes statiques pour travailler directement avec java.util.stream.Streams y compris headAndTail.

HeadAndTail<Integer> headAndTail = StreamUtils.headAndTail(Stream.of(1,2,3,4));
int head = headAndTail.head(); //1
Stream<Integer> tail = headAndTail.tail(); //Stream[2,3,4]

La classeStreamable représente un Stream rejouable et fonctionne en construisant une structure de données Intermédiaire paresseuse et en cache. Parce que c'est la mise en cache et remboursable-head et tail peuvent être mis en œuvre directement et séparément.

Streamable<Integer> replayable=  Streamable.fromStream(Stream.of(1,2,3,4));
int head = repayable.head(); //1
Stream<Integer> tail = replayable.tail(); //Stream[2,3,4]

Cyclops-streams fournit également une extension séquentielle Stream qui s'étend à son tour jOOλ et possède à la fois des solutions Tuple basées (à partir de jOOλ) et des objets de domaine (HeadAndTail) pour l'extraction de la tête et de la queue.

SequenceM.of(1,2,3,4)
         .splitAtHead(); //Tuple[1,SequenceM[2,3,4]

SequenceM.of(1,2,3,4)
         .headAndTail();

Update per Tagir's request- > une version Java du tamis Scala utilisant SequenceM

public void sieveTest(){
    sieve(SequenceM.range(2, 1_000)).forEach(System.out::println);
}

SequenceM<Integer> sieve(SequenceM<Integer> s){

    return s.headAndTailOptional().map(ht ->SequenceM.of(ht.head())
                            .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(SequenceM.of());
}

Et une autre version via Streamable

public void sieveTest2(){
    sieve(Streamable.range(2, 1_000)).forEach(System.out::println);
}

Streamable<Integer> sieve(Streamable<Integer> s){

    return s.size()==0? Streamable.of() : Streamable.of(s.head())
                                                    .appendStreamable(sieve(s.tail()
                                                                    .filter(n -> n % s.head() != 0)));
}

Note - ni Streamable de SequenceM n'ont une implémentation vide - d'où le vérifier la taille pour Streamable et l'utilisation de headAndTailOptional.

Enfin une version utilisant plain java.util.stream.Stream

import static com.aol.cyclops.streams.StreamUtils.headAndTailOptional;

public void sieveTest(){
    sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println);
}

Stream<Integer> sieve(Stream<Integer> s){

    return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head())
                            ,sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(Stream.of());
}

Une autre mise à jour-une itération paresseuse basée sur la version de @Holger utilisant des objets plutôt que des primitives (notez qu'une version primitive est également possible)

  final Mutable<Predicate<Integer>> predicate = Mutable.of(x->true);
  SequenceM.iterate(2, n->n+1)
           .filter(i->predicate.get().test(i))
           .peek(i->predicate.mutate(p-> p.and(v -> v%i!=0)))
           .limit(100000)
           .forEach(System.out::println);
1
répondu John McClean 2016-01-19 14:11:34

Pour obtenir head and tail, vous avez besoin d'une implémentation de flux paresseux. Java 8 stream ou RxJava ne conviennent pas.

Vous pouvez utiliser, par exemple, LazySeq comme suit.

La séquence paresseuse est toujours parcourue depuis le début en utilisant Très bon marché décomposition en premier / repos (tête () et queue ())

LazySeq implémente java.util.Liste interface, peut donc être utilisé dans variété d'endroits. De plus il implémente également des améliorations Java 8 à collections, à savoir les flux et collectionneurs


package com.company;

import com.nurkiewicz.lazyseq.LazySeq;

public class Main {

    public static void main(String[] args) {

        LazySeq<Integer> ints = integers(2);
        LazySeq primes = sieve(ints);
        primes.take(10).forEach(p -> System.out.println(p));

    }

    private static LazySeq<Integer> sieve(LazySeq<Integer> s) {
        return LazySeq.cons(s.head(), () -> sieve(s.filter(x -> x % s.head() != 0)));
    }

    private static LazySeq<Integer> integers(int from) {
        return LazySeq.cons(from, () -> integers(from + 1));
    }

}
0
répondu frhack 2015-06-06 11:08:03

Voici une autre recette utilisant la manière suggérée par Holger. Il utilise RxJava juste pour ajouter la possibilité d'utiliser la méthode take(int) et bien d'autres.

package com.company;

import rx.Observable;

import java.util.function.IntPredicate;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {

        final IntPredicate[] p={(x)->true};
        IntStream primesStream=IntStream.iterate(2,n->n+1).filter(i -> p[0].test(i)).peek(i->p[0]=p[0].and(v->v%i!=0)   );

        Observable primes = Observable.from(()->primesStream.iterator());

        primes.take(10).forEach((x) -> System.out.println(x.toString()));


    }

}
0
répondu frhack 2015-06-06 11:11:55

Si vous voulez obtenir la tête d'un ruisseau, tout:

IntStream.range(1, 5).first();

Si vous voulez obtenir la queue d'un flux, juste:

IntStream.range(1, 5).skip(1);

Si vous voulez obtenir à la fois la tête et la queue d'un ruisseau, tout:

IntStream s = IntStream.range(1, 5);
int head = s.head();
IntStream tail = s.tail();

Si vous voulez trouver le premier, juste:

LongStream.range(2, n)
   .filter(i -> LongStream.range(2, (long) Math.sqrt(i) + 1).noneMatch(j -> i % j == 0))
   .forEach(N::println);

Si vous voulez en savoir plus, allez chercher AbacusUtil

Déclaration: je suis le développeur de AbacusUtil.

-1
répondu user_3380739 2017-01-07 21:09:07