Traitement séquentiel et traitement parallèle

j'ai un producteur et de nombreux consommateurs.

  • le producteur est rapide et de générer beaucoup de résultats
  • les tokens avec la même valeur doivent être traités séquentiellement
  • les tokens avec des valeurs différentes doivent être traités en parallèle
  • créer de nouvelles exécutables serait très coûteux et aussi le code de production pourrait fonctionner avec 100k De jetons(pour créer un exécutable je dois passer au constructeur quelque complexe pour construire des objets)

puis-je obtenir les mêmes résultats avec un simple algorithme? Imiter un bloc de syncronisation avec une serrure de rentrée semble un peu contre nature. Y a-t-il des conditions de course que vous pourriez remarquer?

mise à jour: une deuxième solution que j'ai trouvée était de travailler avec 3 collections. Une pour mettre en cache les résultats du producteur, la seconde une file d'attente de blocage et la troisième en utilisant une liste pour suivre les tâches en cours. Encore une fois un peu compliqué.

ma version du code

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main1 {
    static class Token {
        private int order;
        private String value;
        Token() {

        }
        Token(int o, String v) {
            order = o;
            value = v;
        }

        int getOrder() {
            return order;
        }

        String getValue() {
            return value;
        }
    }

    private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
    private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    private final static ReentrantLock reentrantLock = new ReentrantLock();
    private final static Token STOP_TOKEN = new Token();
    private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) {
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        producerExecutor.submit(new Runnable() {
            public void run() {
                Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));

                            queue.put(token);
                        }

                        queue.put(STOP_TOKEN);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        });

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for(int i=1; i<=10;i++) {

            // creating to many runnable would be inefficient because of this complex not thread safe object
            final Object dependecy = new Object(); //new ComplexDependecy()
            consumerExecutor.submit(new Runnable() {
                public void run() {
                    while(true) {
                        try {
                            //not in order


                            Token token = queue.take();
                            if (token == STOP_TOKEN) {
                                queue.add(STOP_TOKEN);
                                return;
                            }


                            System.out.println("Task start" + Thread.currentThread().getId() + " order "  + token.getOrder());

                            Random random = new Random();
                            Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
                            lockList.remove(token.getValue());

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }});

    }
}}
16
demandé sur danidacar 2016-01-08 22:31:03

10 réponses

vous pouvez pré-créer un ensemble de Runnables qui sélectionnera les tâches entrantes (tokens) et les placera dans des files d'attente en fonction de leur valeur de commande.

Comme l'a souligné dans les commentaires, c'est garantit que les tokens avec des valeurs différentes s'exécuteront toujours en parallèle (au total, vous êtes limité, au moins, par nr de noyaux physiques dans votre boite). Cependant, il est garanti que les jetons avec la même commande sera exécutée dans l'ordre d'arrivée.

Exemple de code:

/**
 * Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
 */
public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        /**
         * @return ordering value which will be used to sequence tasks with the same value.<br>
         * Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
         */
        String getOrder();
    }

    private static class Worker implements Runnable {

        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        return workers[task.getOrder().hashCode() % workers.length];
    }
}
6
répondu Victor Sorokin 2016-01-10 21:04:28

par la nature de votre code, le seul moyen de garantir que les jetons avec la même valeur sont traitées en série est d'attendre que STOP_TOKEN arrive.

vous aurez besoin d'une configuration de producteur unique-consommateur unique, avec le consommateur de collecte et de tri les jetons par leur valeur (dans le Multimap, disons).

ce N'est qu'alors que vous savez quels tokens peuvent être traités en série et lesquels peuvent être traités en parallèle.

en tout cas, je vous conseille de regarder perturbateur LMAX, ce qui est un moyen très efficace pour partager des données entre les threads.

il ne souffre pas de synchronisation aérienne en tant qu'exécuteur puisqu'il est libre de verrouillage (ce qui peut vous donner de belles performances, selon la nature de votre traitement de données).

la solution utilisant deux disrupteurs

// single thread for processing as there will be only on consumer
Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());

// outbound disruptor that uses 3 threads for event processing
Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));

inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));

// setup 3 event handlers, doing round robin consuming, effectively processing OutEvents in 3 threads
outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));

inboundDisruptor.start();
outboundDisruptor.start();

// publisher code
for (int i = 0; i < 10; i++) {
    inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
}

le gestionnaire d'événements sur le disrupteur entrant ne recueille que les jetons entrants. Lorsque le jeton STOP est reçu, il publie la série des jetons pour les sortants perturbateur pour la suite du traitement:

public class InEventHandler implements EventHandler<InEvent> {

    private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
    private Disruptor<OutEvent> outboundDisruptor;

    public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
        this.outboundDisruptor = outboundDisruptor;
    }

    @Override
    public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (event.token == STOP_TOKEN) {
            // publish indexed tokens to outbound disruptor for parallel processing
            tokensByValue.asMap().entrySet().stream().forEach(entry -> outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
        } else {
            tokensByValue.put(event.token.value, event.token);
        }
    }
}

Sortante gestionnaire d'événement processus de jetons de la même valeur séquentielle:

public class OutEventHandler implements EventHandler<OutEvent> {

    private final long order;
    private final long allHandlersCount;
    private Object yourComplexDependency;

    public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
        this.order = order;
        this.allHandlersCount = allHandlersCount;
        this.yourComplexDependency = yourComplexDependency;
    }

    @Override
    public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (sequence % allHandlersCount != order ) {
            // round robin, do not consume every event to allow parallel processing
            return;
        }

        for (Token token : event.tokensToProcessSerially) {
            // do procesing of the token using your complex class
        }

    }
}

le reste de l'infrastructure requise (but décrit dans le Disruptor docs):

public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {

    public static final InEventTranslator INSTANCE = new InEventTranslator();

    @Override
    public void translateTo(InEvent event, long sequence, Token arg0) {
        event.token = arg0;
    }

}

public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {

    public static final OutEventTranslator INSTANCE = new OutEventTranslator();

    @Override
    public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
        event.tokensToProcessSerially = tokens;
    }
}


public class InEvent {

    // Note that no synchronization is used here,
    // even though the field is used among multiple threads.
    // Memory barrier used by Disruptor guarantee changes are visible.
    public Token token;
}

public class OutEvent {
    // ... again, no locks.
    public Collection<Token> tokensToProcessSerially;

}

public class Token {
    String value;

}
6
répondu David Siro 2016-01-12 21:27:36

si vous avez beaucoup de tokens différents, alors la solution la plus simple est de créer un certain nombre d'exécuteurs de thread simple (environ 2 fois votre nombre de noyaux), et puis distribuer chaque tâche à un exécuteur déterminé par le hachage de son token.

de cette façon toutes les tâches avec le même token iront au même exécuteur et s'exécuteront de façon séquentielle, parce que chaque exécuteur n'a qu'un seul thread.

si vous avez des exigences non énoncées au sujet de l'équité de l'horaire, alors c'est facile assez pour éviter tout déséquilibre significatif en ayant le fil du producteur mettre en file d'attente ses demandes (ou bloc) avant de les distribuer, jusqu'à ce qu'il y ait, disons, moins de 10 demandes par exécuteur en suspens.

5
répondu Matt Timmermans 2016-01-13 02:14:41

LA solution suivante n'utilisera qu'une seule carte qui est utilisée par le producteur et les consommateurs pour traiter les commandes dans l'ordre séquentiel pour chaque numéro de commande pendant le traitement des différents numéros de commande en parallèle. Voici le code:

public class Main {

    private static final int NUMBER_OF_CONSUMER_THREADS = 10;
    private static volatile int sync = 0;

    public static void main(String[] args) {
        final ConcurrentHashMap<String,Controller> queues = new ConcurrentHashMap<String, Controller>();
        final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CONSUMER_THREADS);
        final AtomicBoolean done = new AtomicBoolean(false);

        // Create a Producer
        new Thread() {
            {
                this.setDaemon(true);
                this.setName("Producer");
                this.start();
            }

            public void run() {
                Random rand = new Random();

                for(int i =0 ; i < 1000 ; i++) {
                    int order = rand.nextInt(20);
                    String key = String.valueOf(order);
                    String value = String.valueOf(rand.nextInt());
                    Controller controller = queues.get(key);
                    if (controller == null) {
                        controller = new Controller();
                        queues.put(key, controller);
                    }
                    controller.add(new Token(order, value));
                    Main.sync++;
                }

                done.set(true);
            }
        };

        while (queues.size() < 10) {
            try {
                // Allow the producer to generate several entries that need to
                // be processed.
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }

        // System.out.println(queues);

        // Create the Consumers
        ExecutorService consumers = Executors.newFixedThreadPool(NUMBER_OF_CONSUMER_THREADS);
        for(int i = 0 ; i < NUMBER_OF_CONSUMER_THREADS ; i++) {
            consumers.submit(new Runnable() {
                private Random rand = new Random();

                public void run() {
                    String name = Thread.currentThread().getName();
                    try {
                        boolean one_last_time = false;
                        while (true) {
                            for (Map.Entry<String, Controller> entry : queues.entrySet()) {
                                Controller controller = entry.getValue();
                                if (controller.lock(this)) {
                                    ConcurrentLinkedQueue<Token> list = controller.getList();
                                    Token token;
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(rand.nextInt(200));
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    int last = Main.sync;
                                    queues.remove(entry.getKey());
                                    while(done.get() == false && last == Main.sync) {
                                        // yield until the producer has added at least another entry
                                        Thread.yield();
                                    }
                                    // Purge any new entries added
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(200);
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    controller.unlock(this);
                                }
                            }
                            if (one_last_time) {
                                return;
                            }
                            if (done.get()) {
                                one_last_time = true;
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumers.shutdown();
        System.out.println("Exiting.. remaining number of entries: " + queues.size());
    }

}

notez que la classe principale contient une instance de files d'attente qui est une Map. La carte clé est l'id de la commande que vous souhaitez traiter séquentiellement par les consommateurs. La valeur est une classe de Controller qui contiendra toutes les commandes associées à que l'id de la commande.

le producteur générera les commandes et ajoutera la commande, (Token), à son contrôleur associé. Les consommateurs vont itérer sur les valeurs de la table des files d'attente et appeler la méthode de verrouillage du contrôleur pour déterminer si elle peut traiter les commandes pour cet id d'ordre particulier. Si la serrure retourne false, elle vérifiera l'instance suivante du Controller. Si le verrou retourne true, il traitera toutes les commandes et vérifiera le contrôleur suivant.

mise à jour Ajout de l'entier sync qui est utilisé pour garantir que lorsqu'une instance du Controller est retirée de la carte des files d'attente. Toutes ses entrées seront consommés. Il y avait une erreur de logique dans le code de consommation où la méthode de déverrouillage a été appelée à bientôt.

la classe Token est similaire à celle que vous avez affichée ici.

class Token {
    private int order;
    private String value;

    Token(int order, String value) {
        this.order = order;
        this.value = value;
    }

    int getOrder() {
        return order;
    }

    String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Token [order=" + order + ", value=" + value + "]\n";
    }
}

la classe Controller qui suit est utilisée pour s'assurer qu'un seul thread dans le pool de threads traitera les commandes. Le verrouiller/déverrouiller les méthodes sont utilisées pour déterminer les threads seront autorisés à traiter les commandes.

class Controller {

    private ConcurrentLinkedQueue<Token> tokens = new ConcurrentLinkedQueue<Token>();
    private ReentrantLock lock = new ReentrantLock();
    private Runnable current = null;

    void add(Token token) {
        tokens.add(token);
    }

    public ConcurrentLinkedQueue<Token> getList() {
        return tokens;
    }

    public void unlock(Runnable runnable) {
        lock.lock();
        try {
            if (current == runnable) {
                current = null;
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean lock(Runnable runnable) {
        lock.lock();
        try {
            if (current == null) {
                current = runnable;
            }
        } finally {
            lock.unlock();
        }
        return current == runnable;
    }

    @Override
    public String toString() {
        return "Controller [tokens=" + tokens + "]";
    }

}

informations supplémentaires sur la mise en œuvre. Il utilise un CountDownLatch pour s'assurer que toutes les commandes produites seront traitées avant la fin du processus. La variable done est exactement comme votre variable STOP_TOKEN.

la mise en oeuvre contient un problème que vous devez résoudre. Il y a la question qu'il ne purge pas le contrôleur pour un numéro de commande lorsque toutes les commandes ont été traitées. Cela provoquera des cas où un thread dans le pool de threads sera assigné à un controller qui ne contient pas d'ordres. Ce qui gaspillera les cycles cpu qui pourraient être utilisés pour effectuer d'autres tâches.

4
répondu Claudio Corsi 2016-01-14 04:04:03

est-ce que tout ce dont vous avez besoin est de s'assurer que les jetons avec la même valeur ne sont pas traités simultanément? Votre code est trop confus pour comprendre ce que vous voulez dire (il ne compile pas, et a beaucoup de variables inutilisées, serrures et cartes, qui sont créées mais jamais utilisées). On dirait que tu penses beaucoup trop à ça. Il vous suffit d'une file d'attente et d'une carte. Quelque chose comme ça j'imagine:

   class Consumer implements Runnable {
     ConcurrentHashMap<String, Token> inProcess;
     BlockingQueue<Token> queue;

     public void run() {
        Token token = null;
        while ((token = queue.take()) != null) {
           if(inProcess.putIfAbsent(token.getValue(), token) != null) {
              queue.put(token);
              continue;
           }
           processToken(token);
           inProcess.remove(token.getValue());
        }
     }
   }
4
répondu Dima 2016-01-16 20:48:57

les tokens avec la même valeur doivent être traités séquentiellement

la façon de s'assurer que deux choses se produisent dans l'ordre est de les faire dans le même thread.

j'aurais une collection de cependant beaucoup de fils ouvriers, et j'aurais une carte. Chaque fois que j'obtiens un token que je n'ai jamais vu auparavant, je choisis un thread au hasard, et j'entre le token et le thread dans la carte. A partir de là, j'utiliserai le même thread pour exécuter les tâches associées à cette jeton.

création de nouveaux Runnables serait très cher

Runnable est une interface. La création de nouveaux objets œuvreRunnable ne va pas être beaucoup plus cher que la création de tout autre type d'objet.

3
répondu Solomon Slow 2016-01-08 20:01:41

peut-être que j'ai mal compris quelque chose. Mais il semble qu'il serait plus facile de filtrer les Tokens avec la même valeur de ceux avec des valeurs différentes dans deux files d'attente différentes initialement.

et ensuite utiliser Stream avec une carte ou un foreach pour le séquentiel. Et utilisez simplement la version à flux parallèle pour le reste.

si vos jetons dans l'environnement de production sont générés par paresseusement et que vous n'en Recevez qu'un à la fois, vous faites simplement une sorte de filtre qui distribue pour les deux différentes files d'attente.

si vous pouvez l'implémenter avec des Streams I suqqest faisant cela car ils sont simples, faciles à utiliser et rapides!

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

j'ai donné un bref exemple de ce que je veux dire. Dans ce cas, les jetons numériques sont en quelque sorte fabriqués artificiellement, mais c'est à côté du point. Aussi les flux sont tous les deux initiés sur le fil principal qui ne serait probablement pas non plus idéal.

public static void main(String args[]) {
    ArrayList<Token> sameValues = new ArrayList<Token>();
    ArrayList<Token> distinctValues = new ArrayList<Token>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        int next = random.nextInt(100);
        Token n = new Token(i, String.valueOf(next));
        if (next == i) {
            sameValues.add(n);
        } else {
            distinctValues.add(n);
        }
    }       
    distinctValues.stream().parallel().forEach(token -> System.out.println("Distinct: " + token.value));
    sameValues.stream().forEach(token -> System.out.println("Same: " + token.value));       
}
3
répondu PNS 2016-01-12 23:44:51

Je ne suis pas tout à fait sûr d'avoir compris la question, mais je vais essayer un algorithme.

Les acteurs sont:

  • queue de tâches
  • pool libre executors
  • setin-process jetons actuellement en cours de traitement
  • controller

Ensuite,

  • initialement tous executors et le set est vide

  • controller sélectionne un disponible executor et passe par l' queuetask avec un jeton qui n'est pas dans le in-process set et quand il le trouve

    • ajoute le jeton à l' in-process set
    • assigne l' executor processus task et
    • remonte au début de la file d'attente
  • executor supprime le jeton de l' set quand c'est fait traitement et s'ajoute de nouveau à la piscine

3
répondu Miserable Variable 2016-01-13 00:50:45

une façon de faire ceci est d'avoir un exécuteur pour le traitement séquentiel et un pour le traitement parallèle. Nous avons également besoin d'un seul service de gestionnaire fileté qui décidera à quel service token doit être soumis pour le traitement. // Queue à partager par les deux threads. Contient les jetons produits par le producteur.

BlockingQueue tokenList = new ArrayBlockingQueue (10);

    private void startProcess() {
    ExecutorService producer = Executors.newSingleThreadExecutor();
    final ExecutorService consumerForSequence = Executors
            .newSingleThreadExecutor();
    final ExecutorService consumerForParallel = Executors.newFixedThreadPool(10);
    ExecutorService manager = Executors.newSingleThreadExecutor();

    producer.submit(new Producer(tokenList));

    manager.submit(new Runnable() {

        public void run() {
            try {
                while (true) {
                    Token t = tokenList.take();
                    System.out.println("consumed- " + t.orderid
                            + " element");

                    if (t.orderid % 7 == 0) { // any condition to check for sequence processing

                        consumerForSequence.submit(new ConsumerForSequenceProcess(t));

                    } else {

                        ConsumerForParallel.submit(new ConsumerForParallelProcess(t));

                    }
                }
            }

            catch (InterruptedException e) { // TODO Auto-generated catch
                // block
                e.printStackTrace();
            }

        }
    });
}
3
répondu Nitin Saxena 2016-01-16 10:53:56

je pense qu'il y a un problème de conception plus fondamental caché derrière cette tâche, mais ok. Je ne peux pas comprendre à partir de votre Description du problème si vous voulez l'exécution en ordre ou si vous voulez juste des opérations sur des tâches décrites par des jetons simples pour être atomique/transactionnel. Ce que je propose ci-dessous ressemble plus à une "solution rapide" à cette question qu'à une solution réelle.

pour le cas réel de "l'exécution ordonnée" je propose une solution qui est basée sur les proxies de file d'attente qui ordonnent le sortie:

  1. définir une implémentation de file d'attente qui fournit une méthode d'usine générant des files d'attente de proxy qui sont représentées du côté du producteur par un seul objet de file d'attente; la méthode d'usine devrait également enregistrer ces objets de file d'attente de proxy. l'ajout d'un élément à la file d'attente d'entrée doit l'ajouter directement à l'une des files d'attente de sortie si elle correspond à un des éléments dans l'une des files d'attente de sortie. Sinon, ajoutez-le à n'importe quelle file d'attente (la plus courte). (mettre en œuvre le vérifier pour cela efficacement). Alternativement (légèrement mieux): ne faites pas cela lorsque l'élément est ajouté, mais lorsque l'une des files d'attente de sortie est vide.

  2. donnez à chacun de vos consommateurs exécutables un champ stockant une interface de file d'attente individuelle (au lieu d'accéder à un seul objet). Initialiser ce champ par la méthode définie ci-dessus.

pour le cas de transaction je pense qu'il est plus facile de passer plus de threads que vous avez des noyaux (utilisez les statistiques pour calculer ceci), et mettre en œuvre le mécanisme de blocage à un niveau (objet) inférieur.

2
répondu Alexander Kemp 2016-01-17 16:13:31