Как заставить IAsyncEnumerable уважать CancellationToken
Изменить: требования этого вопроса изменились. См. Раздел " Обновление " ниже.
У меня есть метод асинхронного итератора, который создает IAsyncEnumerable<int>
(поток чисел), одно число каждые 200 мсек. Вызывающий этот метод использует поток, но хочет остановить перечисление через 1000 мсек. Так чтоCancellationTokenSource
используется, и токен передается в качестве аргумента в WithCancellation
метод расширения. Но жетон не соблюдается. Перечисление продолжается до тех пор, пока не будут израсходованы все числа:
static async IAsyncEnumerable<int> GetSequence()
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(200);
yield return i;
}
}
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
Выход:
12:55:17.506> 1
12: 55: 17.739> 2
12: 55: 17.941> 3
12: 55: 18.155> 4
12: 55: 18.367> 5
12: 55: 18.570> 6
12: 55: 18.772> 7
12: 55: 18.973> 8
12: 55: 19.174> 9
12: 55: 19.376> 10
Ожидаемый результат - TaskCanceledException
произойти после числа 5. Кажется, я неправильно понял, что WithCancellation
на самом деле делает. Метод просто передает предоставленный токен методу итератора, если этот метод его принимает. В противном случае, как и в случае с методомGetSequence()
в моем примере токен игнорируется. Я полагаю, что решение в моем случае - вручную опросить токен внутри тела перечисления:
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
cts.Token.ThrowIfCancellationRequested();
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
Это просто и хорошо работает. Но в любом случае мне интересно, можно ли создать метод расширения, который делает то, что я ожидал,WithCancellation
to do, чтобы запечь токен внутри следующего перечисления. Это подпись нужного метода:
public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
// Is it possible?
}
Обновление: похоже, что, когда я задал этот вопрос, у меня было неправильное понимание цели всей концепции отмены. Я был под впечатлением, что отмена предназначена для разрыва цикла после того, как ожидают от MoveNextAsync
, а настоящая цель - отменить само ожидание. В моем тривиальном примере ожидание длится всего 200 мсек, но в реальном примере ожидание может быть намного дольше, даже бесконечным. Осознав это, мой вопрос в его текущей форме почти не имеет значения, и я должен либо удалить его и открыть новый с тем же заголовком, либо изменить требования существующего вопроса. Оба варианта так или иначе плохи.
Я решил выбрать второй вариант. Поэтому я не принимаю принятый в настоящее время ответ и прошу новое решение более сложной проблемы - принудительной отмены таким образом, чтобы он вступил в силу немедленно. Другими словами, отмена токена должна привести к завершению асинхронного перечисления за несколько миллисекунд. Приведем практический пример, чтобы различать желаемое и нежелательное поведение:
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
Выход (желательно):
0:00.242> 1
0: 00.467> 2
0: 00.500> Отменено
Вывод (нежелательно):
0:00.242> 1
0: 00.467> 2
0: 00.707> Отменено
GetSequence
это тот же метод, что и в начальном примере, который передает одно число каждые 200 мсек. Этот метод не поддерживает отмену, и предполагается, что мы не можем это изменить.WithEnforcedCancellation
- необходимый метод расширения, который должен решить эту проблему.
3 ответа
Я думаю, важно повторить, что вы не должны этого делать. Всегда лучше сделать так, чтобы асинхронный метод поддерживал токены отмены, тогда все будет работать так, как вы ожидаете.
С учетом сказанного, если вы не можете добавить поддержку отмены в метод async, вы можете обойти его.
Один трюк - использовать
Task.WhenAny
с двумя аргументами:
- задача, которую вы получаете от
IAsyncEnumerator.MoveNextAsync()
- задача, которая завершается, когда запрашивается отмена
Вот краткая версия
// Start the 'await foreach' without the new syntax, because we need access to the Task returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
// Create a task that completes with the value 'true' when cancellation is requested
var cancellationRequested = CancellationRequested(cancellationToken);
// Let it race against MoveNextAsync() and then check which task "won" the race
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), cancellationRequested))
{
cancellationToken.ThrowIfCancellationRequested();
yield return enumerator.Current;
}
Длинная версия должна работать, если вы запускаете ее локально.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class AsyncStreamHelper
{
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
// Start the 'await foreach' without the new syntax, because we need access to the Task returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
// Create a task that completes with the value 'true' when cancellation is requested
var cancellationRequested = CancellationRequested(cancellationToken);
// Let it race against MoveNextAsync() and then check which task "won" the race
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), cancellationRequested))
{
cancellationToken.ThrowIfCancellationRequested();
yield return enumerator.Current;
}
}
private static Task<bool> CancellationRequested(CancellationToken cancellationToken)
{
// This is just one possible implementation... feel free to swap out for something else
return Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ContinueWith(_ =>
{
return true;
}, TaskContinuationOptions.OnlyOnCanceled);
}
}
public class Program
{
public static async Task Main()
{
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
}
static async IAsyncEnumerable<int> GetSequence()
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(200);
yield return i;
}
}
}
Предостережение
MoveNextAsync()
возвращает
ValueTask<bool>
который нельзя использовать с
Task.WhenAny()
если ты не позвонишь
AsTask()
... что имеет свои собственные последствия для производительности.
IAsyncEnumerable
явно предусматривает этот механизм с EnumeratorCancellation
атрибут:
static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
for (int i = 1; i <= 10; i++) {
ct.ThrowIfCancellationRequested();
await Task.Delay(200); // or `Task.Delay(200, ct)` if this wasn't an example
yield return i;
}
}
Фактически, компилятор достаточно полезен, чтобы выдать предупреждение, если вы дадите методу CancellationToken
параметр, но не добавляйте атрибут.
Обратите внимание, что токен передан в .WithCancellation
переопределит любой локальный токен, переданный методу. В спецификации есть подробности по этому вопросу.
Конечно, это будет работать только в том случае, если перечисление действительно принимает CancellationToken
- но тот факт, что отмена действительно работает, только если все сделано совместно, верен для любого async
Работа. Ответ Йельдара хорош для "принуждения" некоторой меры отмены к перечислимому, которое ее не поддерживает, но предпочтительным решением должно быть изменение перечисления для поддержки отмены самостоятельно - компилятор делает все, чтобы вам помочь.
Вы можете просто извлечь свою логику в метод расширения следующим образом:
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
await foreach (var item in source)
{
cancellationToken.ThrowIfCancellationRequested();
yield return item;
}
}