Impossible de faire un pool de filetage en cache avec une limite de taille?
il semble impossible de créer un pool de threads avec une limite au nombre de threads qu'il peut créer.
voici comment les exécuteurs statiques.newCachedThreadPool est implémenté dans la bibliothèque Java standard:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
donc, en utilisant ce modèle pour continuer à créer un pool de thread en cache de taille fixe:
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());
maintenant si vous utilisez ceci et soumettez 3 tâches, tout ira bien. Soumettre toute autre les tâches entraîneront le rejet des exceptions d'exécution.
Essayer ceci:
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());
donnera une exécution séquentielle de tous les threads. I. e., le pool de threads ne fera jamais plus d'un thread pour gérer vos tâches.
ceci est un bug dans la méthode execute de ThreadPoolExecutor? Ou peut-être que c'est intentionnel? Ou il y a une autre façon?
Edit: je veux quelque chose exactement comme le pool de threads en cache (il crée des threads à la demande et les tue ensuite après un certain temps d'arrêt) mais avec une limite sur le nombre de threads qu'il peut créer et la capacité de continuer à faire la queue des tâches supplémentaires une fois qu'il a atteint sa limite de thread. Selon la réponse de sjlee, c'est impossible. En regardant la méthode execute () de ThreadPoolExecutor, c'est en effet impossible. J'aurais besoin de sous-classe ThreadPoolExecutor et override execute() un peu comme SwingWorker le fait, mais ce que SwingWorker fait dans son execute () est un complet hack.
11 réponses
le ThreadPoolExecutor a plusieurs comportements clés, et vos problèmes peuvent être expliqués par ces comportements.
Lorsque les tâches sont soumis,
- si le pool de fils n'a pas atteint la taille du noyau, il crée de nouveaux fils.
- si la taille du cœur a été atteinte et qu'il n'y a pas de threads inactifs, il fait la queue pour les tâches.
- si la taille du cœur a été atteinte, il n'y a pas de threads inactifs, et la file d'attente devient pleine, elle crée de nouveaux threads (jusqu'à ce qu'elle atteigne la taille max).
- si la taille max a été atteinte, il n'y a pas de threads inactifs, et la file d'attente devient pleine, la Politique de rejet entre en jeu.
dans le premier exemple, notez que la Synchronequeue a essentiellement une taille de 0. Par conséquent, au moment où vous atteignez la taille max (3), la Politique de rejet entre en jeu (#4).
Dans le deuxième exemple, la file d'attente de le choix est une queue Linkedblocking qui a une taille illimitée. Donc, vous êtes coincé avec le comportement #2.
vous ne pouvez pas vraiment bricoler beaucoup avec le type caché ou le type fixe, car leur comportement est presque complètement déterminé.
si vous voulez avoir un pool de filetage limité et dynamique, vous devez utiliser une taille de coeur positive et la taille maximale combinée avec une file d'attente d'une taille finie. Par exemple,
new ThreadPoolExecutor(10, // core size
50, // max size
10*60, // idle timeout
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(20)); // queue with a size
Addendum : c'est une réponse assez ancienne, et il semble que JDK a changé son comportement quand il s'agit de la taille du cœur de 0. Depuis JDK 1.6, si la taille du noyau est 0 et que le pool n'a pas de threads, Le ThreadPoolExecutor ajoutera un thread pour exécuter cette tâche. Par conséquent, la taille de base de 0 est une exception à la règle ci-dessus. Merci Steve pour porter à mon attention.
à moins que j'ai manqué quelque chose, la solution à la question originale est simple. Le code suivant implémente le comportement désiré tel que décrit par l'affiche originale. Il générera jusqu'à 5 threads pour travailler sur une file d'attente illimitée et les threads inactifs se termineront après 60 secondes.
tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);
avait la même question. comme aucune autre réponse ne rassemble tous les problèmes, j'ajoute le mien:
il est maintenant clairement écrit dans docs : si vous utilisez une file d'attente qui ne bloque pas ( LinkedBlockingQueue
) le réglage des threads max n'a aucun effet, seuls les threads core sont utilisés.
:
public class MyExecutor extends ThreadPoolExecutor {
public MyExecutor() {
super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
allowCoreThreadTimeOut(true);
}
public void setThreads(int n){
setMaximumPoolSize(Math.max(1, n));
setCorePoolSize(Math.max(1, n));
}
}
cet exécuteur a:
-
max threads car nous utilisons une file d'attente illimitée. C'est une bonne chose parce qu'une telle file d'attente peut amener l'exécuteur à créer un nombre massif de threads supplémentaires non-core s'il suit sa politique habituelle.
-
Une file d'attente de taille maxi
Integer.MAX_VALUE
.Submit()
lanceraRejectedExecutionException
si le nombre de tâches en attente dépasseInteger.MAX_VALUE
. Je ne suis pas sûr que nous manquerons de mémoire d'abord ou cela arrivera. -
a 4 noyau threads possible. Les threads de coeur inactifs s'arrêtent automatiquement s'ils sont inactifs pendant 5 secondes.Donc, oui, uniquement sur demande.Le nombre peut être modifié en utilisant la méthode
setThreads()
. -
S'assure que le nombre minimum de threads core n'est jamais inférieur à un, ou bien
submit()
rejettera chaque tâche. Puisque les threads principaux doivent être > = threads max la méthodesetThreads()
définit les threads max aussi bien, bien que la définition de threads max soit inutile pour une file d'attente non limitée.
dans votre premier exemple, les tâches suivantes sont rejetées parce que AbortPolicy
est la valeur par défaut RejectedExecutionHandler
. Le ThreadPoolExecutor contient les politiques suivantes, que vous pouvez modifier via la méthode setRejectedExecutionHandler
:"
CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy
on dirait que vous voulez du thread caché avec une CallerRunsPolicy.
aucune des réponses ici n'a résolu mon problème, qui avait à voir avec la création d'un nombre limité de connexions HTTP en utilisant le client HTTP D'Apache (3.x version). Comme il m'a fallu quelques heures pour trouver une bonne configuration, je vais partager:
private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
cela crée un ThreadPoolExecutor
qui commence par cinq et tient un maximum de dix threads fonctionnant simultanément en utilisant CallerRunsPolicy
pour l'exécution.
Par la Javadoc de ThreadPoolExecutor:
S'il y a plus que corePoolSize mais moins que maximumPoolSize threads en cours d'exécution, un nouveau thread sera créé seulement si la file d'attente est complète . En réglant corePoolSize et maximumPoolSize de la même façon, vous créez un pool de filetage de taille fixe.
(l'Emphase est mienne.)
gigue de réponse est ce que vous voulez, bien que mine de vos réponses autre question. :)
il y a une autre option. Au lieu d'utiliser New SynchronousQueue, vous pouvez utiliser n'importe quelle autre file d'attente, mais vous devez vous assurer que sa taille est 1, de sorte que executorservice sera forcé de créer un nouveau thread.
ne semble pas comme si l'une des réponses répondent réellement à la question - en fait, je ne peux pas voir un moyen de le faire - même si vous sous-classe de PooledExecutorService puisque beaucoup de méthodes/propriétés sont privées par exemple faisant addIfUnderMaximumPoolSize a été protégé, vous pourriez faire ce qui suit:
class MyThreadPoolService extends ThreadPoolService {
public void execute(Runnable run) {
if (poolSize() == 0) {
if (addIfUnderMaximumPoolSize(run) != null)
return;
}
super.execute(run);
}
}
le plus proche que j'ai eu était celui - ci- mais même cela n'est pas une très bonne solution
new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
public void execute(Runnable command) {
if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {
super.setCorePoolSize(super.getCorePoolSize() + 1);
}
super.execute(command);
}
protected void afterExecute(Runnable r, Throwable t) {
// nothing in the queue
if (getQueue().isEmpty() && getPoolSize() > min) {
setCorePoolSize(getCorePoolSize() - 1);
}
};
};
p. S.
C'est ce que vous voulez (du moins je suppose). Pour une explication cochez Jonathan Feinberg réponse
Executors.newFixedThreadPool(int n)
crée un pool de threads qui réutilise un nombre fixe de threads opérant à partir d'une file d'attente partagée sans limite. À tout moment, à la plupart des threads nThreads seront des tâches de traitement actives. Si des tâches supplémentaires sont soumises lorsque tous les threads sont actifs, ils attendront la file d'attente jusqu'à ce qu'un fil est disponible. Si un thread se termine à cause d'une défaillance durant l'exécution avant l'arrêt, un nouveau thread prendra sa place si nécessaire pour exécuter des tâches ultérieures. Les threads dans le pool existeront jusqu'à ce qu'il soit explicitement arrêté.
Voici une autre solution. Je pense que cette solution se comporte comme vous le souhaitez (mais pas fier de cette solution):
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
public boolean offer(Runnable o) {
if (size() > 1)
return false;
return super.offer(o);
};
public boolean add(Runnable o) {
if (super.offer(o))
return true;
else
throw new IllegalStateException("Queue full");
}
};
RejectedExecutionHandler handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
queue.add(r);
}
};
dbThreadExecutor =
new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
-
vous pouvez utiliser
ThreadPoolExecutor
comme suggéré par @sjleeVous pouvez contrôler la taille de la piscine de façon dynamique. Regardez cette question pour plus de détails:
ou
-
vous pouvez utiliser NEWWORKSTEALINGPOOL API, qui a été introduit avec java 8.
public static ExecutorService newWorkStealingPool()
crée un pool de threads en utilisant tous les processeurs disponibles comme niveau de parallélisme cible.
par défaut, le niveau de parallélisme est défini au nombre de cœurs CPU de votre serveur. Si vous avez 4 serveur CPU de base, la taille du pool de thread serait de 4. Cette API retourne ForkJoinPool
type de ExecutorService
et permet de voler le travail des threads inactifs en volant des tâches de busy les threads ForkJoinPool.