Шаблон производитель / потребитель с очередью FIFO фиксированного размера
Мне нужно реализовать шаблон производителя / потребителя вокруг очереди FIFO фиксированного размера. Я думаю, что класс-оболочка вокруг ConcurrentQueue может работать для этого, но я не совсем уверен (и я никогда раньше не работал с ConcurrentQueue). Суть в том, что очередь должна содержать только фиксированное количество элементов (строки в моем случае). Мое приложение будет иметь одну задачу / поток производителя и одну задачу / поток пользователя. Когда запускается моя потребительская задача, она должна удалить все элементы, которые находятся в очереди в данный момент времени, и обработать их.
Что бы это ни стоило, обработка поставленных в очередь элементов моим потребителем - это не что иное, как загрузка их через SOAP в веб-приложение, которое не на 100% надежно. Если не удается установить соединение или не удается выполнить вызов SOAP, я должен отбросить эти элементы и вернуться в очередь для получения дополнительной информации. Из-за издержек SOAP я пытался максимизировать количество элементов из очереди, которые я мог бы отправить за один вызов SOAP.
Иногда мой производитель может добавлять элементы быстрее, чем мой потребитель может их удалить и обработать. Если очередь уже заполнена, и мой производитель должен добавить еще один элемент, мне нужно поставить в очередь новый элемент, но затем удалить самый старый элемент из очереди, чтобы размер очереди оставался фиксированным. По сути, мне нужно хранить самые последние элементы, которые производятся в очереди, постоянно (даже если это означает, что некоторые элементы не потребляются, потому что мой потребитель в настоящее время обрабатывает предыдущие элементы).
Что касается производителя, сохраняющего число, если элементы в очереди фиксированы, я нашел одну потенциальную идею из этого вопроса:
Очередь фиксированного размера, которая автоматически удаляет старые значения при новых запросах
В настоящее время я использую класс-оболочку (на основе этого ответа) вокруг ConcurrentQueue с методом Enqueue(), например так:
public class FixedSizeQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public int Size { get; private set; }
public FixedSizeQueue(int size)
{
Size = size;
}
public void Enqueue(T obj)
{
// add item to the queue
queue.Enqueue(obj);
lock (this) // lock queue so that queue.Count is reliable
{
while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
{
T objOut;
queue.TryDequeue(out objOut);
}
}
}
}
Я создаю экземпляр этого класса с ограничением размера в очереди следующим образом:
FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit
Я запускаю задачу продюсера, и она начинает заполнять очередь. Код в моем методе Enqueue(), кажется, работает правильно в отношении удаления самого старого элемента из очереди, когда добавление элемента приводит к тому, что число очередей превышает максимальный размер. Теперь мне нужно выполнить задачу потребителя, чтобы снять с очереди предметы и обработать их, но здесь мой мозг запутался. Каков наилучший способ реализовать метод Dequeue для моего потребителя, который будет делать моментальный снимок очереди и удалять все элементы из очереди для обработки (производитель может все еще добавлять элементы в очередь во время этого процесса)?
2 ответа
Проще говоря, в ConcurrentQueue есть метод "ToArray", который при вводе блокирует коллекцию и создает "снимок" всех текущих элементов в очереди. Если вы хотите, чтобы ваш потребитель получил блок вещей для работы, вы можете заблокировать тот же объект, который есть у метода постановки в очередь, вызвать ToArray(), а затем прокрутить while(!queue.IsEmpty) queue.TryDequeue(out trash)
цикл, чтобы очистить очередь, прежде чем возвращать массив, который вы извлекли.
Это будет вашим GetAll()
метод:
public T[] GetAll()
{
lock (syncObj) // so that we don't clear items we didn't get with ToArray()
{
var result = queue.ToArray();
T trash;
while(!queue.IsEmpty) queue.TryDequeue(out trash);
}
}
Поскольку вы должны очистить очередь, вы можете просто объединить две операции; создайте массив правильного размера (используя queue.Count), затем, пока очередь не пуста, удалите элемент из очереди и поместите его в массив перед возвратом.
Теперь это ответ на конкретный вопрос. Теперь я должен с чистой совестью надеть шляпу CodeReview.SE и указать на несколько вещей:
НИКОГДА не используйте
lock(this)
, Вы никогда не знаете, какие другие объекты могут использовать ваш объект в качестве блокирующего фокуса, и, таким образом, будут заблокированы, когда объект блокируется изнутри. Рекомендуется блокировать экземпляр объекта с частной областью действия, обычно созданный только для блокировки:private readonly object syncObj = new object();
Так как в любом случае вы блокируете критические разделы вашей оболочки, я бы использовал обычный
List<T>
вместо одновременной коллекции. Доступ быстрее, его легче очистить, поэтому вы сможете делать то, что делаете, гораздо проще, чем позволяет ConcurrentQueue. Чтобы поставить в очередь, заблокируйте синхронизирующий объект, вставьте () до нуля индекса, затем удалите все элементы из индекса размера в текущий счетчик списка, используя RemoveRange(). Для снятия блокировки заблокируйте тот же объект синхронизации, вызовите myList.ToArray() (из пространства имен Linq; выполняет почти то же самое, что и ConcurrentQueue), а затем вызовите myList.Clear() перед возвратом массива. Не может быть проще:public class FixedSizeQueue<T> { private readonly List<T> queue = new List<T>(); private readonly object syncObj = new object(); public int Size { get; private set; } public FixedSizeQueue(int size) { Size = size; } public void Enqueue(T obj) { lock (syncObj) { queue.Insert(0,obj) if(queue.Count > Size) queue.RemoveRange(Size, Count-Size); } } public T[] Dequeue() { lock (syncObj) { var result = queue.ToArray(); queue.Clear(); return result; } } }
Похоже, вы понимаете, что выбрасываете в очередь предметы с использованием этой модели. Обычно это не очень хорошая вещь, но я готов дать вам преимущество сомнения. Тем не менее, я скажу, что есть способ без потерь, используя BlockingCollection. BlockingCollection обертывает любые IProducerConsumerCollection, включая большинство классов System.Collections.Concurrent, и позволяет указать максимальную емкость для очереди. Затем коллекция будет блокировать любой поток, пытающийся выйти из пустой очереди, или любой поток, пытающийся добавить его в полную очередь, до тех пор, пока элементы не будут добавлены или удалены, так что есть что-то, что можно получить, или место для вставки. Это лучший способ реализовать очередь производителя-потребителя с максимальным размером, или такой, который в противном случае потребовал бы "опроса", чтобы увидеть, есть ли у потребителя что-то для работы. Если вы идете по этому пути, выбрасываются только те, которые потребитель должен выбросить; потребитель увидит все строки, которые вставляет производитель, и сам примет решение по каждой из них.
Вы не хотите использовать lock
с this
, Посмотрите, почему блокировка (это) {…} плоха? Больше подробностей.
Этот код
// if queue count > max queue size, then dequeue an item
while (queue.Count > Size)
{
T objOut;
queue.TryDequeue(out objOut);
}
предполагает, что вам нужно каким-то образом подождать или уведомить потребителя о наличии товара. В этом случае рассмотрите возможность использования BlockingCollection<T>
вместо.