La boucle parallèle Java 8 imbriquée pour chaque tour donne de mauvais résultats. Est-ce le comportement attendu?

Note: j'ai déjà abordé ce problème dans un autre SO post - en utilisant un sémaphore à l'intérieur D'un flux parallèle d'action Java 8 imbriqué peut bloquer. Est-ce un bug? -, mais le titre de ce post suggère que le problème est lié à l'utilisation d'un sémaphore - qui quelque peu distrait la discussion. Je crée celui - ci pour souligner que les boucles imbriquées pourraient avoir un problème de performance-bien que les deux problèmes ont probablement une cause commune (et peut-être parce que il m'a fallu beaucoup de temps pour comprendre ce problème). (Je ne le vois pas comme un double, parce qu'il est stressant un autre symptôme - mais si vous le faites tout simplement supprimer).

problème: si vous nichez deux Java 8 stream.parallèle.)(chaque boucle et toutes les tâches sont indépendantes, apatrides, etc. - excepté pour être soumis à la piscine commune de FJ -, alors l'imbrication d'une boucle parallèle à l'intérieur d'une boucle parallèle effectue beaucoup plus pauvre que l'imbrication d'une boucle séquentielle à l'intérieur d'une boucle parallèle. Pire encore: si l'opération contenant la boucle interne est synchronisée, vous vous retrouverez dans une impasse.

la Démonstration de la performance de l'émission

sans le "synchronized" vous pouvez encore observer un problème de performance. Vous trouverez un code de démo pour cela à: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (voir le JavaDoc pour une description plus détaillée).

notre configuration ici est la suivante: nous avons un flux imbriqué.parallèle.)(forEach ().

  • la boucle intérieure est indépendante (apatride, pas d'interférence, etc. - excepté l'utilisation d'un pool commun) et consomme 1 seconde au total dans le pire des cas, à savoir si traitée séquentielle.
  • la moitié des tâches de la boucle extérieure consomme 10 secondes avant que boucle.
  • la moitié consomme 10 secondes après cette boucle.
  • donc chaque fil consomme au total 11 secondes (dans le pire des cas).  * Nous avons un booléen qui permet de passer à l'intérieur de la boucle parallèle() pour séquentielle().

maintenant: soumettre 24 tâches en boucle externe à un pool avec parallélisme 8 nous nous attendons à 24/8 * 11 = 33 secondes au mieux (sur une machine 8 core ou mieux).

le résultat est:

  • avec boucle séquentielle intérieure: 33 secondes.
  • avec boucle parallèle interne: > 80 secondes (j'avais 92 secondes).

Question: pouvez-vous confirmer ce comportement? Est-ce quelque chose que l'on attendrait du cadre? (Je suis un peu plus prudent maintenant avec une affirmation que c'est un bug, mais je crois personnellement que c'est dû à un bug dans L'implémentation de ForkJoinTask. Remarque: Je ont posté ceci à concurrence-intérêt (voir http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ), mais jusqu'à présent je n'ai pas obtenu confirmation de là).

démonstration de l'impasse

le code suivant sera dans L'impasse

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

numberOfTasksInOuterLoop = 24 , numberOfTasksInInnerLoop = 240 , outerLoopOverheadFactor = 10000 et doWork est un brûleur CPU apatride.

vous trouvez un code de démonstration complet à http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java (voir le JavaDoc pour une description plus détaillée).

ce comportement est-il attendu? Notez que la documentation sur Java parallel streams ne mentionne aucun problème de nidification ou de synchronisation. Aussi, le fait d'utiliser un commun fork-join-la piscine n'est pas mentionné.

mise à Jour

un autre essai sur la question de la performance peut être trouvé à http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java - ce test vient sans aucune opération de blocage (Pas de filetage.sommeil et non synchronisée). J'ai compilé quelques remarques supplémentaires ici: http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

Update 2

il semble que ce problème et l'impasse plus sévère avec les sémaphores aient été corrigés dans Java8 u40.

17
demandé sur Community 2014-05-06 12:47:40

3 réponses

le problème est que le parallélisme plutôt limité que vous avez configuré est rongé par le traitement de flux externe: si vous dites que vous voulez Huit threads et traiter un flux de plus de huit articles avec parallel() il créera Huit threads d'ouvrier et les laissera traiter des articles.

alors au sein de votre consommateur, vous traitez un autre flux en utilisant parallel() , mais il n'y a plus de threads ouvriers. Puisque les threads ouvriers sont bloqués en attendant la fin du traitement du flux interne, le ForkJoinPool doit créer de nouveaux threads de travail qui violent votre parallélisme configuré. Il me semble qu'il ne recycle pas ces threads extend mais les laisse mourir juste après le traitement. Ainsi, dans votre processus interne, de nouveaux threads sont créés et éliminés, ce qui est une opération coûteuse.

vous pourriez le voir comme un défaut que les threads initiateurs ne contribuent pas au calcul d'un traitement de flux parallèle mais juste attendez le résultat mais même si cela a été corrigé vous avez encore un problème général qui est difficile (si jamais) à corriger:

chaque fois que le rapport entre le nombre de threads ouvriers et les éléments du flux externe est faible, l'implémentation les utilisera tous pour le flux externe car il ne sait pas que le flux est un flux externe. Ainsi, exécuter un flux interne en parallèle demande plus de threads worker que disponible. En utilisant le fil appelant pour contribuer au calcul pourrait le corriger une façon que la performance égale le calcul en série mais obtenir un avantage de l'exécution parallèle ici ne fonctionne pas bien avec le concept d'un nombre fixe de threads de travail.

notez que vous grattez à la surface de ce problème ici, car vous avez des temps de traitement assez équilibrés pour les articles. Si le traitement des articles intérieurs et extérieurs diffère (par rapport aux articles du même niveau), le problème sera encore pire.


mise à Jour: par le profilage et en regardant le code, il semble que le ForkJoinPool ne les tentatives d'utiliser le thread en attente pour le "travail de voler", mais à l'aide d'un code différent en fonction sur le fait de savoir si le Thread est un thread de travail ou à un autre thread. En conséquence, un thread ouvrier attend environ 80% du temps et ne fait que très peu ou pas de travail alors que d'autres threads contribuent vraiment au calcul...


mise à jour 2: pour plus d'exhaustivité, voici la méthode simple d'exécution en parallèle décrite dans les commentaires. Comme il interroge chaque article, il est prévu d'avoir beaucoup de frais généraux lorsque le temps d'exécution pour un seul article est assez faible. Il ne s'agit donc pas d'une solution sophistiquée, mais plutôt d'une démonstration qu'il est possible de gérer des tâches de longue durée sans trop de magie...

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}
5
répondu Holger 2014-06-04 18:35:23

après avoir un peu rangé le code. Je ne vois pas les mêmes résultats avec Java 8 update 45. Il y a sans aucun doute un plafond, mais il est très petit par rapport à l'intervalle de temps dont vous parlez.

le potentiel d'une impasse est attendu que vous consommez tous les threads disponibles dans la piscine avec la boucle extérieure, vous laissant aucun threads à gauche pour exécuter la boucle intérieure.

le programme suivant imprime

isInnerStreamParallel: false, isCPUTimeBurned: false
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 33.1 seconds.
isInnerStreamParallel: false, isCPUTimeBurned: true
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 33.0 seconds.
isInnerStreamParallel: true, isCPUTimeBurned: false
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 32.5 seconds.
isInnerStreamParallel: true, isCPUTimeBurned: true
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 32.6 seconds.

le code

import java.util.stream.IntStream;

public class NestedParallelForEachTest {
    // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100;                // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams    = 8;    // java.util.concurrent.ForkJoinPool.common.parallelism

    public static void main(String[] args) {
        testNestedLoops(false, false);
        testNestedLoops(false, true);
        testNestedLoops(true, false);
        testNestedLoops(true, true);
    }

    public static void testNestedLoops(boolean isInnerStreamParallel, boolean isCPUTimeBurned) {
        System.out.println("isInnerStreamParallel: " + isInnerStreamParallel + ", isCPUTimeBurned: " + isCPUTimeBurned);
        long start = System.nanoTime();

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        // Outer loop
        IntStream.range(0, numberOfTasksInOuterLoop).parallel().forEach(i -> {
//            System.out.println(i + "\t" + Thread.currentThread());
            if(i < 10) burnTime(10 * 1000, isCPUTimeBurned);

            IntStream range = IntStream.range(0, numberOfTasksInInnerLoop);
            if (isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                range = range.parallel();
            } else {
                // Inner loop as sequential
            }
            range.forEach(j -> burnTime(10, isCPUTimeBurned));

            if(i >= 10) burnTime(10 * 1000, isCPUTimeBurned);
        });

        long end = System.nanoTime();

        System.out.printf("Done in %.1f seconds.%n", (end - start) / 1e9);
    }

    static void burnTime(long millis, boolean isCPUTimeBurned) {
        if (isCPUTimeBurned) {
            long end = System.nanoTime() + millis * 1000000;
            while (System.nanoTime() < end)
                ;

        } else {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }
    }
}
1
répondu Peter Lawrey 2015-07-06 08:26:10

je peux confirmer qu'il s'agit toujours d'un problème de rendement dans 8u72, bien qu'il ne sera plus dans l'impasse. Les opérations de terminaux parallèles sont toujours effectuées avec ForkJoinTask instances en dehors d'un forkjoinpool contexte, ce qui signifie que chaque flux parallèle partage toujours le pool commun .

pour démontrer un cas pathologique simple:

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelPerf {

    private static final Object LOCK = new Object();

    private static void runInNewPool(Runnable task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static <T> T runInNewPool(Callable<T> task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static void innerLoop() {
        IntStream.range(0, 32).parallel().forEach(i -> {
//          System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("==DEFAULT==");
        long startTime = System.nanoTime();
        IntStream.range(0, 32).parallel().forEach(i -> {
            synchronized (LOCK) {
                innerLoop();
            }
//          System.out.println(" outer: " + Thread.currentThread().getName());
        });
        System.out.println(System.nanoTime() - startTime);

        System.out.println("==NEW POOLS==");
        startTime = System.nanoTime();
        IntStream.range(0, 32).parallel().forEach(i -> {
            synchronized (LOCK) {
                runInNewPool(() -> innerLoop());
            }
//          System.out.println(" outer: " + Thread.currentThread().getName());
        });
        System.out.println(System.nanoTime() - startTime);
    }
}

La seconde passe innerLoop à runInNewPool au lieu de l'appeler directement. Sur ma machine (i7-4790, 8 threads CPU), j'obtiens une vitesse d'environ 4x:

==DEFAULT==
4321223964
==NEW POOLS==
1015314802

décommenter les autres déclarations d'impression rend le problème évident:

[...]
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
 outer: ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
[...]
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
 outer: ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-4
[...]

les fils d'ouvrier de piscine communs s'empilent au bloc synchronisé, avec seulement un fil capable d'entrer à la fois. Depuis l'opération parallèle interne utilise la même piscine, et tous les autres fils dans la piscine sont en attente pour la serrure, on obtient une exécution simple-filetée.

et le résultat de l'utilisation des instances séparées de ForkJoinPool:

[...]
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-6
ForkJoinPool-1-worker-5
 outer: ForkJoinPool.commonPool-worker-4
ForkJoinPool-2-worker-1
ForkJoinPool-2-worker-5
[...]
ForkJoinPool-2-worker-7
ForkJoinPool-2-worker-3
 outer: ForkJoinPool.commonPool-worker-1
ForkJoinPool-3-worker-2
ForkJoinPool-3-worker-5
[...]

nous avons encore la boucle interne tournant sur un fil ouvrier à la fois, mais l'opération parallèle interne obtient un nouveau pool à chaque fois et peut utiliser tous ses fils ouvriers.

il s'agit d'un exemple artificiel, mais la suppression des blocs synchronisés montre toujours une différence similaire de vitesse, puisque le les boucles intérieure et extérieure sont toujours en compétition sur les mêmes fils ouvrants. Les applications Multithreaded doivent être prudentes lors de l'utilisation de flux parallèles dans plusieurs threads, car cela pourrait entraîner un ralentissement aléatoire quand ils se chevauchent.

il s'agit d'un problème pour toutes les opérations de terminal, pas seulement forEach , puisqu'elles exécutent toutes des tâches dans le bassin commun. J'utilise les méthodes runInNewPool ci-dessus comme solution de contournement, mais j'espère que cela sera intégré dans la bibliothèque standard à certains point.

1
répondu Sean Van Gorder 2016-05-13 17:21:32