awaitable Tâche de la file d'attente

je me demande s'il existe une implémentation/wrapper pour ConcurrentQueue, similaire à BlockingCollection où prendre de la collection ne bloque pas, mais est plutôt asynchrone et causera un async attendre jusqu'à ce qu'un article soit placé dans la file d'attente.

j'ai créé ma propre implémentation, mais elle ne semble pas fonctionner comme prévu. Je me demande si je réinvente quelque chose qui existe déjà.

Voici mon application:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}
27
demandé sur Stephen Cleary 2011-10-23 04:36:40

5 réponses

je ne sais pas de lock-solution gratuite, mais vous pouvez prendre un coup d'oeil à la nouvelle bibliothèque de Flux de données, une partie de l' Async CTP. A simple BufferBlock<T> devrait suffire, par exemple:

BufferBlock<int> buffer = new BufferBlock<int>();

la Production et la consommation se font le plus facilement par des méthodes d'extension sur les types de blocs dataflow.

la Production est aussi simple que:

buffer.Post(13);

et la consommation est asynchrone de prêt:

int item = await buffer.ReceiveAsync();

je vous recommande D'utiliser Dataflow si possible; rendre un tel tampon à la fois efficace et correct est plus difficile qu'il n'y paraît.

42
répondu Stephen Cleary 2015-06-03 09:30:16

Mon atempt (il a un évènement soulevée lors de la "promesse" est créé, et il peut être utilisé par un producteur externe pour savoir quand produire plus d'articles):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues the asynchronous.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    this.PromiseAdded.RaiseEvent(this, EventArgs.Empty);

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}
2
répondu André Bires 2014-04-08 13:38:15

C'est peut-être exagéré pour votre cas d'utilisation (compte tenu de la courbe d'apprentissage), mais Extentions Réactives fournit la colle que vous pourriez jamais vouloir pour asynchrones composition.

Vous abonner à des changements, et ils sont poussés à vous qu'ils sont disponibles, et vous pouvez avoir le système de pousser les modifications sur un thread séparé.

1
répondu Morten Mertner 2011-10-23 00:48:09

Voici l'implémentation que j'utilise actuellement.

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

Cela fonctionne assez bon, mais il y a beaucoup de controverse sur queueSyncLock, comme je fais beaucoup de l'utilisation de l' CancellationToken pour annuler certaines des tâches en attente. Bien sûr, cela conduit à beaucoup moins de blocage je verrais avec un BlockingCollection mais...

je me demande s'il n'y a pas un moyen plus doux, plus libre d'atteindre le même but

0
répondu spender 2011-10-23 14:05:40

Vous pouvez simplement utiliser un BlockingCollection ( en utilisant la valeur par défaut ConcurrentQueue ) et l'enveloppe de l'appel à Take dans un Task de sorte que vous pouvez await:

var bc = new BlockingCollection<T>();

T element = await Task.Run( () => bc.Take() );
-1
répondu Nicholas Butler 2011-10-23 09:56:22