Comment le framework fork/join est-il meilleur qu'un pool de threads?
Quels sont les avantages de l'utilisation du nouveau cadre fork/join en divisant simplement la grande tâche en N sous-tâches au début, en les envoyant à un pool de threads mis en cache (à partir de exécuteurs) et en attendant que chaque tâche se termine? Je ne vois pas comment l'utilisation de l'abstraction fork/join simplifie le problème ou rend la solution plus efficace par rapport à ce que nous avons depuis des années.
Par exemple, l'algorithme de flou parallélisé dans l'exemple de tutoriel pourrait être implémenté comme ceci:
public class Blur implements Runnable {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
private int mBlurWidth = 15; // Processing window size, should be odd.
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
public void run() {
computeDirectly();
}
protected void computeDirectly() {
// As in the example, omitted for brevity
}
}
Diviser au début et envoyer des tâches à un pool de threads:
// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool
int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();
// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
int size = Math.min(maxSize, src.length - i);
ForkBlur task = new ForkBlur(src, i, size, dst);
Future f = threadPool.submit(task);
futures.add(f);
}
// Wait for all sent tasks to complete:
for (Future future : futures) {
future.get();
}
// Done!
Les tâches vont dans la file d'attente du pool de threads, à partir de laquelle elles sont exécutées au fur et à mesure que les threads de travail deviennent disponibles. Tant que la division est suffisamment granulaire (pour éviter d'avoir à attendre particulièrement la dernière tâche) et que le pool de threads a suffisamment de threads (au moins N de processeurs), tous les processeurs fonctionnent à pleine vitesse jusqu'à ce que tout le calcul soit fait.
Suis-je absent quelque chose? Quelle est la valeur ajoutée de l'utilisation du framework fork/join?
10 réponses
Je pense que le malentendu de base est que les exemples Fork / Join font PAS montrer le travail voler mais seulement une sorte de division et de conquête standard.
Le vol de travail serait comme ceci: le travailleur B a terminé son travail. Il est un genre, donc il regarde autour et voit travailleur un travail encore très dur. Il se promène et demande: "Hé garçon, je pourrais vous donner un coup de main." Réponse. "Cool, j'ai cette tâche de 1000 unités. Jusqu'à présent, j'ai terminé 345 laissant 655. Pourriez vous s'il vous plaît travailler sur le numéro 673 à 1000, je vais faire le 346 à 672."B dit" OK, commençons donc nous pouvons aller au pub plus tôt."
Vous voyez-les travailleurs doivent communiquer entre eux même quand ils ont commencé le vrai travail. C'est la partie manquante dans les exemples.
Les exemples d'autre part montrent seulement quelque chose comme "utiliser des sous-traitants":
Travailleur A: "Dang, j'ai 1000 unités de travail. Trop pour moi. Je vais faire 500 moi-même et sous-traiter 500 à quelqu'un d'autre."Cela continue jusqu'à ce la grande tâche est décomposée en petits paquets de 10 unités chacun. Ceux - ci seront exécutés par les travailleurs disponibles. Mais si un paquet est une sorte de pilule empoisonnée et prend beaucoup plus de temps que les autres paquets-malchance, la phase de division est terminée.
La seule différence restante entre Fork/Join et diviser la tâche en amont est la suivante: lors du fractionnement en amont, vous avez la file d'attente de travail complète dès le début. Exemple: 1000 unités, le seuil est 10, donc la file d'attente a 100 entrées. Ils les paquets sont distribués aux membres de threadpool.
Fork / Join est plus complexe et essaie de garder le nombre de paquets dans la file d'attente plus petit:
- Étape 1: Mettre un paquet contenant (1...1000) dans la file d'attente
- Étape 2: un travailleur ouvre le paquet (1...1000) et le remplace par deux paquets: (1...500) et (501...1000).
- Étape 3: un paquet pop travailleur (500...1000) et pousse (500...750) et (751...1000).
- étape n: la pile contient ces paquets: (1..500), (500...750), (750...875)... (991..1000)
- étape n + 1: paquet (991..1000) est sauté et exécuté
- étape n + 2: Paquet (981..990) est sauté et exécuté
- étape n + 3: paquet (961..980) est sauté et divisé en (961...970) et (971..980). ....
Vous voyez: dans Fork / Join, la file d'attente est plus petite (6 dans l'exemple) et les phases "split" et "work" sont entrelacées.
Lorsque plusieurs travailleurs sautent et poussent simultanément les interactions ne sont pas si clair bien sûr.
Si vous avez n threads occupés qui fonctionnent tous à 100% indépendamment, ce sera mieux que n threads dans un pool Fork-Join (FJ). Mais ça ne fonctionne jamais de cette façon.
Il pourrait ne pas être capable de diviser précisément le problème en n morceaux égaux. Même si vous le faites, la planification des threads est loin d'être juste. Vous finirez par attendre le fil le plus lent. Si vous avez plusieurs tâches, elles peuvent chacune fonctionner avec un parallélisme inférieur à n-way (généralement plus efficace), mais aller jusqu'à n-way lorsque les autres tâches sont terminées.
Alors pourquoi ne pas simplement couper le problème en morceaux de taille FJ et faire travailler un pool de threads là-dessus. L'utilisation typique de FJ coupe le problème en petits morceaux. Faire ces dans un ordre aléatoire nécessite beaucoup de coordination au niveau matériel. Les frais généraux seraient un tueur. En FJ, les tâches sont placées dans une file d'attente que le thread lit dans L'ordre Last In First Out (LIFO/stack), et le vol de travail (dans le travail de base, en général) est effectué premier entré premier sorti (FIFO/"file d'attente"). Le le résultat est que le traitement de longue rangée peut être fait en grande partie séquentiellement, même s'il est divisé en petits morceaux. (Il est également vrai qu'il pourrait ne pas être trivial de diviser le problème en petits morceaux de taille uniforme dans un big bang. Disons traiter avec une certaine forme de hiérarchie sans équilibrer.)
Conclusion: FJ permet une utilisation plus efficace des threads matériels dans des situations inégales, ce qui sera toujours le cas si vous avez plus d'un thread.
Fork / join est différent d'un pool de threads car il implémente le vol de travail. À Partir De Fork/Join
Comme avec N'importe quel ExecutorService, le framework fork/join distribue les tâches aux threads de travail dans un pool de threads. Le cadre fork / join est distinct parce qu'il utilise un algorithme de vol de travail. Les threads de travail ce manque de choses à faire peut voler des tâches d'autres threads qui sont encore occupés.
Disons que vous avez deux threads, et 4 tâches a, b, c, D qui prendre 1, 1, 5 et 6 secondes. Initialement, a et b sont affectés au thread 1 et c et d au thread 2. Dans un pool de threads, cela prendrait 11 secondes. Avec fork / join, le thread 1 se termine et peut voler le travail du thread 2, donc la tâche d finirait par être exécutée par le thread 1. Thread 1 exécute a, b et d, thread 2 juste C. Temps Global: 8 secondes, pas 11.
EDIT: comme le souligne Joonas, les tâches ne sont pas nécessairement pré-allouées à un thread. L'idée de fork/join est qu'un thread peut choisir pour diviser une tâche en plusieurs sous-pièces. Donc, pour reformuler ce qui précède:
Nous avons deux tâches (ab) et (cd) qui prennent respectivement 2 et 11 secondes. Le Thread 1 commence à exécuter ab et le divise en deux sous-tâches a & B. de même avec le thread 2, il se divise en deux sous-tâches c & D. Lorsque le thread 1 a terminé a & b, il peut voler d du thread 2.
Tout le monde ci-dessus est correct les avantages sont obtenus par le vol de travail, mais pour développer pourquoi c'est.
Le principal avantage est la coordination efficace entre les threads de travail. Le travail doit être divisé et réassemblé, ce qui nécessite une coordination. Comme vous pouvez le voir dans la réponse de A. H Ci-dessus, chaque thread a sa propre liste de travail. Une propriété importante de cette liste est qu'elle est triée (grandes tâches en haut et petites tâches en bas). Chaque thread exécute les tâches à la en bas de sa liste et vole des tâches à partir du haut des autres listes de threads.
Le résultat de ceci est:
- la tête et la queue des listes de tâches peuvent être synchronisées indépendamment, ce qui réduit les conflits sur la liste.
- les sous-arbres significatifs du travail sont divisés et réassemblés par le même thread, de sorte qu'aucune coordination entre les threads n'est requise pour ces sous-arbres.
- Quand un fil vole du travail, il prend un gros morceau qu'il subdivise ensuite sur le sien liste
- le travail steeling signifie que les fils sont presque entièrement utilisés jusqu'à la fin du processus.
La plupart des autres schémas divide and conquer utilisant des pools de threads nécessitent plus de communication et de coordination entre les threads.
Dans cet exemple, Fork / Join n'ajoute aucune valeur car le forking n'est pas nécessaire et la charge de travail est répartie uniformément entre les threads de travail. Fork / Join ne fait qu'ajouter des frais généraux.
Voici un bel article sur le sujet. Citation:
Globalement, nous pouvons dire que le ThreadPoolExecutor doit être préféré où la charge de travail est répartie uniformément entre les threads de travail. Pour être en mesure pour garantir cela, vous devez savoir précisément ce que les données d'entrée ressemble. En revanche, l' Forkjoinpool offre de bonnes performances indépendamment des données d'entrée et est donc beaucoup plus robuste solution.
L'objectif ultime des pools de threads et de Fork / Join sont les mêmes: les deux veulent utiliser la puissance du processeur disponible du mieux qu'ils peuvent pour un débit maximal. Le débit maximal signifie que le plus grand nombre possible de tâches doivent être effectuées sur une longue période de temps. Ce qui est nécessaire pour le faire? (Pour ce qui suit, nous supposerons qu'il n'y a pas de pénurie de tâches de calcul: il y a toujours assez à faire pour 100% d'utilisation du processeur. De plus, j'utilise" CPU " de manière équivalente pour les cœurs ou les cœurs virtuels en cas de la technologie hyper-threading).
- au moins, il doit y avoir autant de threads en cours d'exécution que de processeurs disponibles, car l'exécution de moins de threads laissera un noyau inutilisé.
- au maximum, il doit y avoir autant de threads en cours d'exécution que de processeurs disponibles, car l'exécution de plusieurs threads créera une charge supplémentaire pour le planificateur qui attribue des processeurs aux différents threads, ce qui entraîne un certain temps CPU pour aller au planificateur plutôt que notre tâche de calcul.
Ainsi nous j'ai compris que pour un débit maximum, nous devons avoir exactement le même nombre de threads que les processeurs. Dans l'exemple de flou D'Oracle, vous pouvez à la fois prendre un pool de threads de taille fixe avec le nombre de threads égal au nombre de processeurs disponibles ou utiliser un pool de threads. Cela ne fera aucune différence, vous avez raison!
Alors, quand allez-vous avoir des ennuis avec une piscine de fil? C'est-à-dire si un thread bloque, car votre thread attend qu'une autre tâche soit terminée. Supposons ce qui suit exemple:
class AbcAlgorithm implements Runnable {
public void run() {
Future<StepAResult> aFuture = threadPool.submit(new ATask());
StepBResult bResult = stepB();
StepAResult aResult = aFuture.get();
stepC(aResult, bResult);
}
}
Ce que nous voyons ici est un algorithme qui se compose de trois étapes A, B et C. A et B peuvent être effectuées indépendamment les unes des autres, mais l'étape C a besoin du résultat de l'étape A et B. Ce que cet algorithme fait est de soumettre la tâche a au threadpool et d'effectuer la tâche B directement. Après cela, le thread attendra que la tâche A soit également effectuée et continuera avec l'étape C. Si A et B sont terminés en même temps, tout va bien. Mais que faire si A prend plus de temps que B? Qui peut être parce que la nature de la tâche un dicte, mais il peut aussi être le cas parce qu'il n'y a pas thread pour la tâche a disponible au début et la tâche A doit attendre. (S'il n'y a qu'un seul processeur disponible et que votre threadpool n'a qu'un seul thread, cela provoquera même un blocage, mais pour l'instant c'est en dehors du point). Le fait est que le thread qui vient d'exécuter la tâche B bloque tout le thread . Puisque nous avons le même nombre de threads que les processeurs et qu'un thread est bloqué signifie que un PROCESSEUR est inactif.
Fork / Join résout ce problème: dans le framework fork/join, vous écrivez le même algorithme comme suit:
class AbcAlgorithm implements Runnable {
public void run() {
ATask aTask = new ATask());
aTask.fork();
StepBResult bResult = stepB();
StepAResult aResult = aTask.join();
stepC(aResult, bResult);
}
}
On dirait la même chose, n'est-ce pas? Cependant l'indice est que aTask.join
ne bloque pas . Au lieu de cela, voici où vol de travail entre en jeu: le fil cherchera d'autres tâches qui ont été fourchues dans le passé et continuera avec celles-ci. D'abord, il vérifie si les tâches qu'il a fourche elle-même commencé le traitement. Donc, si A n'a pas encore été démarré par un autre thread, il fera un suivant, sinon il vérifiera la file d'attente des autres threads et volera leur travail. Une fois cette autre tâche d'un autre thread terminée, elle vérifiera si A est terminé maintenant. Si c'est l'algorithme ci-dessus peut appeler stepC
. Sinon, il cherchera encore une autre tâche à voler. Ainsi, Fork / join pools peut atteindre 100% d'utilisation du processeur, même face aux actions de blocage.
Cependant, il y a un trap: le vol de travail n'est possible que pour l'appel join
de ForkJoinTask
s. Cela ne peut pas être fait pour des actions de blocage externes comme attendre un autre thread ou attendre une action d'E/S. Alors qu'en est-il de cela, attendre que les E / S se terminent est une tâche courante? Dans ce cas, si nous pouvions ajouter un thread supplémentaire à Fork/Join pool qui sera arrêté à nouveau dès que l'action de blocage est terminée sera la deuxième meilleure chose à faire. Et le ForkJoinPool
peut réellement le faire si nous utilisons ManagedBlocker
S.
Fibonacci
Dans le JavaDoc pour RecursiveTask est un exemple pour calculer les nombres de Fibonacci en utilisant Fork / Join. Pour une solution récursive classique, voir:
public static int fib(int n) {
if (n <= 1) {
return n;
}
return fib(n - 1) + fib(n - 2);
}
Comme cela est expliqué dans les JavaDocs, c'est une jolie façon de calculer les nombres de fibonacci, car cet algorithme a une complexité O(2^n) Alors que des moyens plus simples sont possibles. Cependant, cet algorithme est très simple et facile à comprendre, donc nous en tenir avec elle. Supposons que nous voulons accélérer cela avec fourchette / jointure. Une implémentation naïve ressemblerait à ceci:
class Fibonacci extends RecursiveTask<Long> {
private final long n;
Fibonacci(long n) {
this.n = n;
}
public Long compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
Les étapes que cette tâche est divisée en sont beaucoup trop courtes et donc cela fonctionnera horriblement, mais vous pouvez voir comment le framework fonctionne généralement très bien: les deux sommands peuvent être calculés indépendamment, mais nous avons besoin des deux pour construire le résultat final. Donc, une moitié est faite dans un autre fil. Amusez-vous à faire la même chose avec les pools de threads sans obtenir une impasse (possible, mais pas aussi simple).
Juste pour être complet: si vous voulez réellement calculer les nombres de Fibonacci en utilisant cette approche récursive, voici une version optimisée:
class FibonacciBigSubtasks extends RecursiveTask<Long> {
private final long n;
FibonacciBigSubtasks(long n) {
this.n = n;
}
public Long compute() {
return fib(n);
}
private long fib(long n) {
if (n <= 1) {
return 1;
}
if (n > 10 && getSurplusQueuedTaskCount() < 2) {
final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
f1.fork();
return f2.compute() + f1.join();
} else {
return fib(n - 1) + fib(n - 2);
}
}
}
Cela garde les sous-tâches beaucoup plus petites car elles ne sont divisées que lorsque n > 10 && getSurplusQueuedTaskCount() < 2
est true, ce qui signifie qu'il y a beaucoup plus de 100 appels de méthode à faire (n > 10
) et qu'il n'y a pas de tâches man en attente (getSurplusQueuedTaskCount() < 2
).
Sur mon ordinateur (4 core (8 lors du comptage Hyper-threading), Intel (R) Core (TM) i7-2720QM CPU @ 2.20 GHz) le fib(50)
prend 64 secondes avec l'approche classique et seulement 18 secondes avec L'approche Fork/Join ce qui est un gain assez notable, mais pas autant que théoriquement possible.
Résumé
- Oui, dans votre exemple Fork / Join n'a aucun avantage sur les pools de threads classiques.
- Fork / Join peut considérablement améliorer les performances lorsque le blocage est impliqué
- Fork / Join évite certains problèmes de blocage
Une autre différence importante semble être qu'avec F-J, vous pouvez faire plusieurs phases "Join" complexes. Considérez le tri de fusion à partir de http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html , Il y aurait trop d'orchestration nécessaire pour pré-diviser ce travail. par exemple, Vous devez faire les choses suivantes:
- trier le premier trimestre
- trier le deuxième trimestre
- fusionner les 2 premiers trimestres
- trier le troisième trimestre
- trier le quatrième trimestre
- fusionner les 2 derniers trimestres
- fusionner les 2 moitiés
Comment spécifiez-vous que vous devez faire les tris avant les fusions qui les concernent, etc.
J'ai regardé la meilleure façon de faire une certaine chose pour chacun d'une liste d'éléments. Je pense que je vais juste pré-diviser la liste et utiliser un ThreadPool standard. F-J semble plus utile lorsque le travail ne peut pas être pré-divisé en suffisamment de tâches indépendantes mais peut être récursivement divisé en tâches indépendantes entre eux (par exemple, le tri des moitiés est indépendant mais la fusion des 2 moitiés triées en un ensemble trié ne l'est pas).
F/J a également un avantage distinct lorsque vous avez des opérations de fusion coûteuses. Parce qu'il se divise en une structure arborescente, vous ne faites que des fusions log2(n) par opposition à n fusionne avec le fractionnement de thread linéaire. (Cela fait l'hypothèse théorique que vous avez autant de processeurs que de threads, mais toujours un avantage) pour un devoir, nous avons dû fusionner plusieurs milliers de tableaux 2D (toutes les mêmes dimensions) en additionnant les valeurs à chaque index. Avec les processeurs Fork join et P le temps approche log2 (n) Lorsque P se rapproche de l'infini.
1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9
Si le problème est tel que nous devons attendre que d'autres threads se terminent(comme dans le cas du tri du tableau ou de la somme du tableau), Fork join doit être utilisé, comme exécuteur (exécuteurs.newFixedThreadPool( 2)) s'étouffe en raison du nombre limité de threads. Le pool forkjoin créera plus de threads dans ce cas pour couvrir le thread bloqué afin de maintenir le même parallélisme
Source: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
Le problème avec les exécuteurs pour l'implémentation des algorithmes divide and conquer n'est pas lié à la création de sous-tâches, car un appelable est libre de soumettre une nouvelle sous-tâche à son exécuteur et d'attendre son résultat de manière synchrone ou asynchrone. Le problème est celui du parallélisme: Lorsqu'un appelable attend le résultat d'un autre appelable, il est mis dans un État d'attente, gaspillant ainsi une opportunité de gérer un autre appelable en file d'attente pour l'exécution.
Le cadre fork/join ajouté au Java.util.le paquet concurrent dans Java SE 7 grâce aux efforts de Doug Lea comble cette lacune
Source: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html
Le pool tente de maintenir suffisamment de threads actifs (ou disponibles) en ajoutant, suspendant ou reprenant dynamiquement des threads de travail internes, même si certaines tâches sont bloquées en attendant de rejoindre d'autres. Cependant, aucun ajustement de ce type n'est garanti face à des e / s bloquées ou à d'Autres E / S non gérées synchronisation
Public int getPoolSize() Renvoie le nombre de threads de travail qui ont démarré mais pas encore terminés. le résultat renvoyé par cette méthode peut différer de getParallelism () lorsque les threads sont créés pour maintenir le parallélisme lorsque les autres sont bloqués de manière coopérative.
Vous seriez étonné des performances de ForkJoin dans une application comme crawler. voici le meilleur tutoriel que vous apprendrez.
La logique de Fork/Join est très simple: (1) séparer (fork) chaque grande tâche (2) traiter chaque tâche dans un thread séparé (séparant ceux-ci en tâches encore plus petites si nécessaire); (3) Rejoignez le résultat.