Создание очереди блокировки<T> в.NET?
У меня есть сценарий, когда у меня есть несколько потоков, добавляющих в очередь, и несколько потоков, читающих из одной и той же очереди. Если очередь достигает определенного размера, все потоки, заполняющие очередь, будут заблокированы при добавлении, пока элемент не будет удален из очереди.
Решение ниже - то, что я использую прямо сейчас, и мой вопрос: как это можно улучшить? Есть ли объект, который уже включает это поведение в BCL, который я должен использовать?
internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
//todo: might be worth changing this into a proper QUEUE
private AutoResetEvent _FullEvent = new AutoResetEvent(false);
internal T this[int i]
{
get { return (T) List[i]; }
}
private int _MaxSize;
internal int MaxSize
{
get { return _MaxSize; }
set
{
_MaxSize = value;
checkSize();
}
}
internal BlockingCollection(int maxSize)
{
MaxSize = maxSize;
}
internal void Add(T item)
{
Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.WaitOne();
List.Add(item);
Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));
checkSize();
}
internal void Remove(T item)
{
lock (List)
{
List.Remove(item);
}
Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
}
protected override void OnRemoveComplete(int index, object value)
{
checkSize();
base.OnRemoveComplete(index, value);
}
internal new IEnumerator GetEnumerator()
{
return List.GetEnumerator();
}
private void checkSize()
{
if (Count < MaxSize)
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Set();
}
else
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Reset();
}
}
}
9 ответов
Это выглядит очень небезопасно (очень мало синхронизации); как насчет чего-то вроде:
class SizeQueue<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly int maxSize;
public SizeQueue(int maxSize) { this.maxSize = maxSize; }
public void Enqueue(T item)
{
lock (queue)
{
while (queue.Count >= maxSize)
{
Monitor.Wait(queue);
}
queue.Enqueue(item);
if (queue.Count == 1)
{
// wake up any blocked dequeue
Monitor.PulseAll(queue);
}
}
}
public T Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
T item = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return item;
}
}
}
(редактировать)
На самом деле вам нужен способ закрыть очередь так, чтобы читатели начинали выходить чисто - возможно, что-то вроде флага bool - если установлен, пустая очередь просто возвращается (а не блокируется):
bool closing;
public void Close()
{
lock(queue)
{
closing = true;
Monitor.PulseAll(queue);
}
}
public bool TryDequeue(out T value)
{
lock (queue)
{
while (queue.Count == 0)
{
if (closing)
{
value = default(T);
return false;
}
Monitor.Wait(queue);
}
value = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return true;
}
}
Используйте.net 4 BlockingCollection, чтобы поставить в очередь, использовать Add(), чтобы убрать из очереди, используйте Take(). Он внутренне использует неблокирующую ConcurrentQueue. Более подробная информация здесь: Быстрая и лучшая техника очереди производителей / потребителей BlockingCollection vs одновременная очередь
Вы можете использовать BlockingCollection и ConcurrentQueue в пространстве имен System.Collections.Concurrent.
public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
public ProducerConsumerQueue()
: base(new ConcurrentQueue<T>())
{
}
/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
/// <param name="maxSize"></param>
public ProducerConsumerQueue(int maxSize)
: base(new ConcurrentQueue<T>(), maxSize)
{
}
}
"Как это можно улучшить?"
Что ж, вам нужно посмотреть на каждый метод в вашем классе и подумать, что произойдет, если другой поток одновременно вызывает этот метод или любой другой метод. Например, вы устанавливаете блокировку в методе Remove, но не в методе Add. Что происходит, если один поток добавляет одновременно с удалением другого потока? Плохие вещи.
Также учтите, что метод может вернуть второй объект, который обеспечивает доступ к внутренним данным первого объекта - например, GetEnumerator. Представьте, что один поток проходит через этот перечислитель, другой поток одновременно изменяет список. Нехорошо.
Хорошее эмпирическое правило заключается в том, чтобы упростить задачу, сократив число методов в классе до абсолютного минимума.
В частности, не наследуйте другой контейнерный класс, потому что вы будете выставлять все методы этого класса, предоставляя возможность вызывающей стороне повредить внутренние данные или увидеть частично завершенные изменения данных (столь же плохие, потому что данные кажется поврежденным в этот момент). Скройте все детали и будьте полностью безжалостны о том, как вы разрешаете доступ к ним.
Я настоятельно рекомендую вам использовать готовые решения - получить книгу о многопоточности или использовать стороннюю библиотеку. В противном случае, учитывая то, что вы пытаетесь, вы будете отлаживать свой код в течение длительного времени.
Кроме того, не имеет ли смысла для Remove возвращать элемент (скажем, тот, который был добавлен первым, поскольку это очередь), а не вызывающий объект, выбирающий конкретный элемент? И когда очередь пуста, возможно, Удалить также следует заблокировать.
Обновление: ответ Марка фактически реализует все эти предложения!:) Но я оставлю это здесь, так как может быть полезно понять, почему его версия является таким улучшением.
Я просто поднял это с помощью Reactive Extensions и вспомнил этот вопрос:
public class BlockingQueue<T>
{
private readonly Subject<T> _queue;
private readonly IEnumerator<T> _enumerator;
private readonly object _sync = new object();
public BlockingQueue()
{
_queue = new Subject<T>();
_enumerator = _queue.GetEnumerator();
}
public void Enqueue(T item)
{
lock (_sync)
{
_queue.OnNext(item);
}
}
public T Dequeue()
{
_enumerator.MoveNext();
return _enumerator.Current;
}
}
Не обязательно полностью безопасно, но очень просто.
Это то, что я пришел, чтобы создать потокобезопасную ограниченную очередь блокировки.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
public class BlockingBuffer<T>
{
private Object t_lock;
private Semaphore sema_NotEmpty;
private Semaphore sema_NotFull;
private T[] buf;
private int getFromIndex;
private int putToIndex;
private int size;
private int numItems;
public BlockingBuffer(int Capacity)
{
if (Capacity <= 0)
throw new ArgumentOutOfRangeException("Capacity must be larger than 0");
t_lock = new Object();
buf = new T[Capacity];
sema_NotEmpty = new Semaphore(0, Capacity);
sema_NotFull = new Semaphore(Capacity, Capacity);
getFromIndex = 0;
putToIndex = 0;
size = Capacity;
numItems = 0;
}
public void put(T item)
{
sema_NotFull.WaitOne();
lock (t_lock)
{
while (numItems == size)
{
Monitor.Pulse(t_lock);
Monitor.Wait(t_lock);
}
buf[putToIndex++] = item;
if (putToIndex == size)
putToIndex = 0;
numItems++;
Monitor.Pulse(t_lock);
}
sema_NotEmpty.Release();
}
public T take()
{
T item;
sema_NotEmpty.WaitOne();
lock (t_lock)
{
while (numItems == 0)
{
Monitor.Pulse(t_lock);
Monitor.Wait(t_lock);
}
item = buf[getFromIndex++];
if (getFromIndex == size)
getFromIndex = 0;
numItems--;
Monitor.Pulse(t_lock);
}
sema_NotFull.Release();
return item;
}
}
Начиная с.NET 5.0/Core 3.0, вы можете использовать System.Threading.Channels
Benchmarks из этой статьи (Шаблон асинхронного поставщика в.NET (C#)), демонстрируя значительное увеличение скорости по сравнению с BlockingCollection!
Я не полностью изучил TPL, но у них может быть что-то, что соответствует вашим потребностям, или, по крайней мере, какой-нибудь корм для рефлекторов, чтобы получить вдохновение.
Надеюсь, это поможет.
Ну, вы можете посмотреть на System.Threading.Semaphore
учебный класс. Кроме этого - нет, вы должны сделать это самостоятельно. У AFAIK нет такой встроенной коллекции.
Если вам нужна максимальная пропускная способность, позволяющая читать нескольким читателям и писать только одному писателю, BCL предлагает что-то под названием ReaderWriterLockSlim, которое должно помочь уменьшить ваш код...