Шаблон производитель / потребитель с очередью FIFO фиксированного размера

Мне нужно реализовать шаблон производителя / потребителя вокруг очереди FIFO фиксированного размера. Я думаю, что класс-оболочка вокруг ConcurrentQueue может работать для этого, но я не совсем уверен (и я никогда раньше не работал с ConcurrentQueue). Суть в том, что очередь должна содержать только фиксированное количество элементов (строки в моем случае). Мое приложение будет иметь одну задачу / поток производителя и одну задачу / поток пользователя. Когда запускается моя потребительская задача, она должна удалить все элементы, которые находятся в очереди в данный момент времени, и обработать их.

Что бы это ни стоило, обработка поставленных в очередь элементов моим потребителем - это не что иное, как загрузка их через SOAP в веб-приложение, которое не на 100% надежно. Если не удается установить соединение или не удается выполнить вызов SOAP, я должен отбросить эти элементы и вернуться в очередь для получения дополнительной информации. Из-за издержек SOAP я пытался максимизировать количество элементов из очереди, которые я мог бы отправить за один вызов SOAP.

Иногда мой производитель может добавлять элементы быстрее, чем мой потребитель может их удалить и обработать. Если очередь уже заполнена, и мой производитель должен добавить еще один элемент, мне нужно поставить в очередь новый элемент, но затем удалить самый старый элемент из очереди, чтобы размер очереди оставался фиксированным. По сути, мне нужно хранить самые последние элементы, которые производятся в очереди, постоянно (даже если это означает, что некоторые элементы не потребляются, потому что мой потребитель в настоящее время обрабатывает предыдущие элементы).

Что касается производителя, сохраняющего число, если элементы в очереди фиксированы, я нашел одну потенциальную идею из этого вопроса:

Очередь фиксированного размера, которая автоматически удаляет старые значения при новых запросах

В настоящее время я использую класс-оболочку (на основе этого ответа) вокруг ConcurrentQueue с методом Enqueue(), например так:

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

Я создаю экземпляр этого класса с ограничением размера в очереди следующим образом:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

Я запускаю задачу продюсера, и она начинает заполнять очередь. Код в моем методе Enqueue(), кажется, работает правильно в отношении удаления самого старого элемента из очереди, когда добавление элемента приводит к тому, что число очередей превышает максимальный размер. Теперь мне нужно выполнить задачу потребителя, чтобы снять с очереди предметы и обработать их, но здесь мой мозг запутался. Каков наилучший способ реализовать метод Dequeue для моего потребителя, который будет делать моментальный снимок очереди и удалять все элементы из очереди для обработки (производитель может все еще добавлять элементы в очередь во время этого процесса)?

2 ответа

Решение

Проще говоря, в ConcurrentQueue есть метод "ToArray", который при вводе блокирует коллекцию и создает "снимок" всех текущих элементов в очереди. Если вы хотите, чтобы ваш потребитель получил блок вещей для работы, вы можете заблокировать тот же объект, который есть у метода постановки в очередь, вызвать ToArray(), а затем прокрутить while(!queue.IsEmpty) queue.TryDequeue(out trash) цикл, чтобы очистить очередь, прежде чем возвращать массив, который вы извлекли.

Это будет вашим GetAll() метод:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

Поскольку вы должны очистить очередь, вы можете просто объединить две операции; создайте массив правильного размера (используя queue.Count), затем, пока очередь не пуста, удалите элемент из очереди и поместите его в массив перед возвратом.

Теперь это ответ на конкретный вопрос. Теперь я должен с чистой совестью надеть шляпу CodeReview.SE и указать на несколько вещей:

  • НИКОГДА не используйте lock(this), Вы никогда не знаете, какие другие объекты могут использовать ваш объект в качестве блокирующего фокуса, и, таким образом, будут заблокированы, когда объект блокируется изнутри. Рекомендуется блокировать экземпляр объекта с частной областью действия, обычно созданный только для блокировки: private readonly object syncObj = new object();

  • Так как в любом случае вы блокируете критические разделы вашей оболочки, я бы использовал обычный List<T> вместо одновременной коллекции. Доступ быстрее, его легче очистить, поэтому вы сможете делать то, что делаете, гораздо проще, чем позволяет ConcurrentQueue. Чтобы поставить в очередь, заблокируйте синхронизирующий объект, вставьте () до нуля индекса, затем удалите все элементы из индекса размера в текущий счетчик списка, используя RemoveRange(). Для снятия блокировки заблокируйте тот же объект синхронизации, вызовите myList.ToArray() (из пространства имен Linq; выполняет почти то же самое, что и ConcurrentQueue), а затем вызовите myList.Clear() перед возвратом массива. Не может быть проще:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    
    public int Size { get; private set; }
    
    public FixedSizeQueue(int size) { Size = size; }
    
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
  • Похоже, вы понимаете, что выбрасываете в очередь предметы с использованием этой модели. Обычно это не очень хорошая вещь, но я готов дать вам преимущество сомнения. Тем не менее, я скажу, что есть способ без потерь, используя BlockingCollection. BlockingCollection обертывает любые IProducerConsumerCollection, включая большинство классов System.Collections.Concurrent, и позволяет указать максимальную емкость для очереди. Затем коллекция будет блокировать любой поток, пытающийся выйти из пустой очереди, или любой поток, пытающийся добавить его в полную очередь, до тех пор, пока элементы не будут добавлены или удалены, так что есть что-то, что можно получить, или место для вставки. Это лучший способ реализовать очередь производителя-потребителя с максимальным размером, или такой, который в противном случае потребовал бы "опроса", чтобы увидеть, есть ли у потребителя что-то для работы. Если вы идете по этому пути, выбрасываются только те, которые потребитель должен выбросить; потребитель увидит все строки, которые вставляет производитель, и сам примет решение по каждой из них.

Вы не хотите использовать lock с this, Посмотрите, почему блокировка (это) {…} плоха? Больше подробностей.

Этот код

// if queue count > max queue size, then dequeue an item
while (queue.Count > Size) 
{
    T objOut;
    queue.TryDequeue(out objOut);
}

предполагает, что вам нужно каким-то образом подождать или уведомить потребителя о наличии товара. В этом случае рассмотрите возможность использования BlockingCollection<T> вместо.

Другие вопросы по тегам