Comment attendre la fin de tous les threads, en utilisant ExecutorService?
j'ai besoin d'exécuter un certain nombre de tâches 4 à la fois, quelque chose comme ceci:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
Comment puis-je être avisé une fois qu'ils sont tous terminés? Pour l'instant je ne peux pas penser à quelque chose de mieux que de configurer un compteur de tâches global et de le diminuer à la fin de chaque tâche, puis surveiller en boucle infinie ce compteur pour devenir 0; ou obtenir une liste de futurs et en boucle infinie moniteur isDone pour chacun d'eux. Quelles sont les meilleures solutions qui n'impliquent pas des boucles infinies?
Merci.
22 réponses
essentiellement sur un ExecutorService
vous appelez shutdown()
et puis awaitTermination()
:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
utiliser un CountDownLatch :
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
et à l'intérieur de votre tâche (joindre en try / finally)
latch.countDown();
ExecutorService.invokeAll()
il le fait pour toi.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
vous pouvez utiliser des listes de Futures, ainsi:
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
puis quand vous voulez joindre sur chacun d'eux, c'est essentiellement l'équivalent de joindre sur chacun, (avec l'avantage supplémentaire qu'il soulève des exceptions de fils d'enfant à la principale):
for(Future f: this.futures) { f.get(); }
le truc, C'est D'appeler .get() sur chaque futur un à la fois, au lieu d'une boucle infinie appelant isDone () sur (tous ou chacun). Donc vous êtes sûr de "passer à autre chose" et passé ce bloc dès que le dernier fil est terminé. La mise en garde est que depuis le .get () call relance les exceptions, si l'un des threads meurt, vous relancerez à partir de cela peut-être avant que les autres threads n'aient terminé [pour éviter cela, vous pouvez ajouter un catch ExecutionException
autour de l'appel get]. L'autre mise en garde est qu'il conserve une référence à tous les threads de sorte que s'ils ont des variables locales de thread, ils ne seront pas collectés avant que vous ne passiez ce bloc (bien que vous pourriez être en mesure de vous déplacer cela, si elle est devenue un problème, en enlevant le futur de L'ArrayList). Si vous voulez savoir quel futur "finit en premier" vous pouvez utiliser quelque chose comme https://stackoverflow.com/a/31885029/32453
juste mes deux cents.
Pour surmonter l'exigence de CountDownLatch
pour connaître le nombre de tâches à l'avance, vous pouvez le faire à l'ancienne façon en utilisant un simple Semaphore
.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
dans votre tâche appelez simplement s.release()
comme vous le feriez latch.countDown();
en Java8 vous pouvez le faire avec CompletableFuture :
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
la classe CyclicBarrier en Java 5 et plus est conçu pour ce genre de chose.
un peu en retard au jeu mais pour le plaisir de l'achèvement...
au lieu d'attendre que toutes les tâches soient terminées, vous pouvez penser en termes de principe D'Hollywood, "ne m'appelez pas, je vous appellerai" - quand j'aurai fini. Je pense que le code résultant est plus élégant...
la goyave offre des outils intéressants pour accomplir ceci.
un exemple::
Envelopper un ExecutorService dans un ListeningExecutorService:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Soumettre une collection de callables pour l'exécution ::
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
maintenant la partie essentielle:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
joindre un callback à L'avenir Listable, que vous pouvez utiliser pour être notifié lorsque tous les futures remplir::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.info("@@ finished processing {} elements", Iterables.size(result));
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
log.info("@@ failed because of :: {}", t);
}
});
cela offre également l'avantage que vous pouvez recueillir tous les résultats en un seul endroit une fois le traitement est terminé...
plus d'information ici
suivre l'une des approches ci-dessous.
- Itérer sur tous les Avenir "1519120920 des" tâches", de retour d'
submit
surExecutorService
et de vérifier le statut à l'appel de blocageget()
surFuture
objet tel que suggéré parKiran
- Utiliser
invokeAll()
sur ExecutorService - CountDownLatch
- ForkJoinPool ou Exécuteurs testamentaires.html#newWorkStealingPool
- Utiliser
shutdown, awaitTermination, shutdownNow
Api de ThreadPoolExecutor dans le bon ordre
Liées SE questions:
comment CountDownLatch est-il utilisé en Java Multithreading?
vous pouvez envelopper vos tâches dans un autre runnable, qui enverra des notifications:
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
je viens d'écrire un programme qui résout votre problème. Il n'y a pas eu de mise en œuvre concise donnée, donc je vais en ajouter une. Bien que vous puissiez utiliser executor.shutdown()
et executor.awaitTermination()
, ce n'est pas la meilleure pratique car le temps pris par les différents threads serait imprévisible.
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());
for (int i = 0; i < 1000000; i++) {
sum += i;
}
System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}
});
}
try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;
for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}
if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
juste pour fournir plus de solutions de rechange ici différent d'utiliser la serrure/barrières. Vous pouvez également obtenir les résultats partiels jusqu'à ce que chacun d'eux finissent en utilisant CompletionService .
D'après Java Concourency in practice: "Si vous avez un lot de calculs à soumettre à un exécuteur et vous voulez récupérer leurs résultats comme ils deviennent disponible, vous pouvez conserver le futur associé à chaque tâche et polling répété pour l'achèvement en appelant get avec un délai d'attente de zéro. C'est possible, mais fastidieux . Heureusement, il existe un better way : un service d'achèvement."
Ici, la mise en œuvre
public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}
vous pouvez utiliser votre propre sous-classe de Executorcomplettionservice pour envelopper taskExecutor
, et votre propre mise en œuvre de BlockingQueue pour être informé lorsque chaque tâche complète et effectuer n'importe quel rappel ou autre action que vous désirez lorsque le nombre de tâches terminées atteint votre but désiré.
vous devez utiliser la méthode executorService.shutdown()
et executorService.awaitTermination
.
exemple:
public class ScheduledThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
}
Java 8 - nous pouvons utiliser l'API stream pour traiter stream. S'il vous plaît voir l'extrait ci-dessous
final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();
vous pouvez utiliser ce code:
public class MyTask implements Runnable {
private CountDownLatch countDownLatch;
public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
C'est ma solution, basée dans "AdamSkywalker" astuce, et ça marche
package frss.main;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestHilos {
void procesar() {
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
System.out.println("FIN DEL PROCESO DE HILOS");
}
private List<Runnable> getTasks() {
List<Runnable> tasks = new ArrayList<Runnable>();
Hilo01 task1 = new Hilo01();
tasks.add(task1);
Hilo02 task2 = new Hilo02();
tasks.add(task2);
return tasks;
}
private class Hilo01 extends Thread {
@Override
public void run() {
System.out.println("HILO 1");
}
}
private class Hilo02 extends Thread {
@Override
public void run() {
try {
sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("HILO 2");
}
}
public static void main(String[] args) {
TestHilos test = new TestHilos();
test.procesar();
}
}
donc je poste ma réponse de question liée ici, incase quelqu'un veulent une façon plus simple de le faire
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
futures[i++] = CompletableFuture.runAsync(runner, executor);
}
CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
voici deux options, juste un peu confondre lequel est le meilleur à aller.
Option 1:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
Option 2:
ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
futures.add(es.submit(task));
}
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
// do logging and nothing else
}
}
es.shutdown();
ici putting future.get(); try catch est une bonne idée de droite?
Cela pourrait
Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
try {
Log.i(LOG_TAG, "Waiting for executor to terminate...");
if (executor.isTerminated())
break;
if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException ignored) {}
}
vous pouvez appeler waitTillDone () sur ce Runner classe:
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool
while(...) {
runner.run(new MyTask()); // here you submit your task
}
runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)
runner.shutdown(); // once you done you can shutdown the runner
vous pouvez réutiliser cette classe et appeler waitTillDone() autant de fois que vous le voulez avant d'appeler shutdown(), plus votre code est extrêmement simple . Aussi, vous ne pas savoir le nombre de tâches d'avance.
pour l'utiliser il suffit d'ajouter cette dépendance Grad/maven compile 'com.github.matejtymes:javafixes:1.1.1'
à votre projet.
plus de détails peuvent être trouvés ici:
https://github.com/MatejTymes/JavaFixes
http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
il y a une méthode dans executor getActiveCount()
- qui donne le nombre de threads actifs.
après avoir enjambé le fil, nous pouvons vérifier si la valeur activeCount()
est 0
. Une fois que la valeur est zéro, cela signifie qu'il n'y a pas de threads actifs actuellement en cours d'exécution, ce qui signifie que la tâche est terminée:
while (true) {
if (executor.getActiveCount() == 0) {
//ur own piece of code
break;
}
}