Поток данных TPL: ограниченная емкость и ожидание завершения

Ниже для простоты скопирован реальный сценарий в виде сценария LINQPad:

var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;

var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};

var counter1 = 0;
var counter2 = 0;

var delay1 = 10;
var delay2 = 25;

var action1 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay1); Interlocked.Increment(ref counter1);});
var action2 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay2); Interlocked.Increment(ref counter2);});

var actionBlock1 = new ActionBlock<IEnumerable<string>>(action1, edbOptions);
var actionBlock2 = new ActionBlock<IEnumerable<string>>(action2, edbOptions);

var batchBlock1 = new BatchBlock<string>(5, gdbOptions);
var batchBlock2 = new BatchBlock<string>(5, gdbOptions);

batchBlock1.LinkTo(actionBlock1, dlOptions);
batchBlock2.LinkTo(actionBlock2, dlOptions);

var bufferBlock1 = new BufferBlock<string>(dbOptions); 
var bufferBlock2 = new BufferBlock<string>(dbOptions); 

bufferBlock1.LinkTo(batchBlock1, dlOptions);
bufferBlock2.LinkTo(batchBlock2, dlOptions);

var bcBlock = new BroadcastBlock<string>(x => x, dbOptions);

bcBlock.LinkTo(bufferBlock1, dlOptions);
bcBlock.LinkTo(bufferBlock2, dlOptions);

var mainBlock = new TransformBlock<int, string>(x => x.ToString(), edbOptions);
mainBlock.LinkTo(bcBlock, dlOptions);

mainBlock.Dump("Main Block");
bcBlock.Dump("Broadcast Block");
bufferBlock1.Dump("Buffer Block 1");
bufferBlock2.Dump("Buffer Block 2");
actionBlock1.Dump("Action Block 1");
actionBlock2.Dump("Action Block 2");

foreach(var i in Enumerable.Range(1, total))
  await mainBlock.SendAsync(i, cts.Token);

mainBlock.Complete();

await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);

counter1.Dump("Counter 1");
counter2.Dump("Counter 2");

У меня есть две проблемы с этим кодом:

  1. Хотя я ограничен BoundedCapacity из всех соответствующих блоков по 10 элементов кажется, что я могу выдвинуть все 1 000 000 сообщений почти одновременно. Это ожидаемое поведение?
  2. Хотя вся сеть настроена на распространение завершения, похоже, что все блоки завершаются почти сразу после вызова mainBlock.Complete(), Я ожидаю, что оба counter1 а также counter2 переменные должны быть равны total, Есть ли способ добиться такого поведения?

1 ответ

Решение

Да, это ожидаемое поведение из-заBroadcastBlock:

Предоставляет буфер для хранения не более одного элемента за раз, перезаписывая каждое сообщение следующим по мере его поступления.

Это означает, что если вы ссылаетесь BroadcastBlock блокировать с BoundedCapacity, вы потеряете сообщения.

Чтобы это исправить, вы можете создать пользовательский блок, который ведет себя как BroadcastBlock, но гарантирует доставку ко всем целям. Но делать это не тривиально, поэтому вы можете быть удовлетворены более простым вариантом (первоначально из моего старого ответа):

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
    var targetsList = targets.ToList();

    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targetsList)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}

Использование в вашем случае будет:

var bcBlock = CreateGuaranteedBroadcastBlock(
    new[] { bufferBlock1, bufferBlock2 }, dbOptions);
Другие вопросы по тегам