Comment attendre la fin de tous les threads, en utilisant ExecutorService?

j'ai besoin d'exécuter un certain nombre de tâches 4 à la fois, quelque chose comme ceci:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

Comment puis-je être avisé une fois qu'ils sont tous terminés? Pour l'instant je ne peux pas penser à quelque chose de mieux que de configurer un compteur de tâches global et de le diminuer à la fin de chaque tâche, puis surveiller en boucle infinie ce compteur pour devenir 0; ou obtenir une liste de futurs et en boucle infinie moniteur isDone pour chacun d'eux. Quelles sont les meilleures solutions qui n'impliquent pas des boucles infinies?

Merci.

318
demandé sur bluish 2009-08-09 08:39:28

22 réponses

essentiellement sur un ExecutorService vous appelez shutdown() et puis awaitTermination() :

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
371
répondu cletus 2016-09-08 06:43:58

utiliser un CountDownLatch :

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

et à l'intérieur de votre tâche (joindre en try / finally)

latch.countDown();
154
répondu ChssPly76 2016-09-08 06:45:00

ExecutorService.invokeAll() il le fait pour toi.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
77
répondu sjlee 2016-09-08 06:45:54

vous pouvez utiliser des listes de Futures, ainsi:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

puis quand vous voulez joindre sur chacun d'eux, c'est essentiellement l'équivalent de joindre sur chacun, (avec l'avantage supplémentaire qu'il soulève des exceptions de fils d'enfant à la principale):

for(Future f: this.futures) { f.get(); }

le truc, C'est D'appeler .get() sur chaque futur un à la fois, au lieu d'une boucle infinie appelant isDone () sur (tous ou chacun). Donc vous êtes sûr de "passer à autre chose" et passé ce bloc dès que le dernier fil est terminé. La mise en garde est que depuis le .get () call relance les exceptions, si l'un des threads meurt, vous relancerez à partir de cela peut-être avant que les autres threads n'aient terminé [pour éviter cela, vous pouvez ajouter un catch ExecutionException autour de l'appel get]. L'autre mise en garde est qu'il conserve une référence à tous les threads de sorte que s'ils ont des variables locales de thread, ils ne seront pas collectés avant que vous ne passiez ce bloc (bien que vous pourriez être en mesure de vous déplacer cela, si elle est devenue un problème, en enlevant le futur de L'ArrayList). Si vous voulez savoir quel futur "finit en premier" vous pouvez utiliser quelque chose comme https://stackoverflow.com/a/31885029/32453

41
répondu rogerdpack 2017-05-23 11:55:03

juste mes deux cents. Pour surmonter l'exigence de CountDownLatch pour connaître le nombre de tâches à l'avance, vous pouvez le faire à l'ancienne façon en utilisant un simple Semaphore .

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

dans votre tâche appelez simplement s.release() comme vous le feriez latch.countDown();

23
répondu stryba 2012-01-19 10:48:28

en Java8 vous pouvez le faire avec CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
19
répondu AdamSkywalker 2017-04-29 08:41:47

la classe CyclicBarrier en Java 5 et plus est conçu pour ce genre de chose.

12
répondu Pekka Enberg 2016-09-08 06:46:38

un peu en retard au jeu mais pour le plaisir de l'achèvement...

au lieu d'attendre que toutes les tâches soient terminées, vous pouvez penser en termes de principe D'Hollywood, "ne m'appelez pas, je vous appellerai" - quand j'aurai fini. Je pense que le code résultant est plus élégant...

la goyave offre des outils intéressants pour accomplir ceci.

un exemple::

Envelopper un ExecutorService dans un ListeningExecutorService:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Soumettre une collection de callables pour l'exécution ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

maintenant la partie essentielle:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

joindre un callback à L'avenir Listable, que vous pouvez utiliser pour être notifié lorsque tous les futures remplir::

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

cela offre également l'avantage que vous pouvez recueillir tous les résultats en un seul endroit une fois le traitement est terminé...

plus d'information ici

10
répondu Răzvan Petruescu 2014-11-07 12:19:15

vous pouvez envelopper vos tâches dans un autre runnable, qui enverra des notifications:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
5
répondu Zed 2009-08-09 04:46:45

je viens d'écrire un programme qui résout votre problème. Il n'y a pas eu de mise en œuvre concise donnée, donc je vais en ajouter une. Bien que vous puissiez utiliser executor.shutdown() et executor.awaitTermination() , ce n'est pas la meilleure pratique car le temps pris par les différents threads serait imprévisible.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
3
répondu Kiran 2012-12-30 19:15:36

juste pour fournir plus de solutions de rechange ici différent d'utiliser la serrure/barrières. Vous pouvez également obtenir les résultats partiels jusqu'à ce que chacun d'eux finissent en utilisant CompletionService .

D'après Java Concourency in practice: "Si vous avez un lot de calculs à soumettre à un exécuteur et vous voulez récupérer leurs résultats comme ils deviennent disponible, vous pouvez conserver le futur associé à chaque tâche et polling répété pour l'achèvement en appelant get avec un délai d'attente de zéro. C'est possible, mais fastidieux . Heureusement, il existe un better way : un service d'achèvement."

Ici, la mise en œuvre

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}
3
répondu Alberto Gurrion 2016-03-04 17:51:40

vous pouvez utiliser votre propre sous-classe de Executorcomplettionservice pour envelopper taskExecutor , et votre propre mise en œuvre de BlockingQueue pour être informé lorsque chaque tâche complète et effectuer n'importe quel rappel ou autre action que vous désirez lorsque le nombre de tâches terminées atteint votre but désiré.

1
répondu Alex Martelli 2014-09-19 17:53:07

vous devez utiliser la méthode executorService.shutdown() et executorService.awaitTermination .

exemple:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}
1
répondu Rollen Holt 2016-06-21 05:41:18

Java 8 - nous pouvons utiliser l'API stream pour traiter stream. S'il vous plaît voir l'extrait ci-dessous

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();
1
répondu Vlad 2016-07-19 23:39:29

vous pouvez utiliser ce code:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
1
répondu Tuan Pham 2017-05-23 06:15:40

C'est ma solution, basée dans "AdamSkywalker" astuce, et ça marche

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}
1
répondu frss-soft.com 2018-05-23 15:26:45

donc je poste ma réponse de question liée ici, incase quelqu'un veulent une façon plus simple de le faire

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
0
répondu Mạnh Quyết Nguyễn 2018-05-07 18:49:09

voici deux options, juste un peu confondre lequel est le meilleur à aller.

Option 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Option 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

ici putting future.get(); try catch est une bonne idée de droite?

0
répondu user2862544 2018-09-02 16:14:15

Cela pourrait

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }
-1
répondu Amol Desai 2015-10-22 06:03:57

vous pouvez appeler waitTillDone () sur ce Runner classe:

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

vous pouvez réutiliser cette classe et appeler waitTillDone() autant de fois que vous le voulez avant d'appeler shutdown(), plus votre code est extrêmement simple . Aussi, vous ne pas savoir le nombre de tâches d'avance.

pour l'utiliser il suffit d'ajouter cette dépendance Grad/maven compile 'com.github.matejtymes:javafixes:1.1.1' à votre projet.

plus de détails peuvent être trouvés ici:

https://github.com/MatejTymes/JavaFixes

http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

-1
répondu Matej Tymes 2016-11-30 09:56:33

il y a une méthode dans executor getActiveCount() - qui donne le nombre de threads actifs.

après avoir enjambé le fil, nous pouvons vérifier si la valeur activeCount() est 0 . Une fois que la valeur est zéro, cela signifie qu'il n'y a pas de threads actifs actuellement en cours d'exécution, ce qui signifie que la tâche est terminée:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
-2
répondu user 2014-10-31 09:57:21