Java ExecutorService: attaittermination of all recursively created tasks
j'utilise un ExecutorService
pour exécuter une tâche. Cette tâche peut créer récursivement d'autres tâches qui sont soumises à la même ExecutorService
et ces tâches d'enfant peuvent le faire, aussi.
j'ai maintenant le problème que je veux attendre jusqu'à ce que toutes les tâches sont effectuées (qui est, toutes les tâches sont terminées et ils n'ont pas à soumettre de nouvelles) avant de continuer.
Je ne peux pas appeler ExecutorService.shutdown()
dans le fil principal parce que cela empêche de nouvelles tâches d'être accepté par le ExecutorService
.
et appeler ExecutorService.awaitTermination()
semble ne rien faire si shutdown
n'a pas été appelé.
donc je suis un peu coincé ici. Ça ne doit pas être si dur pour le ExecutorService
de voir que tous les ouvriers sont inactifs, n'est-ce pas? La seule solution inélégante que j'ai pu trouver est d'utiliser directement un ThreadPoolExecutor
et d'interroger son getPoolSize()
de temps en temps. Il n'y a pas de meilleur moyen de faire ça?
10 réponses
si le nombre de tâches dans l'arbre des tâches récursives est initialement inconnu, peut-être que la façon la plus simple serait d'implémenter votre propre primitive de synchronisation, une sorte de" sémaphore inverse", et de la partager parmi vos tâches. Avant de soumettre chaque tâche vous incrémentez une valeur, quand la tâche est terminée, elle décrémente cette valeur, et vous attendez jusqu'à ce que la valeur soit 0.
L'implémenter comme une primitive séparée explicitement appelée de tâches découple cette logique de la l'implémentation du pool de threads et vous permet de soumettre plusieurs arbres indépendants de tâches récursives dans le même pool.
quelque chose comme ça:
public class InverseSemaphore {
private int value = 0;
private Object lock = new Object();
public void beforeSubmit() {
synchronized(lock) {
value++;
}
}
public void taskCompleted() {
synchronized(lock) {
value--;
if (value == 0) lock.notifyAll();
}
}
public void awaitCompletion() throws InterruptedException {
synchronized(lock) {
while (value > 0) lock.wait();
}
}
}
notez que taskCompleted()
doit être appelé à l'intérieur d'un bloc finally
, pour le rendre insensible à d'éventuelles exceptions.
notez aussi que beforeSubmit()
doit être appelé par le thread de soumission avant que la tâche soit soumise, et non par la tâche elle-même, pour éviter d'éventuels "faux" achèvement" lorsque les anciennes tâches sont terminées et que les nouvelles ne sont pas encore commencées.
EDIT: problème Important avec l'utilisation de modèle fixe.
C'est vraiment un candidat idéal pour un Phaser. Java 7 sort avec cette nouvelle classe. C'est un CountdonwLatch/CyclicBarrier flexible. Vous pouvez obtenir une version stable à site D'intérêt JSR 166 .
la façon dont il est un CountdownLatch plus flexible / CyclicBarrier est parce qu'il est capable non seulement de soutenir un nombre inconnu de parties (threads), mais son également réutilisable (thats où la partie de phase vient en)
pour chaque tâche de vous présenter vous inscrire, lorsque la tâche est terminée, vous arrivez. Cela peut être fait de façon récursive.
Phaser phaser = new Phaser();
ExecutorService e = //
Runnable recursiveRunnable = new Runnable(){
public void run(){
//do work recursively if you have to
if(shouldBeRecursive){
phaser.register();
e.submit(recursiveRunnable);
}
phaser.arrive();
}
}
public void doWork(){
int phase = phaser.getPhase();
phaser.register();
e.submit(recursiveRunnable);
phaser.awaitAdvance(phase);
}
Edit: Merci @depthofreality pour souligner la condition de la course dans mon exemple précédent. Je le mets à jour pour que l'exécution du thread n'attende que l'avance de la phase actuelle car il bloque la fonction récursive à compléter.
le nombre de phase ne trébuchera pas avant le nombre de arrive
s == register
S. Puisque avant chaque appel récursif invoque register
un incrément de phase se produira quand toutes les invocations sont complètes.
Wow, vous êtes rapide:)
Merci pour toutes les suggestions. Futures ne s'intègrent pas facilement à mon modèle parce que je ne sais pas combien de runnables sont programmés à l'avance. Donc si je garde une tâche de parent en vie juste pour attendre que ce soit des tâches d'enfant récursives pour finir, j'ai beaucoup d'ordures autour de moi.
j'ai résolu mon problème en utilisant la suggestion AtomicInteger. Essentiellement, je sous-classe ThreadPoolExecutor et augmenter le compteur sur les appels à execute() et de décrémentation sur les appels à afterExecute(). Quand le compteur est à 0, j'appelle shutdown (). Cela semble fonctionner pour mes problèmes, pas sûr si c'est une bonne façon de faire généralement. En particulier, je suppose que vous n'utilisez execute () que pour ajouter des Runnables.
comme un noeud latéral: j'ai d'abord essayé de vérifier après exécute() le nombre de Runnables dans la file d'attente et le nombre de travailleurs qui sont actifs et arrêt lorsque ceux-ci sont 0; mais cela n'a pas fonctionné parce que pas tous Les Runnables sont apparus dans la file d'attente et le getActiveCount() n'a pas fait ce à quoi je m'attendais non plus.
quoi qu'il en soit, voici ma solution: (si quelqu'un trouve de graves problèmes avec cela, s'il vous plaît laissez-moi savoir:)
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger executing = new AtomicInteger(0);
public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit seconds, BlockingQueue<Runnable> queue) {
super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
}
@Override
public void execute(Runnable command) {
//intercepting beforeExecute is too late!
//execute() is called in the parent thread before it terminates
executing.incrementAndGet();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
int count = executing.decrementAndGet();
if(count == 0) {
this.shutdown();
}
}
}
vous pouvez créer votre propre pool de thread qui s'étend ThreadPoolExecutor . Vous voulez savoir quand une tâche a été soumise et quand elle est terminée.
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private int counter = 0;
public MyThreadPoolExecutor() {
super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
}
@Override
public synchronized void execute(Runnable command) {
counter++;
super.execute(command);
}
@Override
protected synchronized void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
counter--;
notifyAll();
}
public synchronized void waitForExecuted() throws InterruptedException {
while (counter == 0)
wait();
}
}
utilisez un Future pour vos tâches (au lieu de soumettre Runnable
's), un callback met à jour son état quand il est terminé, de sorte que vous pouvez utiliser futur.isDone pour suivre l'état de toutes vos tâches.
(mea culpa: c'est un peu passé l'heure du coucher ;) mais voici une première tentative d'un loquet dynamique):
package oss.alphazero.sto4958330;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class DynamicCountDownLatch {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
private final CountDownLatch toplatch;
public Sync() {
setState(0);
this.toplatch = new CountDownLatch(1);
}
@Override
protected int tryAcquireShared(int acquires){
try {
toplatch.await();
}
catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
return getState() == 0 ? 1 : -1;
}
public boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
public boolean tryExtendState(int acquires) {
for (;;) {
int s = getState();
int exts = s+1;
if (compareAndSetState(s, exts)) {
toplatch.countDown();
return exts > 0;
}
}
}
}
private final Sync sync;
public DynamicCountDownLatch(){
this.sync = new Sync();
}
public void await()
throws InterruptedException
{
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
{
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public void join() {
sync.tryExtendState(1);
}
}
ce verrou introduit une nouvelle méthode join() à L'API CountDownLatch existante (clonée), qui est utilisée par les tâches pour signaler leur entrée dans le groupe de tâches plus large.
le loquet est transmis de la tâche de parent à la tâche d'enfant. Chaque tâche serait, selon le modèle de Suraj, d'abord 'join()' la serrure, faire sa tâche (), puis countDown().
pour traiter des situations où le thread principal lance le groupe de tâches et attend immédiatement() -- avant que l'un des threads de tâches n'ait eu la chance de rejoindre() -- la classe topLatch
est utilisée dans la classe Sync
interne. Il s'agit d'un loquet qui sera décompté sur chaque jointure(); seul le premier compte à rebours est bien sûr significatif, car tous les suivants sont des nops.
la mise en œuvre initiale ci-dessus introduit une sorte de ride sémantique puisque le tryAcquiredShared (int) n'est pas censé lancer une Exceptionexception interrompue mais alors nous devons faire face à l'interruption sur l'attente sur le topLatch.
est-ce une amélioration par rapport à la solution D'OP utilisant des compteurs atomiques? Je dirais probablement pas IFF il insiste sur l'utilisation des exécuteurs, mais c'est, je crois, une approche alternative tout aussi valable en utilisant les AQS dans ce cas, et, est utilisable avec des threads génériques aussi bien.
Crit away compagnons pirates.
si vous souhaitez utiliser les classes JSR166y - par exemple Phaser ou Fork / Join - dont l'une ou l'autre pourrait fonctionner pour vous, vous pouvez toujours télécharger le backport Java 6 de celles-ci à partir de: http://gee.cs.oswego.edu/dl/concurrency-interest / et utiliser cela comme une base plutôt que d'écrire une solution complètement maison. Ensuite, lorsque 7 sort, vous pouvez simplement laisser tomber la dépendance sur le backport et changer quelques noms de paquets.
(divulgation complète: Nous avons utilisé le LinkedTransferQueue en prod depuis un moment maintenant. Pas de problèmes)
je dois dire, que les solutions décrites ci-dessus de problème avec la tâche d'appel récursive et attendre pour les tâches de sous-ordre de fin ne me satisfait pas. Voici ma solution inspirée de la documentation originale D'Oracle: CountDownLatch et exemple: Human resources CountDownLatch .
le premier fil commun dans le processus par exemple de la classe HRManagerCompact a le loquet d'attente pour les fils de deux filles, qui a latches d'attente pour les 2 fils de leur fille suivante... etc.
bien sûr, le loquet peut être réglé sur la valeur différente de 2 (dans le constructeur de CountDownLatch), ainsi que le nombre d'objets exécutables peut être établi dans l'itération i.e. ArrayList, mais il doit correspondre (le nombre de CountDownLatch doit être égal au paramètre dans le constructeur de CountDownLatch).
attention, le nombre de loquets augmente exponentiellement selon la restriction condition: niveau".get() < 2", ainsi que le nombre d'objets. 1, 2, 4, 8, 16... et Loches 0, 1, 2, 4... Comme vous pouvez le voir, pour quatre niveaux (niveau.get () < 4) il y aura 15 threads en attente et 7 Loches dans le temps, lorsque le pic 16 threads est en cours d'exécution.
package processes.countdownlatch.hr;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/** Recursively latching running classes to wait for the peak threads
*
* @author hariprasad
*/
public class HRManagerCompact extends Thread {
final int N = 2; // number of daughter's tasks for latch
CountDownLatch countDownLatch;
CountDownLatch originCountDownLatch;
AtomicInteger level = new AtomicInteger(0);
AtomicLong order = new AtomicLong(0); // id latched thread waiting for
HRManagerCompact techLead1 = null;
HRManagerCompact techLead2 = null;
HRManagerCompact techLead3 = null;
// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
AtomicInteger level, AtomicLong order){
super(name);
this.originCountDownLatch=countDownLatch;
this.level = level;
this.order = order;
}
private void doIt() {
countDownLatch = new CountDownLatch(N);
AtomicInteger leveli = new AtomicInteger(level.get() + 1);
AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
//techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);
techLead1.start();
techLead2.start();
//techLead3.start();
try {
synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
countDownLatch.await(); // wait actual thread
}
System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
Thread.sleep(10*level.intValue());
if (level.get() < 2) doIt();
Thread.yield();
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/*catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
// TODO Auto-generated method stub
System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
originCountDownLatch.countDown(); // count down
}
public static void main(String args[]){
AtomicInteger levelzero = new AtomicInteger(0);
HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
hr.doIt();
}
}
sortie commentée Possible (avec une certaine probabilité):
first: working... 1, 1, 10 // thread 1, first daughter's task (10)
second: working... 1, 1, 11 // thread 1, second daughter's task (11)
first: working... 2, 10, 12 // thread 10, first daughter's task (12)
first: working... 2, 11, 14 // thread 11, first daughter's task (14)
second: working... 2, 11, 15 // thread 11, second daughter's task (15)
second: working... 2, 10, 13 // thread 10, second daughter's task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13 // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15 // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11 // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1 // latch on 1 opened
Utiliser CountDownLatch . Passez L'objet CountDownLatch à chacune de vos tâches et codez vos tâches comme ci-dessous.
public void doTask() {
// do your task
latch.countDown();
}
alors que le thread qui doit attendre doit exécuter le code suivant:
public void doWait() {
latch.await();
}
mais bien sûr, cela suppose que vous connaissez déjà le nombre de tâches de l'enfant afin que vous puissiez initialiser le compte du verrou.
la seule solution inélégante que j'ai pu trouver est d'utiliser directement un ThreadPoolExecutor et d'interroger son getPoolSize() de temps en temps. Il n'y a pas de meilleur moyen de faire ça?
vous devez utiliser les méthodes shutdown() ,
awaitTermination() and shutdownNow()
dans une séquence appropriée.
shutdown()
: déclenche un arrêt ordonné dans lequel les tâches précédemment soumises sont exécutées, mais aucune nouvelle tâche ne sera accepter.
awaitTermination()
: bloque jusqu'à ce que toutes les tâches aient été exécutées après une requête d'arrêt, ou que le délai se produise, ou que le thread courant soit interrompu, selon la première éventualité.
shutdownNow()
: tente d'arrêter toutes les tâches en cours d'exécution, arrête le traitement des tâches en attente, et renvoie une liste des tâches qui étaient en attente d'exécution.
chemin recommandé de la page de documentation d'oracle de ExecutorService :
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
vous pouvez Remplacer si condition avec condition tandis en cas de longue durée dans l'accomplissement des tâches comme ci-dessous:
Changement
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
à
while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
Thread.sleep(60000);
}
vous pouvez vous référer à d'autres alternatives (sauf join()
, qui peut être utilisé avec un filetage autonome ) dans: