API de flux et Files D'attente: abonnez-vous à BlockingQueue stream-style
Disons que nous avons une file d'attente
BlockingQueue<String> queue= new LinkedBlockingQueue<>();
Et un autre thread y met des valeurs, puis nous le lisons comme
while (true) {
String next = queue.take();
System.out.println("next message:" + next);
}
Comment puis-je parcourir cette file d'attente dans le style stream, tout en conservant une sémantique similaire au code ci-dessus.
Ce code ne traverse que l'état actuel de la file d'attente:
queue.stream().forEach(e -> System.out.println(e));
3 réponses
Je devine un peu ce que vous attendez, mais je pense que j'ai une bonne intuition.
Le flot d'une file d'attente, comme une itération sur une file d'attente, représente le contenu actuel de la file d'attente. Lorsque l'itérateur ou le flux atteint la queue de la file d'attente, il ne bloque pas en attendant d'autres éléments à ajouter. L'itérateur ou le flux est épuisé à ce stade et le calcul se termine.
Si vous voulez un flux qui se compose de tous les éléments actuels et futurs la file d'attente, vous pouvez faire quelque chose comme ceci:
Stream.generate(() -> {
try {
return queue.take();
} catch (InterruptedException ie) {
return "Interrupted!";
}
})
.filter(s -> s.endsWith("x"))
.forEach(System.out::println);
(malheureusement, la nécessité de gérer InterruptedException
rend cela assez désordonné.)
Notez qu'il n'existe aucun moyen de fermer une file d'attente, et il n'existe aucun moyen pour Stream.generate
pour arrêter de générer des éléments, donc c'est effectivement un courant infini. La seule façon de le terminer est avec une opération de flux de court-circuit telle que findFirst
.
Vous pouvez regarder une implémentation de file d'attente asynchrone. Si vous avez Java 8, puis cyclope-réagir, j'ai un développeur sur ce projet, fournit un async.File d'attente qui vous permettra à la fois de remplir et de consommer de la file d'attente de manière asynchrone (et proprement).
Par exemple
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
, Ou simplement (tant que c'est un com.AOL.simple.réagir.asynchrone.File d'attente)
Queue<String> queue = new Queue<>();
, Puis dans un thread séparé :
new Thread(() -> {
while (true) {
queue.add("New message " + System.currentTimeMillis());
}
}).start();
Retour sur votre thread principal, votre code d'origine devrait maintenant fonctionner comme prévu (itérer à l'infini sur les messages ajoutés à la file d'attente et les imprimer)
queue.stream().forEach(e -> System.out.println(e));
La file d'attente et donc le flux peuvent être fermés à n'importe quel stade via -
queue.close();
Une autre approche consiste à construire un séparateur personnalisé. Dans mon cas, j'ai une file d'attente de blocage, et je veux construire un flux qui continue à extraire des éléments, jusqu'à ce que le bloc expire. Le spliterator est quelque chose comme:
public class QueueSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> queue;
private final long timeoutMs;
public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
this.queue = queue;
this.timeoutMs = timeoutMs;
}
@Override
public int characteristics() {
return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
try {
final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
if (next == null) {
return false;
}
action.accept(next);
return true;
} catch (final InterruptedException e) {
throw new SupplierErrorException("interrupted", e);
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
}
L'exception levée pour gérer InterruptedException est une extension de RuntimeException. En utilisant cette classe, on peut construire un flux via: StreamSupport.flux (nouveau QueueSpliterator(...)) et ajoutez les opérations de flux habituelles.