Как правильно использовать BlockingCollection.GetConsumingEnumerable?

Я пытаюсь реализовать шаблон производителя / потребителя, используя BlockingCollection<T> поэтому я написал простое консольное приложение для его тестирования.

public class Program
{
    public static void Main(string[] args)
    {
        var workQueue = new WorkQueue();
        workQueue.StartProducingItems();
        workQueue.StartProcessingItems();

        while (true)
        {

        }
    }
}

public class WorkQueue
{
    private BlockingCollection<int> _queue;
    private static Random _random = new Random();

    public WorkQueue()
    {
        _queue = new BlockingCollection<int>();

        // Prefill some items.
        for (int i = 0; i < 100; i++)
        {
            //_queue.Add(_random.Next());
        }
    }

    public void StartProducingItems()
    {
        Task.Run(() =>
        {
            _queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else.
        });
    }

    public void StartProcessingItems()
    {
        Task.Run(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker 1: " + item);
            }
        });

        Task.Run(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker 2: " + item);
            }
        });
    }
}

Однако есть 3 проблемы с моим дизайном:

  • Я не знаю правильный способ блокировки / ожидания в моем методе Main. Выполнение простого пустого цикла while кажется ужасно неэффективным, а загрузка ЦП тратится просто ради того, чтобы приложение не заканчивалось.

  • Есть и еще одна проблема с моим дизайном: в этом простом приложении у меня есть производитель, который производит товары бесконечно, и никогда не должен останавливаться. В реальных условиях я бы хотел, чтобы это в конце концов закончилось (например, закончились файлы для обработки). В таком случае, как мне ждать его завершения в методе Main? Делать StartProducingItemsasync а потом await Это?

  • Либо GetConsumingEnumerable или же Add не работает, как я ожидал. Производитель должен постоянно добавлять элементы, но он добавляет один элемент и больше никогда не добавляет. Этот один предмет затем обрабатывается одним из потребителей. Затем оба потребителя блокируют ожидание добавления элементов, но ни один из них не блокируется. Я знаю о Take метод, но опять крутится Take в то время как цикл кажется довольно расточительным и неэффективным. E сть CompleteAdding метод, который затем не позволяет добавлять что-либо еще и выдает исключение, если вы пытаетесь, так что это не подходит.

Я точно знаю, что оба потребителя фактически блокируют и ждут новых элементов, так как я могу переключаться между потоками во время отладки:

РЕДАКТИРОВАТЬ:

Я внес изменения, предложенные в одном из комментариев, но Task.WhenAll все еще возвращается сразу.

public Task StartProcessingItems()
{
    var consumers = new List<Task>();

    for (int i = 0; i < 2; i++)
    {
        consumers.Add(Task.Run(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine($"Worker {i}: " + item);
            }
        }));
    }

    return Task.WhenAll(consumers.ToList());
}

1 ответ

GetConsumingEnumerable() блокирует Если вы хотите добавить в очередь постоянно, вы должны поставить вызов _queue.Add в цикле:

public void StartProducingItems()
{
    Task.Run(() =>
    {
        while (true)
            _queue.Add(_random.Next());
    });
}

Учитывая Main() метод, который вы могли бы назвать Console.ReadLine() Чтобы предотвратить завершение основного потока до нажатия клавиши:

public static void Main(string[] args)
{
    var workQueue = new WorkQueue();
    workQueue.StartProducingItems();
    workQueue.StartProcessingItems();

    Console.WriteLine("Press a key to terminate the application...");
    Console.ReadLine();
}
Другие вопросы по тегам