C # producteur / consommateur

j'ai récemment rencontré un producteur/consommateur modèle c# implémentation. c'est très simple et (pour moi au moins) très élégant.

il semble qu'il ait été conçu VERS 2006, donc je me demandais si cette implémentation était

- sûr

- toujours applicable

le Code est ci-dessous (le code original a été référencé à http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("ttttConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}
22
demandé sur razlebe 2009-11-01 07:36:40

4 réponses

le code est plus ancien que ça - je l'ai écrit quelque temps avant. concept d'une file d'attente producteur/consommateur est plus ancien que cela cependant :)

Oui, ce code est sûr pour autant que je suis au courant - mais il a quelques lacunes:

  • c'est non-générique. Une version moderne serait certainement générique.
  • Il n'a aucun moyen d'arrêter la file d'attente. Une façon simple d'arrêter la file d'attente (de sorte que tous les fils de consommation prendre sa retraite) est d'avoir un jeton" arrêt de travail " qui peut être mis dans la file d'attente. Vous ajoutez alors autant de jetons que vous avez des fils. Sinon, vous avez un drapeau pour indiquer que vous souhaitez arrêter. (Cela permet aux autres threads de s'arrêter avant de terminer tout le travail en cours dans la file d'attente.)
  • si les travaux sont très petits, la consommation d'un seul travail à la fois peut ne pas être la chose la plus efficace à faire.

les idées derrière le code sont plus importantes que le code lui-même, pour être honnête.

29
répondu Jon Skeet 2009-11-01 08:04:33

Vous pourriez faire quelque chose comme l'extrait de code suivant. Il est générique et a une méthode pour enqueue-ing nulls (ou n'importe quel drapeau que vous voulez utiliser) pour dire aux threads de travail de sortir.

Le code est pris à partir d'ici: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}
24
répondu dashton 2009-11-01 10:37:11

le jour où j'ai appris comment surveiller.Wait / Pulse fonctionne (et beaucoup sur les threads en général) à partir du morceau de code ci-dessus et le série d'articles il est de. Donc, comme Jon le dit, il a beaucoup de valeur pour lui et est en effet sûr et applicable.

cependant, à partir de .NET 4, il y a un mise en place de la file d'attente producteur-consommateur dans le cadre. Je viens juste de le trouver moi-même mais jusqu'à présent il fait tout ce dont j'ai besoin.

14
répondu kicsit 2012-05-02 14:10:43

Avertissement: Si vous lisez les commentaires, vous comprendrez ma réponse est fausse :)

Il y a un possible blocage dans votre code.

imaginez le cas suivant, pour plus de clarté, j'ai utilisé une approche à fil simple mais devrait être facile à convertir en fil multiple avec le sommeil:

// We create some actions...
object locker = new object();

Action action1 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action1");
    }
};

Action action2 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action2");
    }
};

// ... (stuff happens, etc.)

// Imagine both actions were running
// and there's 0 items in the queue

// And now the producer kicks in...
lock (locker)
{
    // This would add a job to the queue

    Console.WriteLine("Pulse now!");
    System.Threading.Monitor.Pulse(locker);
}

// ... (more stuff)
// and the actions finish now!

Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();

s'il vous Plaît laissez-moi savoir si cela n'a aucun sens.

Si c'est confirmé, alors la réponse à votre question est "non, il n'est pas sûr" ;) J'espère que cette aide.

1
répondu DiogoNeves 2013-03-22 17:47:50