File d'attente de taille fixe qui désqueue automatiquement les anciennes valeurs sur les nouvelles enques
J'utilise ConcurrentQueue
pour une structure de données partagée dont le but est de contenir les N derniers objets qui lui sont passés (genre d'historique).
Supposons que nous ayons un navigateur et que nous voulons avoir les 100 dernières URL parcourues. Je veux une file d'attente qui supprime automatiquement (dequeue) la plus ancienne (première) entrée lors de la nouvelle insertion d'entrée (file d'attente) lorsque la capacité est pleine (100 adresses dans l'histoire).
Comment puis-je accomplir cela en utilisant System.Collections
?
10 réponses
J'écrirais une classe wrapper qui, lors de la mise en file d'attente, vérifierait le nombre, puis la désactiverait lorsque le nombre dépasse la limite.
public class FixedSizedQueue<T>
{
ConcurrentQueue<T> q = new ConcurrentQueue<T>();
private object lockObject = new object();
public int Limit { get; set; }
public void Enqueue(T obj)
{
q.Enqueue(obj);
lock (lockObject)
{
T overflow;
while (q.Count > Limit && q.TryDequeue(out overflow)) ;
}
}
}
J'irais pour une légère variante... étendez ConcurrentQueue afin de pouvoir utiliser les extensions Linq sur FixedSizeQueue
public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
private readonly object syncObject = new object();
public int Size { get; private set; }
public FixedSizedQueue(int size)
{
Size = size;
}
public new void Enqueue(T obj)
{
base.Enqueue(obj);
lock (syncObject)
{
while (base.Count > Size)
{
T outObj;
base.TryDequeue(out outObj);
}
}
}
}
Pour tous ceux qui le trouvent utile, voici un code de travail basé sur la réponse de Richard Schneider ci-dessus:
public class FixedSizedQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public int Size { get; private set; }
public FixedSizedQueue(int size)
{
Size = size;
}
public void Enqueue(T obj)
{
queue.Enqueue(obj);
while (queue.Count > Size)
{
T outObj;
queue.TryDequeue(out outObj);
}
}
}
Pour ce que cela vaut, voici un tampon circulaire léger avec certaines méthodes marquées pour une utilisation sûre et dangereuse.
public class CircularBuffer<T> : IEnumerable<T>
{
readonly int size;
readonly object locker;
int count;
int head;
int rear;
T[] values;
public CircularBuffer(int max)
{
this.size = max;
locker = new object();
count = 0;
head = 0;
rear = 0;
values = new T[size];
}
static int Incr(int index, int size)
{
return (index + 1) % size;
}
private void UnsafeEnsureQueueNotEmpty()
{
if (count == 0)
throw new Exception("Empty queue");
}
public int Size { get { return size; } }
public object SyncRoot { get { return locker; } }
#region Count
public int Count { get { return UnsafeCount; } }
public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
public int UnsafeCount { get { return count; } }
#endregion
#region Enqueue
public void Enqueue(T obj)
{
UnsafeEnqueue(obj);
}
public void SafeEnqueue(T obj)
{
lock (locker) { UnsafeEnqueue(obj); }
}
public void UnsafeEnqueue(T obj)
{
values[rear] = obj;
if (Count == Size)
head = Incr(head, Size);
rear = Incr(rear, Size);
count = Math.Min(count + 1, Size);
}
#endregion
#region Dequeue
public T Dequeue()
{
return UnsafeDequeue();
}
public T SafeDequeue()
{
lock (locker) { return UnsafeDequeue(); }
}
public T UnsafeDequeue()
{
UnsafeEnsureQueueNotEmpty();
T res = values[head];
values[head] = default(T);
head = Incr(head, Size);
count--;
return res;
}
#endregion
#region Peek
public T Peek()
{
return UnsafePeek();
}
public T SafePeek()
{
lock (locker) { return UnsafePeek(); }
}
public T UnsafePeek()
{
UnsafeEnsureQueueNotEmpty();
return values[head];
}
#endregion
#region GetEnumerator
public IEnumerator<T> GetEnumerator()
{
return UnsafeGetEnumerator();
}
public IEnumerator<T> SafeGetEnumerator()
{
lock (locker)
{
List<T> res = new List<T>(count);
var enumerator = UnsafeGetEnumerator();
while (enumerator.MoveNext())
res.Add(enumerator.Current);
return res.GetEnumerator();
}
}
public IEnumerator<T> UnsafeGetEnumerator()
{
int index = head;
for (int i = 0; i < count; i++)
{
yield return values[index];
index = Incr(index, size);
}
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
#endregion
}
J'aime utiliser la convention Foo()/SafeFoo()/UnsafeFoo()
:
-
Les méthodes
Foo
appellentUnsafeFoo
par défaut. -
UnsafeFoo
les méthodes modifient l'état librement sans verrou, elles ne doivent appeler que d'autres méthodes dangereuses. -
SafeFoo
méthodes appelezUnsafeFoo
méthodes à l'intérieur d'un verrou.
C'est un peu verbeux, mais il fait des erreurs évidentes, comme appeler des méthodes dangereuses en dehors d'un verrou dans une méthode qui est censée être thread-safe, plus apparente.
Juste pour le plaisir, voici une autre implémentation qui, je crois, répond à la plupart des préoccupations des commentateurs. En particulier, la sécurité des threads est obtenue sans verrouillage et l'implémentation est masquée par la classe d'encapsulation.
public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private int _count;
public int Limit { get; private set; }
public FixedSizeQueue(int limit)
{
this.Limit = limit;
}
public void Enqueue(T obj)
{
_queue.Enqueue(obj);
Interlocked.Increment(ref _count);
// Calculate the number of items to be removed by this thread in a thread safe manner
int currentCount;
int finalCount;
do
{
currentCount = _count;
finalCount = Math.Min(currentCount, this.Limit);
} while (currentCount !=
Interlocked.CompareExchange(ref _count, finalCount, currentCount));
T overflow;
while (currentCount > finalCount && _queue.TryDequeue(out overflow))
currentCount--;
}
public int Count
{
get { return _count; }
}
public IEnumerator<T> GetEnumerator()
{
return _queue.GetEnumerator();
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return _queue.GetEnumerator();
}
}
MA version est juste une sous-classe de Queue
normaux.. rien de spécial mais en voyant tout le monde Participer et cela va toujours avec le titre du sujet, je pourrais aussi bien le mettre ici. Il renvoie également ceux dequeued juste au cas où.
public sealed class SizedQueue<T> : Queue<T>
{
public int FixedCapacity { get; }
public SizedQueue(int fixedCapacity)
{
this.FixedCapacity = fixedCapacity;
}
/// <summary>
/// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
/// </summary>
/// <returns>The dequeued value, if any.</returns>
public new T Enqueue(T item)
{
base.Enqueue(item);
if (base.Count > FixedCapacity)
{
return base.Dequeue();
}
return default;
}
}
Eh bien, cela dépend de l'utilisation que j'ai remarqué que certaines des solutions ci-dessus peuvent dépasser la taille lorsqu'elles sont utilisées dans un environnement multi-thread. Quoi qu'il en soit, mon cas d'utilisation était d'afficher les 5 derniers événements et il y a plusieurs threads qui écrivent des événements dans la file d'attente et un autre thread qui en lit et l'affiche dans un contrôle Winform. Donc c'était ma solution.
EDIT: puisque nous utilisons déjà le verrouillage dans notre implémentation, nous n'avons pas vraiment besoin de ConcurrentQueue, cela peut améliorer les performances.
class FixedSizedConcurrentQueue<T>
{
readonly Queue<T> queue = new Queue<T>();
readonly object syncObject = new object();
public int MaxSize { get; private set; }
public FixedSizedConcurrentQueue(int maxSize)
{
MaxSize = maxSize;
}
public void Enqueue(T obj)
{
lock (syncObject)
{
queue.Enqueue(obj);
while (queue.Count > MaxSize)
{
queue.Dequeue();
}
}
}
public T[] ToArray()
{
T[] result = null;
lock (syncObject)
{
result = queue.ToArray();
}
return result;
}
public void Clear()
{
lock (syncObject)
{
queue.Clear();
}
}
}
EDIT: nous n'avons pas vraiment besoin de syncObject
dans l'exemple ci-dessus et nous pouvons plutôt utiliser queue
object puisque nous ne réinitialisons pas queue
dans n'importe quelle fonction et son marqué comme readonly
de toute façon.
Pour votre plaisir de codage, je vous soumets le 'ConcurrentDeck
'
public class ConcurrentDeck<T>
{
private readonly int _size;
private readonly T[] _buffer;
private int _position = 0;
public ConcurrentDeck(int size)
{
_size = size;
_buffer = new T[size];
}
public void Push(T item)
{
lock (this)
{
_buffer[_position] = item;
_position++;
if (_position == _size) _position = 0;
}
}
public T[] ReadDeck()
{
lock (this)
{
return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
}
}
}
Exemple D'Utilisation:
void Main()
{
var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
var handle = new ManualResetEventSlim();
var task1 = Task.Factory.StartNew(()=>{
var timer = new System.Timers.Timer();
timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
timer.Enabled = true;
handle.Wait();
});
var task2 = Task.Factory.StartNew(()=>{
var timer = new System.Timers.Timer();
timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
timer.Enabled = true;
handle.Wait();
});
var task3 = Task.Factory.StartNew(()=>{
var timer = new System.Timers.Timer();
timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
timer.Enabled = true;
handle.Wait();
});
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
handle.Set();
var outputtime = DateTime.Now;
deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}
Ajoutons une réponse de plus. Pourquoi cela sur les autres?
1) simplicité. Essayer de garantir la taille est bien et bien, mais conduit à une complexité inutile qui peut présenter ses propres problèmes.
2) implémente IReadOnlyCollection, ce qui signifie que vous pouvez utiliser Linq dessus et le passer dans une variété de choses qui attendent IEnumerable.
3) Pas de verrouillage. Beaucoup de solutions ci-dessus utilisent des verrous, ce qui est incorrect sur une collection sans verrouillage.
4) implémente le même ensemble de méthodes, propriétés et interfaces ConcurrentQueue, y compris IProducerConsumerCollection, ce qui est important si vous souhaitez utiliser la collection avec BlockingCollection.
Cette implémentation pourrait potentiellement se retrouver avec plus d'entrées que prévu si TryDequeue échoue, mais la fréquence de cela ne semble pas valoir un code spécialisé qui entravera inévitablement les performances et causera ses propres problèmes inattendus.
Si vous voulez absolument garantir une taille, implémentez un Prune() ou une méthode similaire semble être la meilleure idée. Vous pouvez utiliser un verrou de lecture ReaderWriterLockSlim dans les autres méthodes (y compris TryDequeue) et prendre un verrou d'écriture uniquement lors de l'élagage.
class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
readonly ConcurrentQueue<T> m_concurrentQueue;
readonly int m_maxSize;
public int Count => m_concurrentQueue.Count;
public bool IsEmpty => m_concurrentQueue.IsEmpty;
public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }
public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
if (initialCollection == null) {
throw new ArgumentNullException(nameof(initialCollection));
}
m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
m_maxSize = maxSize;
}
public void Enqueue (T item) {
m_concurrentQueue.Enqueue(item);
if (m_concurrentQueue.Count > m_maxSize) {
T result;
m_concurrentQueue.TryDequeue(out result);
}
}
public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);
public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
public T[] ToArray () => m_concurrentQueue.ToArray();
public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();
// Explicit ICollection implementations.
void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;
// Explicit IProducerConsumerCollection<T> implementations.
bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);
public override int GetHashCode () => m_concurrentQueue.GetHashCode();
public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
public override string ToString () => m_concurrentQueue.ToString();
}
C'est ma version de la file d'attente:
public class FixedSizedQueue<T> {
private object LOCK = new object();
ConcurrentQueue<T> queue;
public int MaxSize { get; set; }
public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
this.MaxSize = maxSize;
if (items == null) {
queue = new ConcurrentQueue<T>();
}
else {
queue = new ConcurrentQueue<T>(items);
EnsureLimitConstraint();
}
}
public void Enqueue(T obj) {
queue.Enqueue(obj);
EnsureLimitConstraint();
}
private void EnsureLimitConstraint() {
if (queue.Count > MaxSize) {
lock (LOCK) {
T overflow;
while (queue.Count > MaxSize) {
queue.TryDequeue(out overflow);
}
}
}
}
/// <summary>
/// returns the current snapshot of the queue
/// </summary>
/// <returns></returns>
public T[] GetSnapshot() {
return queue.ToArray();
}
}
Je trouve utile d'avoir un constructeur construit sur un IEnumerable et je trouve utile d'avoir un GetSnapshot pour avoir une liste sécurisée multithread (tableau dans ce cas) des éléments au moment de l'appel, qui ne génère pas d'erreurs si la collection sous-jacente change.
La double vérification du compte est d'empêcher la serrure dans certaines circonstances.