Comment utiliser invokeAll() pour laisser tous les thread pool faire leur tâche?
ExecutorService pool=Executors.newFixedThreadPool(7);
List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();
for(int i=0;i<=diff;i++){
String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);
callList.add(new HotelCheapestFare(str));
}
future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){
System.out.println("name is:"+future.get(i).get().getName());
}
maintenant je veux pool à invokeAll
toute la tâche avant d'obtenir à la boucle pour, mais quand j'exécute ce programme pour boucle est exécuté avant que invokeAll
et lance cette exception:
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:65)
Caused by: java.lang.NullPointerException at
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run
3 réponses
la façon dont fonctionne un ExecutorService
est que lorsque vous appelez invokeAll
il attend que toutes les tâches à accomplir:
exécute les tâches données, en retournant une liste des futurs détenant leurs statut et résultats une fois tous terminés. Avenir.isDone() est vrai pour chaque élément de la liste retournée. notez qu'une tâche accomplie pourrait avoir terminé soit normalement, soit en lançant une exception . Les résultats de cette méthode sont Non défini si la collection donnée est modifiée tandis que cette opération est en cours. 1 (italique ajouté)
ce que cela signifie est que vos tâches sont toutes accomplies, mais certains peuvent avoir jeté une exception. Cette exception fait partie de la Future
- l'appel get
fait que l'exception est de nouveau enveloppée dans un ExecutionException
.
De vous stacktrack
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
^^^ <-- from get
Vous pouvez voir que c'est effectivement le cas. Une de vos tâches a échoué avec un NPE. Le ExecutorService
a pris l'exception et vous en parle en lançant un ExecutionException
quand vous appelez Future.get
.
maintenant, si vous voulez prendre des tâches comme ils complètent vous avez besoin d'un ExecutorCompletionService
. Cela agit comme un BlockingQueue
qui vous permettra de sonder pour les tâches au fur et à mesure qu'elles se terminent.
public static void main(String[] args) throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; ++i) {
try {
final Future<String> myValue = completionService.take();
//do stuff with the Future
final String result = myValue.get();
System.out.println(result);
} catch (InterruptedException ex) {
return;
} catch (ExecutionException ex) {
System.err.println("TASK FAILED");
}
}
}
});
for (int i = 0; i < 100; ++i) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
if (Math.random() > 0.5) {
throw new RuntimeException("FAILED");
}
return "SUCCESS";
}
});
}
executorService.shutdown();
}
In cet exemple j'ai une tâche qui appelle take
sur le ExecutorCompletionService
qui obtient le Future
s comme ils deviennent disponibles et puis je soumets des tâches au ExecutorCompletionService
.
cela vous permettra d'obtenir la tâche échouée dès qu'elle échoue plutôt que d'avoir à attendre toutes les tâches pour échouer ou réussir ensemble.
la seule complication est qu'il est difficile de dire au fil du sondage que toutes les tâches sont faites comme tout est maintenant asynchrone. Dans ce cas, j'ai utilisé la connaissance que 100 tâches seront soumises de sorte qu'il n'a besoin de sonder 100 fois. Une façon plus générale serait de collecter le Future
s de la méthode submit
ainsi et puis boucle sur eux pour voir si tout est terminé.
futur.get () lance les exceptions ci-dessous.
CancellationException
- si le calcul a été annulé
ExecutionException
- si le calcul fait exception
InterruptedException
- si le fil de courant a été interrompu pendant l'attente
saisissez toutes ces exceptions lorsque vous appelez la méthode get()
.
j'ai simulé diviser par zéro exception pour certaines tâches Callable
mais exception dans une tâche Callable
n'affecte pas les autres tâches Callable
soumises à ExecutorService
si vous attrapez plus de trois exceptions comme indiqué dans l'exemple de code.
exemple de code snippet:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllUsage{
public InvokeAllUsage(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(10);
List<MyCallable> futureList = new ArrayList<MyCallable>();
for ( int i=0; i<10; i++){
MyCallable myCallable = new MyCallable((long)i+1);
futureList.add(myCallable);
}
System.out.println("Start");
try{
List<Future<Long>> futures = service.invokeAll(futureList);
for(Future<Long> future : futures){
try{
System.out.println("future.isDone = " + future.isDone());
System.out.println("future: call ="+future.get());
}
catch (CancellationException ce) {
ce.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
}catch(Exception err){
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]){
InvokeAllUsage demo = new InvokeAllUsage();
}
class MyCallable implements Callable<Long>{
Long id = 0L;
public MyCallable(Long val){
this.id = val;
}
public Long call(){
if ( id % 5 == 0){
id = id / 0;
}
return id;
}
}
}
sortie:
creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Completed
invokeAll est une méthode de blocage. Cela signifie que – JVM ne passera pas à la ligne suivante tant que tous les threads ne seront pas terminés. Donc je pense qu'il y a quelque chose qui ne va pas avec le résultat de votre thread future.
System.out.println("name is:"+future.get(i).get().getName());
de cette ligne, je pense qu'il y a des contrats à terme qui n'ont pas de résultat et qui peuvent être nuls, donc vous devriez vérifier votre code,s'il y a des contrats à terme nuls, si c'est le cas, obtenez un si avant que cette ligne ne soit exécutée.