Альтернатива потоку данных BroadcastBlock с гарантированной доставкой

Мне нужно иметь какой-то объект, который действует как BroadcastBlock, но с гарантированной доставкой. Поэтому я использовал ответ на этот вопрос. Но я не очень четко понимаю поток выполнения здесь. У меня есть консольное приложение. Вот мой код:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num => 
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef); 
        }, execopt));

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

Каждое число должно обрабатываться последовательно, поэтому я не могу использовать MaxDegreeOfParallelism в блоке вещателя. Но все блоки действий, которые получают номер, могут работать параллельно.

Итак, вот вопрос:

В выводе я вижу различные идентификаторы потоков. Я правильно понимаю, что работает следующим образом:

Хиты исполнения await block.SendAsync(num); в телекомпании. Если текущий блок не готов принять число, выполнение выходит из вещателя и зависает на Task.WaitAll. Когда блок принимает число, остальная часть оператора foreach в вещателе выполняется в пуле потоков. И то же самое до конца. Каждая итерация foreach выполняется в пуле потоков. Но на самом деле это происходит последовательно.

Я прав или нет в моем понимании? Как я могу изменить этот код для асинхронной отправки номера всем блокам?

Чтобы убедиться, что если один из блоков не готов получить номер в данный момент, я не буду ждать его, а все остальные, которые готовы, получат номер. И что все блоки могут работать параллельно. И гарантия доставки.

1 ответ

Решение

Предполагая, что вы хотите обрабатывать один элемент за раз broadcaster позволяя целевым блокам получать этот предмет одновременно, вам нужно изменить broadcaster чтобы предложить номер всем блокам одновременно, а затем асинхронно подождать, пока все они вместе примут его, прежде чем перейти к следующему номеру:

var broadcaster = new ActionBlock<int>(async num => 
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);

Теперь, в этом случае, когда у вас нет работы после ожидания, вы можете немного оптимизировать, по-прежнему возвращая ожидаемое задание:

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);
Другие вопросы по тегам