Qu'est-ce qui détermine le nombre de threads créés par Java ForkJoinPool?

pour autant que j'ai compris ForkJoinPool , ce pool crée un nombre fixe de threads (par défaut: nombre de noyaux) et ne créera jamais plus de threads (à moins que l'application indique un besoin pour ceux-ci en utilisant managedBlock ).

Cependant, en utilisant ForkJoinPool.getPoolSize() j'ai découvert que dans un programme qui crée 30.000 tâches ( RecursiveAction ), le ForkJoinPool l'exécution de ces tâches utilise 700 threads en moyenne (threads compté chaque fois qu'une tâche est créée). Tâche ne faites pas d'E/S, mais du calcul pur; la seule synchronisation inter-tâches est d'appeler ForkJoinTask.join() et d'accéder à AtomicBoolean , c'est-à-dire qu'il n'y a pas d'opérations de blocage de thread.

puisque join() ne bloque pas le fil appelant tel que je le comprends, il n'y a aucune raison pour qu'un fil dans la piscine ne bloque jamais, et donc (j'avais supposé) il ne devrait y avoir aucune raison de créer d'autres fils (ce qui se produit évidemment néanmoins).

So, pourquoi ForkJoinPool crée autant de fils? Quels facteurs déterminent le nombre de threads créés?

j'avais espéré que cette question pourrait être répondue sans code d'affichage, mais ici il vient sur demande. Ce code est un extrait d'un programme de quatre fois la taille, réduite aux parties essentielles; il ne compile pas comme il est. Si vous le souhaitez, je peux bien sûr poster le programme complet, aussi.

le programme recherche dans un labyrinthe un chemin à partir d'un point de départ jusqu'à un point d'arrivée donné en utilisant la recherche en profondeur-première recherche. Une solution de l'existence est garantie. La logique principale est dans la méthode compute() de SolverTask : une RecursiveAction qui commence à un point donné et continue avec tous les points voisins accessibles à partir du point courant. Plutôt que de créer un nouveau SolverTask à chaque point de branchement (ce qui créerait beaucoup trop de tâches), il pousse tous les voisins sauf un sur une pile de traçage pour être traités plus tard et continue avec seulement le un voisin pas poussé à la pile. Une fois qu'il atteint une impasse de cette façon, le point le plus récemment poussé à la pile de traçage est popped, et la recherche continue à partir de là (couper en conséquence le chemin construit à partir du point de départ de taks). Une nouvelle tâche est créée une fois qu'une tâche trouve sa pile de traçage plus grande qu'un certain seuil; à partir de ce moment, la tâche, tout en continuant à pop de sa pile de traçage jusqu'à ce que ce soit épuisé, ne poussera pas d'autres points à sa pile lorsque vous atteignez un point de ramification, mais créer une nouvelle tâche pour chacun de ces points. Ainsi, la taille des tâches peut être ajustée en utilisant le seuil limite de la pile.

les nombres que j'ai cités ci-dessus ("30,000 tâches, 700 threads en moyenne") proviennent de la recherche d'un labyrinthe de 5000x5000 cellules. Voici donc le code essentiel:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
35
demandé sur palacsint 2012-05-29 14:44:22

4 réponses

il y a des questions sur stackoverflow:

ForkJoinPool stands lors de invokeAll/join

ForkJoinPool semble gaspiller un fil

j'ai fait une version exécutable dépouillée de ce qui se passe (les arguments jvm que j'ai utilisés: -Xms256m-Xmx1024m-Xss8m):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class Test1 {

    private static ForkJoinPool pool = new ForkJoinPool(2);

    private static class SomeAction extends RecursiveAction {

        private int counter;         //recursive counter
        private int childrenCount=80;//amount of children to spawn
        private int idx;             // just for displaying

        private SomeAction(int counter, int idx) {
            this.counter = counter;
            this.idx = idx;
        }

        @Override
        protected void compute() {

            System.out.println(
                "counter=" + counter + "." + idx +
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());
            if (counter <= 0) return;

            List<SomeAction> list = new ArrayList<>(childrenCount);
            for (int i=0;i<childrenCount;i++){
                SomeAction next = new SomeAction(counter-1,i);
                list.add(next);
                next.fork();
            }


            for (SomeAction action:list){
                action.join();
            }
        }
    }

    public static void main(String[] args) throws Exception{
        pool.invoke(new SomeAction(2,0));
    }
}

apparemment quand vous effectuez une jointure, le fil courant voit que nécessaire tâche n'est pas encore terminée et prend une autre tâche pour lui-même à faire.

ça se passe dans java.util.concurrent.ForkJoinWorkerThread#joinTask .

cependant cette nouvelle tâche génère plus des mêmes tâches, mais ils ne peuvent pas trouver de threads dans le pool, parce que les threads sont verrouillés dans jointure. Et comme il n'a aucun moyen de savoir combien de temps il lui faudra pour qu'ils soient libérés (le fil pourrait être en boucle infinie ou bloqué pour toujours), un nouveau fil (s) est (sont) engendré (compensant pour les fils joints comme Louis Wasserman mentioned): java.util.concurrent.ForkJoinPool#signalWork

donc pour prévenir un tel scénario, vous devez éviter le frai récursif des tâches.

par exemple, si dans le code ci-dessus vous définissez le paramètre initial à 1, le nombre de threads actifs sera de 2, même si vous multipliez par dix le nombre d'enfants.

notez aussi que, tandis que la quantité de threads actifs augmente, la quantité de threads en cours d'exécution est inférieure ou égale au parallélisme .

13
répondu elusive-code 2017-05-23 12:26:15

à Partir de la source de commentaires:

compensation: à moins qu'il n'y ait déjà assez de fils vivants, la méthode tryPreBlock() peut créer ou réactiver un fil de rechange pour compenser les menuisiers bloqués jusqu'à ce qu'ils se débloquent.

je pense que ce qui se passe c'est que vous ne terminez aucune des tâches très rapidement, et puisqu'il n'y a pas de threads ouvriers disponibles quand vous soumettez une nouvelle tâche, un nouveau thread est créé.

10
répondu Louis Wasserman 2012-05-29 10:59:31
"151900920 strict", plein stricte, et terminale stricte avoir à faire avec le traitement d'un graphe dirigé acyclique (DAG). Vous pouvez google ces termes pour obtenir une compréhension complète. C'est le type de traitement le cadre a été conçu pour traiter. Regardez le code dans L'API pour Recursive..., le framework s'appuie sur votre code compute() pour faire d'autres liens compute() et ensuite faire une join(). Chaque tâche fait une jointure simple () comme traiter un DAG.

Vous ne faites pas DAG traitement. Vous faites beaucoup de nouvelles tâches et vous attendez (join()) sur chacune. Avoir une lecture dans le code source. C'est horriblement complexe, mais vous pourrez peut-être le découvrir. Le cadre ne permet pas une bonne gestion des tâches. Où va-t-il mettre la tâche d'attente quand il fait une jointure()? Il n'y a pas de file d'attente suspendue, qui nécessiterait un thread de moniteur pour regarder constamment la file d'attente pour voir ce qui est terminé. C'est pourquoi le framework utilise des "threads de continuation". Lorsqu'une tâche ne join() de la le cadre suppose qu'il attend la fin d'une seule tâche inférieure. Quand beaucoup de méthodes join () sont présentes le thread ne peut pas continuer ainsi un helper ou un thread de continuation doit exister.

comme indiqué ci-dessus, vous avez besoin d'un processus scatter-gather type fork-join. Là, vous pouvez fourguer autant de tâches

8
répondu edharned 2015-10-15 16:14:21

les deux extraits de code affichés par Holger Peine et elusive-code ne suit pas réellement la pratique recommandée qui figure dans javadoc pour la version 1.8 :

dans les usages les plus typiques, une paire de fourches agissent comme un appel (fork) et retourner (join) à partir d'une fonction récursive parallèle. Comme c'est le cas avec d'autres formes d'appels récursifs, les retours (rejoindre) doit être effectuée intime-première. Par exemple, A. fourchette(); b.fork(); b.join ();.join(); est probablement beaucoup plus efficace que de joindre le code un avant le code b .

dans les deux cas, FJPool a été instancié via un constructeur par défaut. Cela conduit à la construction de la piscine avec asyncMode=false , qui est par défaut:

@param asyncMode si la valeur est true,

établit le mode d'ordonnancement local du premier entré, premier sorti pour la fourche les tâches qui ne sont jamais rejoint. Ce mode peut être plus approprié que le mode local par défaut basé sur la pile dans les applications dans lesquelles les threads des travailleurs ne traitent que les tâches asynchrones de type événement. Pour la valeur par défaut, utiliser de faux.

de cette façon la file d'attente de travail est en fait lifo:

tête - > / t4 / t3 / t2 | t1/... | <- queue

ainsi dans les snippets ils fork () toutes les tâches les poussant sur la pile et que join () dans le même ordre, c'est-à-dire de la tâche la plus profonde (t1) à la plus haute (t4) bloquant effectivement jusqu'à ce qu'un autre fil vole (t1), puis (t2) et ainsi de suite. Puisqu'il y a des tâches enouth pour bloquer tous les threads de pool (task_count >> pool.getParallelism()) la rémunération des coups de pied dans Louis Wasserman décrit.

2
répondu Artёm Basov 2017-07-12 21:14:56