Comment utiliser invokeAll() pour laisser tous les thread pool faire leur tâche?

    ExecutorService pool=Executors.newFixedThreadPool(7);
        List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
        List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

        for(int i=0;i<=diff;i++){

            String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);

            callList.add(new HotelCheapestFare(str));

        }       
     future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){

        System.out.println("name is:"+future.get(i).get().getName());
    }

maintenant je veux pool à invokeAll toute la tâche avant d'obtenir à la boucle pour, mais quand j'exécute ce programme pour boucle est exécuté avant que invokeAll et lance cette exception:

java.util.concurrent.ExecutionException: java.lang.NullPointerException at 
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at  
java.util.concurrent.FutureTask.get(Unknown Source) at 
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5)

Caused by: java.lang.NullPointerException at 
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) 
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source)
at java.lang.Thread.run
16
demandé sur Ravindra babu 2013-08-13 10:46:50

3 réponses

la façon dont fonctionne un ExecutorService est que lorsque vous appelez invokeAll il attend que toutes les tâches à accomplir:

exécute les tâches données, en retournant une liste des futurs détenant leurs statut et résultats une fois tous terminés. Avenir.isDone() est vrai pour chaque élément de la liste retournée. notez qu'une tâche accomplie pourrait avoir terminé soit normalement, soit en lançant une exception . Les résultats de cette méthode sont Non défini si la collection donnée est modifiée tandis que cette opération est en cours. 1 (italique ajouté)

ce que cela signifie est que vos tâches sont toutes accomplies, mais certains peuvent avoir jeté une exception. Cette exception fait partie de la Future - l'appel get fait que l'exception est de nouveau enveloppée dans un ExecutionException .

De vous stacktrack

java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at 
                                ^^^ <-- from get

Vous pouvez voir que c'est effectivement le cas. Une de vos tâches a échoué avec un NPE. Le ExecutorService a pris l'exception et vous en parle en lançant un ExecutionException quand vous appelez Future.get .

maintenant, si vous voulez prendre des tâches comme ils complètent vous avez besoin d'un ExecutorCompletionService . Cela agit comme un BlockingQueue qui vous permettra de sonder pour les tâches au fur et à mesure qu'elles se terminent.

public static void main(String[] args) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                } catch (InterruptedException ex) {
                    return;
                } catch (ExecutionException ex) {
                    System.err.println("TASK FAILED");
                }
            }
        }
    });
    for (int i = 0; i < 100; ++i) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("FAILED");
                }
                return "SUCCESS";
            }
        });
    }
    executorService.shutdown();
}

In cet exemple j'ai une tâche qui appelle take sur le ExecutorCompletionService qui obtient le Future s comme ils deviennent disponibles et puis je soumets des tâches au ExecutorCompletionService .

cela vous permettra d'obtenir la tâche échouée dès qu'elle échoue plutôt que d'avoir à attendre toutes les tâches pour échouer ou réussir ensemble.

la seule complication est qu'il est difficile de dire au fil du sondage que toutes les tâches sont faites comme tout est maintenant asynchrone. Dans ce cas, j'ai utilisé la connaissance que 100 tâches seront soumises de sorte qu'il n'a besoin de sonder 100 fois. Une façon plus générale serait de collecter le Future s de la méthode submit ainsi et puis boucle sur eux pour voir si tout est terminé.

15
répondu Boris the Spider 2013-08-13 12:42:35

futur.get () lance les exceptions ci-dessous.

CancellationException - si le calcul a été annulé

ExecutionException - si le calcul fait exception

InterruptedException - si le fil de courant a été interrompu pendant l'attente

saisissez toutes ces exceptions lorsque vous appelez la méthode get() .

j'ai simulé diviser par zéro exception pour certaines tâches Callable mais exception dans une tâche Callable n'affecte pas les autres tâches Callable soumises à ExecutorService si vous attrapez plus de trois exceptions comme indiqué dans l'exemple de code.

exemple de code snippet:

import java.util.concurrent.*;
import java.util.*;

public class InvokeAllUsage{
    public InvokeAllUsage(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(10);

        List<MyCallable> futureList = new ArrayList<MyCallable>();
        for ( int i=0; i<10; i++){
            MyCallable myCallable = new MyCallable((long)i+1);
            futureList.add(myCallable);
        }
        System.out.println("Start");
        try{
            List<Future<Long>> futures = service.invokeAll(futureList);  
            for(Future<Long> future : futures){
                try{
                    System.out.println("future.isDone = " + future.isDone());
                    System.out.println("future: call ="+future.get());
                }
                catch (CancellationException ce) {
                    ce.printStackTrace();
                } catch (ExecutionException ee) {
                    ee.printStackTrace();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        System.out.println("Completed");
        service.shutdown();
    }
    public static void main(String args[]){
        InvokeAllUsage demo = new InvokeAllUsage();
    }
    class MyCallable implements Callable<Long>{
        Long id = 0L;
        public MyCallable(Long val){
            this.id = val;
        }
        public Long call(){

            if ( id % 5 == 0){
                id = id / 0;
            }           
            return id;
        }
    }
}

sortie:

creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Completed
4
répondu Ravindra babu 2016-09-17 13:55:29

invokeAll est une méthode de blocage. Cela signifie que – JVM ne passera pas à la ligne suivante tant que tous les threads ne seront pas terminés. Donc je pense qu'il y a quelque chose qui ne va pas avec le résultat de votre thread future.

System.out.println("name is:"+future.get(i).get().getName());

de cette ligne, je pense qu'il y a des contrats à terme qui n'ont pas de résultat et qui peuvent être nuls, donc vous devriez vérifier votre code,s'il y a des contrats à terme nuls, si c'est le cas, obtenez un si avant que cette ligne ne soit exécutée.

-2
répondu Winston 2013-08-13 08:38:13