Производительность BlockingCollection(T)
Некоторое время в моей компании мы использовали доморощенный ObjectPool<T>
реализация, обеспечивающая блокировку доступа к его содержимому. Это довольно просто: Queue<T>
, object
заблокировать, и AutoResetEvent
сигнализировать потоку "заимствования" при добавлении элемента.
Мясо класса - это действительно эти два метода:
public T Borrow() {
lock (_queueLock) {
if (_queue.Count > 0)
return _queue.Dequeue();
}
_objectAvailableEvent.WaitOne();
return Borrow();
}
public void Return(T obj) {
lock (_queueLock) {
_queue.Enqueue(obj);
}
_objectAvailableEvent.Set();
}
Мы использовали этот и несколько других классов коллекций вместо тех, что были предоставлены System.Collections.Concurrent
потому что мы используем.NET 3.5, а не 4.0. Но недавно мы обнаружили, что, поскольку мы используем Reactive Extensions, у нас действительно есть Concurrent
доступное нам пространство имен (в System.Threading.dll).
Естественно, я понял, что с BlockingCollection<T>
является одним из основных классов в Concurrent
пространство имен, оно, вероятно, будет предлагать лучшую производительность, чем то, что я или мои товарищи по команде написали.
Поэтому я попытался написать новую реализацию, которая работает очень просто:
public T Borrow() {
return _blockingCollection.Take();
}
public void Return(T obj) {
_blockingCollection.Add(obj);
}
К моему удивлению, согласно некоторым простым тестам (заимствование / возврат в пул несколько тысяч раз из нескольких потоков), наша первоначальная реализация значительно превосходит BlockingCollection<T>
с точки зрения производительности. Они оба, кажется, работают правильно; просто наша первоначальная реализация выглядит намного быстрее.
Мой вопрос:
- С чего бы это? Возможно, потому что
BlockingCollection<T>
предлагает большую гибкость (я понимаю, что это работает, оборачиваяIProducerConsumerCollection<T>
), что обязательно приводит к снижению производительности? - Является ли это просто ошибочным использованием
BlockingCollection<T>
учебный класс? - Если это правильное использование
BlockingCollection<T>
я просто не правильно использую? Например, являетсяTake
/Add
подход слишком упрощен, и есть ли намного более эффективный способ получить ту же функциональность?
Если у кого-то нет понимания в ответ на этот третий вопрос, похоже, что сейчас мы будем придерживаться нашей первоначальной реализации.
3 ответа
Здесь есть несколько потенциальных возможностей.
Первый, BlockingCollection<T>
в Reactive Extensions есть бэкпорт, и он не совсем совпадает с финальной версией.NET 4. Я не удивлюсь, если производительность этого бэкпорта отличается от.NET 4 RTM (хотя я специально не представлял эту коллекцию). Большая часть TPL работает лучше в.NET 4, чем в.NET 3.5 backport.
При этом, я подозреваю, что ваша реализация превзойдет BlockingCollection<T>
если у вас есть один поток производителя и один поток потребителя. При наличии одного производителя и одного потребителя ваша блокировка будет оказывать меньшее влияние на общую производительность, а событие сброса является очень эффективным средством ожидания на стороне потребителя.
Тем не мение, BlockingCollection<T>
разработан, чтобы позволить многим потокам производителей очень хорошо "ставить" в очередь данные. Это не будет хорошо работать с вашей реализацией, так как конфликт блокировок станет довольно быстро проблематичным.
При этом я также хотел бы указать на одно заблуждение:
... это, вероятно, предложит лучшую производительность, чем то, что я или мои товарищи по команде написали.
Это часто не соответствует действительности. Классы коллекции Framework, как правило, работают очень хорошо, но часто не являются наиболее эффективным вариантом для данного сценария. При этом они, как правило, хорошо выступают, будучи очень гибкими и очень крепкими. Они часто имеют тенденцию очень хорошо масштабироваться. "Домашние" классы коллекций часто превосходят коллекции фреймворков в определенных сценариях, но, как правило, проблематичны при использовании в сценариях, отличных от тех, для которых они были специально разработаны. Я подозреваю, что это одна из тех ситуаций.
Я старался BlockingCollection
против ConurrentQueue/AutoResetEvent
комбо (аналогично решению OP, но без блокировки) в.Net 4, и последняя комбо была настолько быстрее для моего случая использования, что я отказался от BlockingCollection. К сожалению, это было почти год назад, и я не смог найти результаты теста.
Использование отдельного AutoResetEvent не слишком усложняет ситуацию. На самом деле, можно даже раз и навсегда абстрагировать его в BlockingCollectionSlim
....
Внутренне BlockingCollection также использует ConcurrentQueue, но выполняет некоторые дополнительные манипуляции с тонкими семафорами и токенами отмены, что дает дополнительные возможности, но за дополнительную плату, даже если не используется. Следует также отметить, что BlockingCollection не состоит в браке с ConcurrentQueue, но может использоваться с другими разработчиками IProducerConsumerCollection
вместо этого также.
Неограниченная, довольно голая реализация BlockingCollectionSlim:
class BlockingCollectionSlim<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
public void Add(T item)
{
_queue.Enqueue(item);
_autoResetEvent.Set();
}
public bool TryPeek(out T result)
{
return _queue.TryPeek(out result);
}
public T Take()
{
T item;
while (!_queue.TryDequeue(out item))
_autoResetEvent.WaitOne();
return item;
}
public bool TryTake(out T item, TimeSpan patience)
{
if (_queue.TryDequeue(out item))
return true;
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed < patience)
{
if (_queue.TryDequeue(out item))
return true;
var patienceLeft = (patience - stopwatch.Elapsed);
if (patienceLeft <= TimeSpan.Zero)
break;
else if (patienceLeft < MinWait)
// otherwise the while loop will degenerate into a busy loop,
// for the last millisecond before patience runs out
patienceLeft = MinWait;
_autoResetEvent.WaitOne(patienceLeft);
}
return false;
}
private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);
Я столкнулся с теми же проблемами с производительностью BlockingCollection в.Net 4.7.2 и нашел этот пост. В моем случае это MultipleProducers-MultipleConsumers, в частности небольшие фрагменты данных считываются из многих источников и должны обрабатываться многими фильтрами. Было использовано несколько (Env.ProcessorCount) BlockingCollections, и я получил профилировщик производительности, который сказал мне, что BlockingCollection.GetConsumingEnumerable.MoveNext()
съедает больше процессорного времени, чем фактическая фильтрация!
Спасибо, Евгений Бересовский, за ваш код. К вашему сведению: в моей среде это было почти вдвое медленнее, чем BlockingCollection. Итак, вот моя SpinLocked BlockingCollection:
public class BlockingCollectionSpin<T>
{
private SpinLock _lock = new SpinLock(false);
private Queue<T> _queue = new Queue<T>();
public void Add(T item)
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
_queue.Enqueue(item);
}
finally
{
if (gotLock) _lock.Exit(false);
}
}
public bool TryPeek(out T result)
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
if (_queue.Count > 0)
{
result = _queue.Peek();
return true;
}
else
{
result = default(T);
return false;
}
}
finally
{
if (gotLock) _lock.Exit(false);
}
}
public T Take()
{
var spin = new SpinWait();
do
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
if (_queue.Count > 0)
return _queue.Dequeue();
}
finally
{
if (gotLock) _lock.Exit(false);
}
spin.SpinOnce();
} while (true);
}
}
А для критичного к производительности кода я бы предложил избежать readonly
модификатор поля. Он добавляет проверку каждого поля доступа в IL. Со следующим тестовым кодом
private static void TestBlockingCollections()
{
const int workAmount = 10000000;
var workerCount = Environment.ProcessorCount * 2;
var sw = new Stopwatch();
var source = new long[workAmount];
var rnd = new Random();
for (int i = 0; i < workAmount; i++)
source[i] = rnd.Next(1000000);
var swOverhead = 0.0;
for (int i = 0; i < workAmount; i++)
{
sw.Restart();
swOverhead += sw.Elapsed.TotalMilliseconds;
}
swOverhead /= workAmount;
var sum1 = new long[workerCount];
var queue1 = new BlockingCollection<long>(10000);
var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
foreach (var l in queue1.GetConsumingEnumerable())
sum1[n] += l;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue1.Add(l);
queue1.CompleteAdding();
Task.WaitAll(workers);
var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);
var sum2 = new long[workerCount];
var queue2 = new BlockingCollectionSlim<long?>();
workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
long? l;
while ((l = queue2.Take()).HasValue)
sum2[n] += l.Value;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue2.Add(l);
for (int i = 0; i < workerCount; i++)
queue2.Add(null);
Task.WaitAll(workers);
elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);
var sum3 = new long[workerCount];
var queue3 = new BlockingCollectionSpin<long?>();
workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
long? l;
while ((l = queue3.Take()).HasValue)
sum3[n] += l.Value;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue3.Add(l);
for (int i = 0; i < workerCount; i++)
queue3.Add(null);
Task.WaitAll(workers);
elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);
if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
Console.WriteLine("Wrong sum in the end!");
Console.ReadLine();
}
На Core i5-3210M с 2 ядрами и включенным HT я получил следующий вывод:
BlockingCollection 0,0006ms BlockingCollectionSlim 0.0010ms (реализация Евгения Бересовского) BlockingCollectionSpin 0,0003ms
Итак, версия SpinLocked в два раза быстрее, чем.Net BlockingCollection
, Но я бы предложил использовать только его! если вы действительно предпочитаете производительность, а не простоту кода (и удобство сопровождения).