Как правильно использовать BlockingCollection.GetConsumingEnumerable?
Я пытаюсь реализовать шаблон производителя / потребителя, используя BlockingCollection<T>
поэтому я написал простое консольное приложение для его тестирования.
public class Program
{
public static void Main(string[] args)
{
var workQueue = new WorkQueue();
workQueue.StartProducingItems();
workQueue.StartProcessingItems();
while (true)
{
}
}
}
public class WorkQueue
{
private BlockingCollection<int> _queue;
private static Random _random = new Random();
public WorkQueue()
{
_queue = new BlockingCollection<int>();
// Prefill some items.
for (int i = 0; i < 100; i++)
{
//_queue.Add(_random.Next());
}
}
public void StartProducingItems()
{
Task.Run(() =>
{
_queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else.
});
}
public void StartProcessingItems()
{
Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine("Worker 1: " + item);
}
});
Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine("Worker 2: " + item);
}
});
}
}
Однако есть 3 проблемы с моим дизайном:
Я не знаю правильный способ блокировки / ожидания в моем методе Main. Выполнение простого пустого цикла while кажется ужасно неэффективным, а загрузка ЦП тратится просто ради того, чтобы приложение не заканчивалось.
Есть и еще одна проблема с моим дизайном: в этом простом приложении у меня есть производитель, который производит товары бесконечно, и никогда не должен останавливаться. В реальных условиях я бы хотел, чтобы это в конце концов закончилось (например, закончились файлы для обработки). В таком случае, как мне ждать его завершения в методе Main? Делать
StartProducingItems
async
а потомawait
Это?Либо
GetConsumingEnumerable
или жеAdd
не работает, как я ожидал. Производитель должен постоянно добавлять элементы, но он добавляет один элемент и больше никогда не добавляет. Этот один предмет затем обрабатывается одним из потребителей. Затем оба потребителя блокируют ожидание добавления элементов, но ни один из них не блокируется. Я знаю оTake
метод, но опять крутитсяTake
в то время как цикл кажется довольно расточительным и неэффективным. E стьCompleteAdding
метод, который затем не позволяет добавлять что-либо еще и выдает исключение, если вы пытаетесь, так что это не подходит.
Я точно знаю, что оба потребителя фактически блокируют и ждут новых элементов, так как я могу переключаться между потоками во время отладки:
РЕДАКТИРОВАТЬ:
Я внес изменения, предложенные в одном из комментариев, но Task.WhenAll
все еще возвращается сразу.
public Task StartProcessingItems()
{
var consumers = new List<Task>();
for (int i = 0; i < 2; i++)
{
consumers.Add(Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine($"Worker {i}: " + item);
}
}));
}
return Task.WhenAll(consumers.ToList());
}
1 ответ
GetConsumingEnumerable()
блокирует Если вы хотите добавить в очередь постоянно, вы должны поставить вызов _queue.Add
в цикле:
public void StartProducingItems()
{
Task.Run(() =>
{
while (true)
_queue.Add(_random.Next());
});
}
Учитывая Main()
метод, который вы могли бы назвать Console.ReadLine()
Чтобы предотвратить завершение основного потока до нажатия клавиши:
public static void Main(string[] args)
{
var workQueue = new WorkQueue();
workQueue.StartProducingItems();
workQueue.StartProcessingItems();
Console.WriteLine("Press a key to terminate the application...");
Console.ReadLine();
}