Производитель / Потребитель, BlockingCollection и ожидание изменений

Я пытаюсь обернуть голову вокруг BlockingCollection и моей проблемы производителя / потребителя.

Чего я хочу добиться, так это следующего:

  • Поточно-ориентированная очередь для хранения списка объектов ("заданий") в формате FIFO.
  • Вторая потокобезопасная очередь, которая содержит список результатов этих заданий, также в формате FIFO.

Другими словами:

Inbound "Job" Data, can come at any time from multiple threads 
   ==> Thread-Safe FIFO Queue 1 "FQ1"
      ==> Async Processing of data in FQ1 (and remove item from FQ1)
         ==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
            ==> Async Processing of data in FQ2 (and remove item from FQ2)
               ==> Done

Мои скромные попытки до сих пор:

private BlockingCollection<InboundObject> fq1;
private BlockingCollection<ResultObject> fq2;

(...)

Task.Factory.StartNew(() =>
{
    foreach (InboundObject a in fq1.GetConsumingEnumerable())
       a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject>
}

Одна из причин, по которой я выбрал BlockingCollection, заключается в том, что я хочу снизить нагрузку до минимума, то есть выполнять работу (а не заниматься ожиданием / сном), когда элементы фактически находятся внутри коллекции. Я не уверен, является ли foreach правильным подходом для этого.

Пожалуйста, дайте мне знать, если это правильно или есть лучший подход. Спасибо!

Edit Я мог сказать из модульного тестирования, что работа внутри задачи на самом деле была синхронной. Новая версия выглядит следующим образом:

Task.Factory.StartNew(() =>
{
    foreach (InboundObject a in fq1.GetConsumingEnumerable())
       Task.Factory.StartNew(async () => { fq2.Add(await a.DoWork()); });
}

Вклад очень ценится!

1 ответ

Одна из причин, по которой я выбрал BlockingCollection, заключается в том, что я хочу снизить нагрузку до минимума, то есть выполнять работу (а не заниматься ожиданием / сном), когда элементы фактически находятся внутри коллекции. Я не уверен, является ли foreach правильным подходом для этого.

Это правильный подход, foreach будет заблокирован, пока новый элемент не будет добавлен в очередь или CompleteAdding метод будет вызван. Не правильно, что вы хотите добиться асинхронной обработки с BlockingCollection. BlockingCollection представляет собой простую очередь производителя / потребителя и должна использоваться, когда необходимо поддерживать порядок, в котором обрабатываются задания и результаты заданий. Из-за этого это синхронно. Задания будут обрабатываться в том же порядке, в котором они были добавлены.

Если все, что вам нужно, это асинхронное выполнение, вам не нужна очередь. В этом случае вы можете использовать TPL, просто вызывать новую задачу для каждой работы, они будут помещаться в очередь внутри TPL и использовать столько потоков ОС, сколько ваша система сможет эффективно обработать. Например, ваши задания могут порождать их собственные задачи. Это гораздо более гибкий подход.

Кроме того, очередь производителя / потребителя может использоваться для организации конвейерного выполнения заданий. В этом случае работа должна быть разбита на несколько этапов. Каждый шаг должен выполняться отдельным потоком. В каждом потоке шага задания мы должны прочитать задания из одной очереди, выполнить это задание и затем поместить его в следующую очередь.

interface IJob
{
    void Step1();
    void Step2();
    ...
}

var step1 = new BlockingCollection<IJob>();
var step2 = new BlockingCollection<IJob>();
...

Task.Factory.StartNew(() =>
    {
        foreach(var step in step1.GetConsumingEnumerable()) {
            step.Step1();
            step2.Add(step);
        }
    });

Task.Factory.StartNew(() =>
    {
        foreach(var step in step2.GetConsumingEnumerable()) {
            // while performing Step2, another thread can execute Step1
            // of the next job
            step.Step2();
            step3.Add(step);
        }
    });

В этом случае задания будут выполняться в порядке FIFO, но параллельно. Но если вы хотите выполнить конвейерную обработку, вы должны сначала подумать о балансировке нагрузки. Если один из шагов занимает слишком много времени, его очередь увеличится, а другие потоки будут простаивать большую часть времени.

Другие вопросы по тегам