Детерминированный мониторинг импульса / ожидания и реализация таймаута в коллекции производителя-потребителя

Я пытаюсь реализовать параллельную коллекцию производитель-потребитель (несколько производителей и потребителей), которая поддерживает тайм-ауты для потребителей.

Теперь фактическая коллекция довольно сложна (к сожалению, ничего в System.Collections.Concurrent не выполняет эту работу), но у меня есть минимальный образец, который демонстрирует мою проблему (выглядит немного как BlockingCollection<T>).

public sealed class ProducerConsumerQueueDraft<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object locker = new object();

    public void Enqueue(T item)
    {
        lock (locker)
        {
            queue.Enqueue(item);

            /* This "optimization" is broken, as Nicholas Butler points out.
            if(queue.Count == 1) // Optimization
            */
                Monitor.Pulse(locker); // Notify any waiting consumer threads.
        }
    }

    public T Dequeue(T item)
    {
        lock (locker)
        {
            // Surprisingly, this needs to be a *while* and not an *if*
            // which is the core of my problem.
            while (queue.Count == 0)
                Monitor.Wait(locker);

            return queue.Dequeue();
        }
    }

    // This isn't thread-safe, but is how I want TryDequeue to look.
    public bool TryDequeueDesired(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
            {
                item = default(T);
                return false;
            }

            // This is wrong! The queue may be empty even though we were pulsed!
            item = queue.Dequeue();
            return true;
        }
    }

    // Has nasty timing-gymnastics I want to avoid.
    public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            var watch = Stopwatch.StartNew();
            while (queue.Count == 0)
            {
                var remaining = timeout - watch.Elapsed;

                if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
                {
                    item = default(T);
                    return false;
                }
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

Идея проста: потребители, которые находят пустую очередь, ожидают сигнала, а производители Pulse (примечание: нет PulseAll, что было бы неэффективно), чтобы они уведомили их об элементе ожидания.

Моя проблема в том, что это свойство Monitor.Pulse:

Когда поток, который вызвал Pulse, снимает блокировку, следующий поток в очереди готовности (который не обязательно является потоком, который был импульсным) получает блокировку.

Это означает, что потребительский поток C1 может быть разбужен производственным потоком, чтобы потреблять элемент, но другой потребительский поток C2 может получить блокировку до того, как C1 сможет ее повторно захватить, и потребить элемент, оставив C1 с пустая очередь, когда ей передается управление.

Это означает, что я должен проверять в коде потребителя каждый импульс, если очередь действительно не пуста, и возвращаться назад и ждать с пустыми руками, если это не так.

Моя основная проблема в том, что это неэффективно - потоки могут быть разбужены для выполнения работы, а затем немедленно отправлены обратно, чтобы снова ждать. Следствием этого является то, что реализация TryDequeue с таймаутом излишне сложно и неэффективно (см. TryDequeueThatWorks) когда это должно быть элегантно (см. TryDequeueDesired).

Как я могу крутить Monitor.Pulse делать то, что я хочу? В качестве альтернативы есть еще один примитив синхронизации? Есть ли более эффективный и / или элегантный способ реализации TryDequeue тайм-аут чем то, что я сделал?

К вашему сведению, вот тест, который демонстрирует проблемы с моим желаемым решением:

var queue = new ProducerConsumerQueueDraft<int>();

for (int consumer = 0; consumer < 3; consumer++)
    new Thread(() =>
    {
        while (true)
        {
            int item;

            // This call should occasionally throw an exception.
            // Switching to queue.TryDequeueThatWorks should make
            // the problem go away.
            if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
            {
                // Do nothing.
            }
        }

    }).Start();

Thread.Sleep(1000); // Let consumers get up and running

for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
    queue.Enqueue(0);
}

3 ответа

Решение

Моя основная проблема в том, что это неэффективно

Это не. Вы предполагаете, что это обычное явление, но такого рода гонки происходят очень редко. Однажды в Голубой Луне, в лучшем случае. Цикл while необходим, чтобы убедиться, что ничего не происходит, когда это происходит. И это будет. Не связывайтесь с этим.

На самом деле, наоборот, конструкция замка эффективна, потому что она позволяет совершить гонку. И занимается этим. Работать с замками очень опасно, потому что гонки происходят недостаточно часто. Они ужасно случайны, что не позволяет провести достаточное тестирование, чтобы доказать, что изменения не вызывают сбой. Добавление любого кода инструментов также не работает, оно меняет время.

Я написал статью об этом, которая может помочь:

Синхронизация потоков: ожидание и пульс демистифицированы

В частности, это объясняет, почему while петля необходима.

Вот простая основанная на ключе очередь конфликтующих производителей и потребителей:

public class ConflatingConcurrentQueue<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Entry> entries;
    private readonly BlockingCollection<Entry> queue;

    public ConflatingConcurrentQueue()
    {
        this.entries = new ConcurrentDictionary<TKey, Entry>();
        this.queue = new BlockingCollection<Entry>();
    }

    public void Enqueue(TValue value, Func<TValue, TKey> keySelector)
    {
        // Get the entry for the key. Create a new one if necessary.
        Entry entry = entries.GetOrAdd(keySelector(value), k => new Entry());

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Replace any old value with the new one.
            entry.Value = value;

            // Add the entry to the queue if it's not enqueued yet.
            if (!entry.Enqueued)
            {
                entry.Enqueued = true;
                queue.Add(entry);
            }
        }
    }

    public bool TryDequeue(out TValue value, TimeSpan timeout)
    {
        Entry entry;

        // Try to dequeue an entry (with timeout).
        if (!queue.TryTake(out entry, timeout))
        {
            value = default(TValue);
            return false;
        }

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Return the value.
            value = entry.Value;

            // Mark the entry as dequeued.
            entry.Enqueued = false;
            entry.Value = default(TValue);
        }

        return true;
    }

    private class Entry
    {
        public TValue Value { get; set; }
        public bool Enqueued { get; set; }
    }
}

(Это может потребовать пересмотра кода или два, но я думаю, что в целом это нормально.)

Другие вопросы по тегам