Как связанный блок потока данных отменяет целевой блок

В потоке данных TPL, когда блок связан с другим блоком с распространением, он будет пересылать исключения, а также отмены. Я могу себе представить, что пересылка исключения просто делается с помощью dataFlowBlock.Fault(exception), но мне любопытно, как отмена переадресации, так как нет такой вещи, как dataFlowBlock.Cancel(), Это сделано через то же самое Fault() метод с использованием мимоходом TaskCancelledException в качестве аргумента?

Обновить:

Чтобы уточнить, рассмотрим следующий пример, где только block1 создается с CancelationToken через опции, а block2 нет. Блок1 связан с блоком2 с распространением:

block1 { CancellationToken = ct } -> block2 { }

Когда ct получает запрос отмены, завершение block1 переходит в отмененный. Мой вопрос: что происходит с block2 в этот момент? Блок1 активно отменяет блок2, и если да, то делает ли он это с помощью block.Fault(TaskCanceledException)? Или он использует какой-то внутренний ocus-pocus, который волшебным образом отменяет block2, даже если он был создан без токена отмены?

2 ответа

Решение

Хорошо, я думаю, что мы находимся на одной странице с обновленным постом. Короче говоря, отмена не распространяется на связанные блоки, если распространение не соответствует действительности.

[TestFixture]
public class DataFlowTests
{

    [Test]
    public async Task DataflowTest()
    {
        var cts = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
        var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
        buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = false});

        foreach (var data in Enumerable.Range(0, 20))
        {
            if (data > 10) break;
            await buffer.SendAsync(data);
        }
        cts.Cancel();
        //action.Complete();
        await action.Completion;
        Console.WriteLine(buffer.Completion.Status);
        Console.WriteLine(action.Completion.Status);
    }
}

Этот образец будет зависать вечно в ожидании action завершить. Сейчас звоню Complete() взыскание на ActionBlock<> дает следующие результирующие состояния:

Cancelled - buffer
RanToCompletion - action

Наконец, распространение завершения дает тот же результат без необходимости вручную вызывать завершение для последующих блоков:

[TestFixture]
public class DataFlowTests
{

    [Test]
    public async Task DataflowTest()
    {
        var cts = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
        var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
        buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true});

        foreach (var data in Enumerable.Range(0, 20))
        {
            if (data > 10) break;
            await buffer.SendAsync(data);
        }
        cts.Cancel();
        await action.Completion;
        Console.WriteLine(buffer.Completion.Status);
        Console.WriteLine(action.Completion.Status);
    }
}

Статусы урожайности:

Canceled - buffer
RanToCompletion - action

Обратите внимание, что отмена не приводит к отмене всего конвейера. Другие блоки просто завершены. Теперь, если возникнет исключение, отличное от отмененного уведомления, конвейер будет поврежден через ...Fault(..), В противном случае его стандартное распространение завершается.

Отмена приходит в виде опции, CancellationToken на ExecutionDataflowBlockOptions Переданный токен успешно завершит блоки. Что это означает, что блоки будут лечить OperationCanceledException отличается от других исключений, создаваемых в конвейере, и приводит к тому, что блоки просто завершают после завершения обработки. Отмена предназначена для работы путем совместного использования одного CTS и использования связанных токенов в опциях блока. Затем, когда CTS отменяется, все блоки получают сигнал отмены.

Также больше можно найти здесь от MS.

Указывая на ваш актуальный вопрос в комментариях, это может объяснить вещи немного больше.

Когда блок потока данных отменяется явно, объект AggregateException содержит OperationCanceledException в свойстве InnerExceptions

Также:

TPL предоставляет механизм, который позволяет задачам координировать отмену на основе сотрудничества. Чтобы блоки блоков данных могли участвовать в этом механизме отмены, установите свойство CancellationToken. Когда этот объект CancellationToken установлен в состояние отмены, все блоки потока данных, которые отслеживают этот токен, завершают выполнение своего текущего элемента, но не начинают обработку последующих элементов. Эти блоки потока данных также очищают любые буферизованные сообщения, освобождают соединения с любыми исходными и целевыми блоками и переходят в отмененное состояние. При переходе в отмененное состояние свойство Completion имеет свойство Status, установленное на Cancelled, если исключение не произошло во время обработки. В этом случае Status устанавливается в Failed.

Источник

Это много цитируемый текст, но ссылка на источник предоставлена ​​и объяснения хорошо написаны.

Таким образом, блоки не активно распространяют отмену. Однако, как только отмененный блок завершится, с распространением на true, эта задача завершения будет передана вместе с отмененным или ошибочным состоянием.

Исходный код, который запускается всякий раз, когда блок переносит свое завершение на связанный блок, можно найти здесь или здесь. Ниже представлена ​​упрощенная версия этого кода:

internal static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target)
{
    AggregateException exception =
        sourceCompletionTask.IsFaulted ? sourceCompletionTask.Exception : null;

    if (exception != null) target.Fault(exception); else target.Complete();
}

Распространение отмены не предусмотрено. Если блок завершился в неисправном состоянии, исключение распространяется на связанный блок. В любом другом случае связанный блок просто помечается для завершения.

AFAIK единственный случай, когда блок переходит в отмененное состояние, когда CancellationTokenто, что было поставлено при его создании, будет отменено. Например, если вы попытаетесьthrow new OperationCanceledException() внутри его лямбда-функции ничего не происходит, и обработанное сообщение просто игнорируется.

Другие вопросы по тегам