API de flux et Files D'attente: abonnez-vous à BlockingQueue stream-style

Disons que nous avons une file d'attente

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

Et un autre thread y met des valeurs, puis nous le lisons comme

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

Comment puis-je parcourir cette file d'attente dans le style stream, tout en conservant une sémantique similaire au code ci-dessus.

Ce code ne traverse que l'état actuel de la file d'attente:

queue.stream().forEach(e -> System.out.println(e));
21
demandé sur Mikhail Boyarsky 2014-05-05 02:16:33

3 réponses

Je devine un peu ce que vous attendez, mais je pense que j'ai une bonne intuition.

Le flot d'une file d'attente, comme une itération sur une file d'attente, représente le contenu actuel de la file d'attente. Lorsque l'itérateur ou le flux atteint la queue de la file d'attente, il ne bloque pas en attendant d'autres éléments à ajouter. L'itérateur ou le flux est épuisé à ce stade et le calcul se termine.

Si vous voulez un flux qui se compose de tous les éléments actuels et futurs la file d'attente, vous pouvez faire quelque chose comme ceci:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   

(malheureusement, la nécessité de gérer InterruptedException rend cela assez désordonné.)

Notez qu'il n'existe aucun moyen de fermer une file d'attente, et il n'existe aucun moyen pour Stream.generate pour arrêter de générer des éléments, donc c'est effectivement un courant infini. La seule façon de le terminer est avec une opération de flux de court-circuit telle que findFirst.

29
répondu Stuart Marks 2014-05-05 03:10:24

Vous pouvez regarder une implémentation de file d'attente asynchrone. Si vous avez Java 8, puis cyclope-réagir, j'ai un développeur sur ce projet, fournit un async.File d'attente qui vous permettra à la fois de remplir et de consommer de la file d'attente de manière asynchrone (et proprement).

Par exemple

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();

, Ou simplement (tant que c'est un com.AOL.simple.réagir.asynchrone.File d'attente)

Queue<String> queue = new Queue<>();

, Puis dans un thread séparé :

new Thread(() -> {
        while (true) {
            queue.add("New message " + System.currentTimeMillis());
        }
    }).start();

Retour sur votre thread principal, votre code d'origine devrait maintenant fonctionner comme prévu (itérer à l'infini sur les messages ajoutés à la file d'attente et les imprimer)

queue.stream().forEach(e -> System.out.println(e));

La file d'attente et donc le flux peuvent être fermés à n'importe quel stade via -

queue.close();
14
répondu John McClean 2016-02-22 21:27:05

Une autre approche consiste à construire un séparateur personnalisé. Dans mon cas, j'ai une file d'attente de blocage, et je veux construire un flux qui continue à extraire des éléments, jusqu'à ce que le bloc expire. Le spliterator est quelque chose comme:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}

L'exception levée pour gérer InterruptedException est une extension de RuntimeException. En utilisant cette classe, on peut construire un flux via: StreamSupport.flux (nouveau QueueSpliterator(...)) et ajoutez les opérations de flux habituelles.

11
répondu user3787629 2015-12-02 18:58:04