Может ли C# блокировать сообщения FIFO в очереди утечки?
Я работаю над академическим проектом с открытым исходным кодом, и теперь мне нужно создать быструю блокирующую очередь FIFO в C#. Моя первая реализация просто обернула синхронизированную очередь (с динамическим расширением) в семафор читателя, затем я решила реализовать ее следующим (теоретически более быстрым) способом
public class FastFifoQueue<T>
{
private T[] _array;
private int _head, _tail, _count;
private readonly int _capacity;
private readonly Semaphore _readSema, _writeSema;
/// <summary>
/// Initializes FastFifoQueue with the specified capacity
/// </summary>
/// <param name="size">Maximum number of elements to store</param>
public FastFifoQueue(int size)
{
//Check if size is power of 2
//Credit: http://stackru.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
if ((size & (size - 1)) != 0)
throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");
_capacity = size;
_array = new T[size];
_count = 0;
_head = int.MinValue; //0 is the same!
_tail = int.MinValue;
_readSema = new Semaphore(0, _capacity);
_writeSema = new Semaphore(_capacity, _capacity);
}
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
Interlocked.Exchange(ref _array[index], item);
Interlocked.Increment(ref _count);
_readSema.Release();
}
public T Dequeue()
{
_readSema.WaitOne();
int index = Interlocked.Increment(ref _tail);
index %= _capacity;
if (index < 0) index += _capacity;
T ret = Interlocked.Exchange(ref _array[index], null);
Interlocked.Decrement(ref _count);
_writeSema.Release();
return ret;
}
public int Count
{
get
{
return _count;
}
}
}
Это классическая реализация очереди FIFO со статическим массивом, который мы находим в учебниках. Он предназначен для атомарного увеличения указателей, и, поскольку я не могу заставить указатель вернуться к нулю при достижении (емкость-1), я вычисляю по модулю отдельно. Теоретически, использование Interlocked - это то же самое, что и блокировка перед выполнением инкремента, и, поскольку существуют семафоры, несколько производителей / потребителей могут войти в очередь, но только один за раз может изменять указатели очереди. Во-первых, поскольку Interlocked.Increment сначала увеличивает, а затем возвращает, я уже понимаю, что я ограничен в использовании значения после увеличения и начинаю хранить элементы с позиции 1 в массиве. Это не проблема, я вернусь к 0, когда я достигну определенного значения
В чем проблема с этим? Вы не поверите, что при большой нагрузке очередь иногда возвращает значение NULL. Я уверен, повторяю, я уверен, что ни один метод не помещает ноль в очередь. Это определенно верно, потому что я попытался поставить нулевую проверку в Enqueue, чтобы убедиться, и не было выдано никакой ошибки. Я создал тестовый пример для этого с помощью Visual Studio (кстати, я использую двухъядерный процессор, как люди из maaaaaaaany)
private int _errors;
[TestMethod()]
public void ConcurrencyTest()
{
const int size = 3; //Perform more tests changing it
_errors = 0;
IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
Thread[] producers = new Thread[size], consumers = new Thread[size];
for (int i = 0; i < size; i++)
{
producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
producers[i].Start(queue);
consumers[i].Start(queue);
}
Thread.Sleep(new TimeSpan(0, 0, 1, 0));
for (int i = 0; i < size; i++)
{
producers[i].Abort();
consumers[i].Abort();
}
Assert.AreEqual(0, _errors);
}
private void LoopProducer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
try
{
q.Enqueue(new object());
}
catch
{ }
}
}
catch (ThreadAbortException)
{ }
}
private void LoopConsumer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
object item = q.Dequeue();
if (item == null) Interlocked.Increment(ref _errors);
}
}
catch (ThreadAbortException)
{ }
}
Как только поток потребителя получает ноль, подсчитывается ошибка. При выполнении теста с 1 производителем и 1 потребителем он проходит успешно. При выполнении теста с двумя производителями и двумя или более потребителями происходит авария: обнаруживается даже 2000 утечек. Я обнаружил, что проблема может быть в методе Enqueue. По контракту на разработку, производитель может писать только в пустую ячейку (null), но, модифицируя мой код с помощью некоторой диагностики, я обнаружил, что иногда производитель пытается писать в непустую ячейку, которая затем занята "хорошим" " данные.
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
T leak = Interlocked.Exchange(ref _array[index], item);
//Diagnostic code
if (leak != null)
{
throw new InvalidOperationException("Too bad...");
}
Interlocked.Increment(ref _count);
_readSema.Release();
}
"Слишком плохое" исключение случается тогда часто. Но слишком странно, что конфликт возникает из-за одновременной записи, потому что приращения являются атомарными, а семафор писателя допускает только столько писателей, сколько свободных ячеек массива.
Может ли кто-нибудь помочь мне с этим? Буду очень признателен, если вы поделитесь со мной своими навыками и опытом.
Спасибо.
3 ответа
Я должен сказать, что это показалось мне очень умной идеей, и я немного подумал об этом, прежде чем начал понимать, где (я думаю) ошибка здесь. Так что, с одной стороны, спасибо за такой умный дизайн! Но в то же время вам стыдно за демонстрацию "Закона Кернигана ":
Отладка в два раза сложнее, чем написание кода. Поэтому, если вы пишете код настолько умно, насколько это возможно, вы, по определению, недостаточно умны для его отладки.
Проблема в основном такова: вы предполагаете, что WaitOne
а также Release
звонки эффективно сериализовать все ваши Enqueue
а также Dequeue
операции; но это не совсем то, что здесь происходит. Помните, что Semaphore
Класс используется для ограничения количества потоков, обращающихся к ресурсу, а не для обеспечения определенного порядка событий. Что происходит между каждым WaitOne
а также Release
не гарантированно происходит в том же "порядке потока", что и WaitOne
а также Release
называет себя
Это сложно объяснить словами, поэтому позвольте мне привести наглядную иллюстрацию.
Допустим, ваша очередь имеет емкость 8 и выглядит так (пусть 0
представлять null
а также x
представлять объект):
[xxxxxxxx]
Так Enqueue
был вызван 8 раз, и очередь заполнена. Поэтому ваш _writeSema
семафор будет блокироваться на WaitOne
, и ваш _readSema
Семафор вернется сразу на WaitOne
,
Теперь давайте предположим, Dequeue
вызывается более или менее одновременно на 3 разных потоках. Давайте назовем их T1, T2 и T3.
Прежде чем продолжить, позвольте мне нанести несколько ярлыков на ваш Dequeue
реализация, для справки:
public T Dequeue()
{
_readSema.WaitOne(); // A
int index = Interlocked.Increment(ref _tail); // B
index %= _capacity;
if (index < 0) index += _capacity;
T ret = Interlocked.Exchange(ref _array[index], null); // C
Interlocked.Decrement(ref _count);
_writeSema.Release(); // D
return ret;
}
Итак, T1, T2 и T3 все прошли точку A. Тогда для простоты предположим, что все они достигают линии B "по порядку", так что T1 имеет index
0, Т2 имеет index
1, а Т3 имеет index
из 2.
Все идет нормально. Но вот что надо: нет гарантии, что отсюда T1, T2 и T3 попадут в линию D в любом указанном порядке. Предположим, что T3 фактически опережает T1 и T2, проходя мимо линии C (и таким образом устанавливая _array[2]
в null
) и весь путь до линии D.
После этого _writeSema
будет сигнализироваться, что означает, что в вашей очереди есть один слот для записи, верно? Но ваша очередь теперь выглядит так!
[хх 0 ххххх]
Так что, если другой поток пришел вместе с вызовом Enqueue
, это на самом деле пройдет _writeSema.WaitOne
, приращение _head
и получить index
0, хотя слот 0 не пуст. Результатом этого будет то, что элемент в слоте 0 может быть перезаписан до того, как T1 (помните его?) Прочитает его.
Чтобы понять, где ваш null
значения берутся, вам нужно только визуализировать обратный процесс, который я только что описал. То есть предположим, что ваша очередь выглядит так:
[0 0 0 0 0 0 0 0]
Три темы, T1, T2 и T3, все вызовы Enqueue
почти одновременно. Приращение T3 _head
последний, но вставляет свой элемент (в _array[2]
) и звонки _readSema.Release
во-первых, что приводит к сигналу _readSema
но очередь выглядит так:
[0 0 x 0 0 0 0 0]
Так что, если другой поток пришел вместе с вызовом Dequeue
(прежде чем T1 и T2 закончат делать свое дело), он пройдет _readSema.WaitOne
, приращение _tail
и получить index
0, хотя слот 0 пуст.
Так вот в чем твоя проблема. Что касается решения, у меня нет никаких предложений на данный момент. Дайте мне немного времени подумать... (Я публикую этот ответ сейчас, потому что он свеж в моей памяти, и я чувствую, что он может вам помочь.)
(+1 к Дэну Тао, за которого я голосую, есть ответ). Очередь будет изменена на что-то вроде этого...
while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
;
Dequeue будет изменен на что-то вроде этого...
while( (ret = Interlocked.Exchange(ref _array[index], null)) == null)
;
Это основано на превосходном анализе Дана Тао. Поскольку индексы получаются атомарно, то (при условии, что в методах enqueue или dequeue нет потоков, которые умирают или заканчиваются), читателю гарантированно в конечном итоге будет заполнена его ячейка, или писателю гарантированно освободится его ячейка (ноль).
Спасибо, Дэн Тао и Лес,
Я очень ценю вашу помощь. Дэн, ты открыл мой разум: не важно, сколько производителей / потребителей находится внутри критической секции, важно, чтобы блокировки были открыты по порядку. Лес, ты нашел решение проблемы.
Теперь пришло время, наконец, ответить на мой собственный вопрос с помощью окончательного кода, который я сделал благодаря помощи вас обоих. Ну, это не так много, но это небольшое улучшение из кода Лес
Ставить:
while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
Thread.Sleep(0);
Dequeue:
while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
Thread.Sleep(0);
Почему Thread.Sleep(0)? Когда мы обнаруживаем, что элемент не может быть извлечен / сохранен, зачем сразу проверять снова? Мне нужно принудительно переключать контекст, чтобы другие потоки могли читать / писать. Очевидно, что следующий поток, который будет запланирован, может быть другим потоком, не способным работать, но, по крайней мере, мы его форсируем. Источник: http://progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html
Я также проверил код предыдущего контрольного примера, чтобы получить подтверждение моих утверждений:
без сна (0)
Read 6164150 elements
Wrote 6322541 elements
Read 5885192 elements
Wrote 5785144 elements
Wrote 6439924 elements
Read 6497471 elements
со сном (0)
Wrote 7135907 elements
Read 6361996 elements
Wrote 6761158 elements
Read 6203202 elements
Wrote 5257581 elements
Read 6587568 elements
Я знаю, что это не "великое" открытие, и я не получу приз Тьюринга за эти числа. Прирост производительности не драматичен, но больше нуля. Принудительное переключение контекста позволяет выполнять больше операций RW (= более высокая пропускная способность).
Для ясности: в моем тесте я просто оцениваю производительность очереди, а не моделирую проблему производителя / потребителя, поэтому не волнует, если в конце теста через минуту все еще остаются элементы в очереди. Но я только что продемонстрировал, что мой подход работает, спасибо вам всем.
Код доступен с открытым исходным кодом в формате MS-RL: http://logbus-ng.svn.sourceforge.net/viewvc/logbus-ng/trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs?revision=461&view=markup