Pool de threads personnalisé en Java 8 flux parallèle

est-il possible de spécifier un pool de threads personnalisé pour Java 8 parallel stream ? Je ne peux pas le trouver n'importe où.

Imaginez que j'ai une application serveur et que je voudrais utiliser des flux parallèles. Mais l'application est large et multi-threadé donc je veux la compartimenter. Je ne veux pas d'une tâche lente dans un module des tâches de blocage d'application d'un autre module.

si Je ne peux pas utiliser un autre fil piscines pour différents modules, cela signifie que je ne peux pas utiliser en toute sécurité des flux parallèles dans la plupart des situations du monde réel.

Essayez l'exemple suivant. Il y a des tâches intensives en CPU exécutées dans des threads séparés. Les tâches tirent parti des flux parallèles. La première tâche est cassée, de sorte que chaque étape prend 1 seconde (simulée par le sommeil du fil). Le problème est que d'autres fils se coincent et attendent que la tâche cassée pour finir. C'est un exemple artificiel, mais imaginez une application servlet et quelqu'un soumettre une tâche de longue durée à la table de jointure partagée.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
301
demandé sur Stuart Marks 2014-01-16 17:26:37

10 réponses

il y a en fait une astuce pour exécuter une opération parallèle dans un pool de bifurcation spécifique. Si vous l'exécutez comme une tâche dans un pool fork-join, il reste là et n'utilise pas le commun.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

L'astuce est basée sur ForkJoinTask.fork qui spécifie: "S'arrange pour exécuter cette tâche de manière asynchrone dans le pool La tâche courante est en cours d'exécution, si applicable, ou en utilisant le ForkJoinPool.commonPool () if not inForkJoinPool()"

304
répondu Lukas 2015-02-26 06:17:50

les flux parallèles utilisent la valeur par défaut ForkJoinPool.commonPool qui par défaut a un thread en moins que vous avez des processeurs , comme retourné par Runtime.getRuntime().availableProcessors() (cela signifie que les flux parallèles utilisent tous vos processeurs parce qu'ils utilisent aussi le thread principal):

pour les applications qui nécessitent des pools séparés ou personnalisés, un ForkJoinPool peut être construit avec un niveau de parallélisme cible donné; par défaut, égal au nombre de pools disponibles processeur.

cela signifie aussi que si vous avez des flux parallèles imbriqués ou plusieurs flux parallèles commencés simultanément, ils vont tous partager le même pool. Avantage: vous n'utiliserez jamais plus que la valeur par défaut (nombre de processeurs disponibles). Inconvénient: vous ne pouvez pas obtenir "tous les processeurs" attribué à chaque courant parallèle vous lancer (si vous avez plus d'un). (Apparemment vous pouvez utiliser un ManagedBlocker pour contourner cela.)

pour changer la façon dont les flux parallèles sont exécutés ,vous pouvez soit

  • soumettez l'exécution en flux parallèle à votre propre ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); ou
  • vous pouvez changer la taille du pool commun en utilisant les propriétés du système: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") pour un parallélisme cible de 20 threads.

exemple de la ce dernier sur ma machine qui dispose de 8 processeurs. Si je lance le programme suivant:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

la sortie est:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

donc vous pouvez voir que le flux parallèle traite 8 éléments à la fois, i.e. il utilise 8 threads. Cependant, si je décommente la ligne commentée, la sortie est:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

cette fois, le flux parallèle a utilisé 20 threads et les 20 éléments du flux ont été traités simultanément.

153
répondu assylias 2018-04-24 09:35:22

alternativement à l'astuce de déclencher le calcul parallèle à l'intérieur de votre propre forkJoinPool, vous pouvez également passer cette piscine à L'avenir completable.méthode supplyAsync comme dans:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);
31
répondu Mario Fusco 2015-01-03 08:05:57

utiliser un ForkJoinPool et soumettre pour un flux parallèle n'utilise pas de manière fiable tous les threads. Si vous regardez ceci ( le flux parallèle d'un HashSet ne court pas en parallèle ) et ceci ( pourquoi le flux parallèle n'utilise-t-il pas tous les fils du ForkJoinPool? ), vous verrez le raisonnement.

version courte: si ForkJoinPool / submit ne fonctionne pas pour vous, utiliser

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
15
répondu Tod Casasent 2017-05-23 10:31:36

Jusqu'à présent, j'ai utilisé les solutions décrites dans les réponses de cette question. Maintenant, j'ai trouvé une petite bibliothèque appelée Parallel Stream Support pour cela:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

mais comme @PabloMatiasGomez l'a souligné dans les commentaires, Il ya des inconvénients en ce qui concerne le mécanisme de division des flux parallèles qui dépend fortement de la taille de la piscine commune. Voir flux parallèle à partir d'un HashSet ne fonctionne pas en parallèle .

j'utilise cette solution seulement pour avoir des piscines séparées pour différents types de travail mais je ne peux pas définir la taille de la piscine commune à 1 même si Je ne l'utilise pas.

7
répondu Stefan Ferstl 2017-05-23 10:31:36

pour mesurer le nombre réel de threads utilisés, Vous pouvez cocher Thread.activeCount() :

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

cela peut produire sur un CPU 4-core une sortie comme:

5 // common pool
23 // custom pool

sans .parallel() il donne:

3 // common pool
4 // custom pool
5
répondu charlie 2016-01-21 17:49:58

Aller pour obtenir de l' AbacusUtil . Le nombre de Thread peut être spécifié pour le flux parallèle. Voici le code échantillon:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Divulgation: je suis le développeur de AbacusUtil.

1
répondu user_3380739 2016-12-02 03:26:08

si cela ne vous dérange pas d'utiliser une bibliothèque tierce, avec cyclops-react vous pouvez mélanger des flux séquentiels et parallèles dans le même pipeline et fournir des poêles sur mesure. Par exemple

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

ou si nous voulions poursuivre le traitement dans un flux séquentiel

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[révélation je suis le développeur principal de cyclope-réagir]

0
répondu John McClean 2017-03-10 12:04:19

j'ai essayé le personnalisé ForkJoinPool comme suit pour ajuster la taille de la piscine:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Voici la sortie disant que le pool utilise plus de threads que la valeur par défaut 4 .

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

mais en fait il y a un weirdo , quand j'ai essayé d'atteindre le même résultat en utilisant ThreadPoolExecutor comme suit:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

mais j'ai échoué.

il démarrera seulement le parallelStream dans un nouveau fil et puis tout le reste est exactement le même, ce qui à nouveau prouve que le parallelStream utilisera le ForkJoinPool pour démarrer ses fils d'enfant.

0
répondu Hearen 2018-05-29 01:11:32

Note: Il semble y avoir une correction implémentée dans JDK 10 qui assure que le Pool de threads personnalisé utilise le nombre attendu de threads.

L'exécution en flux parallèle dans un ForkJoinPool personnalisé doit obéir au parallélisme https://bugs.openjdk.java.net/browse/JDK-8190974

0
répondu Scott Langley 2018-06-13 20:09:32