Как заставить IAsyncEnumerable уважать CancellationToken

Изменить: требования этого вопроса изменились. См. Раздел " Обновление " ниже.

У меня есть метод асинхронного итератора, который создает IAsyncEnumerable<int>(поток чисел), одно число каждые 200 мсек. Вызывающий этот метод использует поток, но хочет остановить перечисление через 1000 мсек. Так чтоCancellationTokenSource используется, и токен передается в качестве аргумента в WithCancellation метод расширения. Но жетон не соблюдается. Перечисление продолжается до тех пор, пока не будут израсходованы все числа:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

Выход:

12:55:17.506> 1
12: 55: 17.739> 2
12: 55: 17.941> 3
12: 55: 18.155> 4
12: 55: 18.367> 5
12: 55: 18.570> 6
12: 55: 18.772> 7
12: 55: 18.973> 8
12: 55: 19.174> 9
12: 55: 19.376> 10

Ожидаемый результат - TaskCanceledException произойти после числа 5. Кажется, я неправильно понял, что WithCancellationна самом деле делает. Метод просто передает предоставленный токен методу итератора, если этот метод его принимает. В противном случае, как и в случае с методомGetSequence()в моем примере токен игнорируется. Я полагаю, что решение в моем случае - вручную опросить токен внутри тела перечисления:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

Это просто и хорошо работает. Но в любом случае мне интересно, можно ли создать метод расширения, который делает то, что я ожидал,WithCancellationto do, чтобы запечь токен внутри следующего перечисления. Это подпись нужного метода:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}

Обновление: похоже, что, когда я задал этот вопрос, у меня было неправильное понимание цели всей концепции отмены. Я был под впечатлением, что отмена предназначена для разрыва цикла после того, как ожидают от MoveNextAsync, а настоящая цель - отменить само ожидание. В моем тривиальном примере ожидание длится всего 200 мсек, но в реальном примере ожидание может быть намного дольше, даже бесконечным. Осознав это, мой вопрос в его текущей форме почти не имеет значения, и я должен либо удалить его и открыть новый с тем же заголовком, либо изменить требования существующего вопроса. Оба варианта так или иначе плохи.

Я решил выбрать второй вариант. Поэтому я не принимаю принятый в настоящее время ответ и прошу новое решение более сложной проблемы - принудительной отмены таким образом, чтобы он вступил в силу немедленно. Другими словами, отмена токена должна привести к завершению асинхронного перечисления за несколько миллисекунд. Приведем практический пример, чтобы различать желаемое и нежелательное поведение:

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}

Выход (желательно):

0:00.242> 1
0: 00.467> 2
0: 00.500> Отменено

Вывод (нежелательно):

0:00.242> 1
0: 00.467> 2
0: 00.707> Отменено

GetSequenceэто тот же метод, что и в начальном примере, который передает одно число каждые 200 мсек. Этот метод не поддерживает отмену, и предполагается, что мы не можем это изменить.WithEnforcedCancellation - необходимый метод расширения, который должен решить эту проблему.

3 ответа

Решение

Я думаю, важно повторить, что вы не должны этого делать. Всегда лучше сделать так, чтобы асинхронный метод поддерживал токены отмены, тогда все будет работать так, как вы ожидаете.

С учетом сказанного, если вы не можете добавить поддержку отмены в метод async, вы можете обойти его.

Один трюк - использовать Task.WhenAny с двумя аргументами:

  1. задача, которую вы получаете от IAsyncEnumerator.MoveNextAsync()
  2. задача, которая завершается, когда запрашивается отмена

Вот краткая версия

      // Start the 'await foreach' without the new syntax, because we need access to the Task returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);

// Create a task that completes with the value 'true' when cancellation is requested
var cancellationRequested = CancellationRequested(cancellationToken);

// Let it race against MoveNextAsync() and then check which task "won" the race
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), cancellationRequested))
{
    cancellationToken.ThrowIfCancellationRequested();
    yield return enumerator.Current;
}

Длинная версия должна работать, если вы запускаете ее локально.

      using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax, because we need access to the Task returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);

        // Create a task that completes with the value 'true' when cancellation is requested
        var cancellationRequested = CancellationRequested(cancellationToken);
        
        // Let it race against MoveNextAsync() and then check which task "won" the race
        while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), cancellationRequested))
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return enumerator.Current;
        }
    }

    private static Task<bool> CancellationRequested(CancellationToken cancellationToken)
    {
        // This is just one possible implementation... feel free to swap out for something else
        return Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ContinueWith(_ =>
        {
            return true;
        }, TaskContinuationOptions.OnlyOnCanceled);
    }
}

public class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }

    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }
}

Предостережение

MoveNextAsync() возвращает ValueTask<bool> который нельзя использовать с Task.WhenAny() если ты не позвонишь AsTask()... что имеет свои собственные последствия для производительности.

IAsyncEnumerable явно предусматривает этот механизм с EnumeratorCancellation атрибут:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}

Фактически, компилятор достаточно полезен, чтобы выдать предупреждение, если вы дадите методу CancellationToken параметр, но не добавляйте атрибут.

Обратите внимание, что токен передан в .WithCancellationпереопределит любой локальный токен, переданный методу. В спецификации есть подробности по этому вопросу.

Конечно, это будет работать только в том случае, если перечисление действительно принимает CancellationToken - но тот факт, что отмена действительно работает, только если все сделано совместно, верен для любого asyncРабота. Ответ Йельдара хорош для "принуждения" некоторой меры отмены к перечислимому, которое ее не поддерживает, но предпочтительным решением должно быть изменение перечисления для поддержки отмены самостоятельно - компилятор делает все, чтобы вам помочь.

Вы можете просто извлечь свою логику в метод расширения следующим образом:

public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));

    cancellationToken.ThrowIfCancellationRequested();

    await foreach (var item in source)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return item;
    }
}
Другие вопросы по тегам