Альтернатива потоку данных BroadcastBlock с гарантированной доставкой
Мне нужно иметь какой-то объект, который действует как BroadcastBlock, но с гарантированной доставкой. Поэтому я использовал ответ на этот вопрос. Но я не очень четко понимаю поток выполнения здесь. У меня есть консольное приложение. Вот мой код:
static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));
ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);
broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});
Task producer = Produce(broadcaster);
List<Task> ToWait = new List<Task>();
foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
ToWait.Add(producer);
Task.WaitAll(ToWait.ToArray());
Console.ReadLine();
}
static async Task Produce(ActionBlock<int> broadcaster)
{
for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);
broadcaster.Complete();
}
Каждое число должно обрабатываться последовательно, поэтому я не могу использовать MaxDegreeOfParallelism в блоке вещателя. Но все блоки действий, которые получают номер, могут работать параллельно.
Итак, вот вопрос:
В выводе я вижу различные идентификаторы потоков. Я правильно понимаю, что работает следующим образом:
Хиты исполнения await block.SendAsync(num);
в телекомпании. Если текущий блок не готов принять число, выполнение выходит из вещателя и зависает на Task.WaitAll. Когда блок принимает число, остальная часть оператора foreach в вещателе выполняется в пуле потоков. И то же самое до конца. Каждая итерация foreach выполняется в пуле потоков. Но на самом деле это происходит последовательно.
Я прав или нет в моем понимании? Как я могу изменить этот код для асинхронной отправки номера всем блокам?
Чтобы убедиться, что если один из блоков не готов получить номер в данный момент, я не буду ждать его, а все остальные, которые готовы, получат номер. И что все блоки могут работать параллельно. И гарантия доставки.
1 ответ
Предполагая, что вы хотите обрабатывать один элемент за раз broadcaster
позволяя целевым блокам получать этот предмет одновременно, вам нужно изменить broadcaster
чтобы предложить номер всем блокам одновременно, а затем асинхронно подождать, пока все они вместе примут его, прежде чем перейти к следующему номеру:
var broadcaster = new ActionBlock<int>(async num =>
{
var tasks = new List<Task>();
foreach (var block in blocks)
{
tasks.Add(block.SendAsync(num));
}
await Task.WhenAll(tasks);
}, execopt);
Теперь, в этом случае, когда у вас нет работы после ожидания, вы можете немного оптимизировать, по-прежнему возвращая ожидаемое задание:
ActionBlock<int> broadcaster = new ActionBlock<int>(
num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);