Как остановить распространение асинхронного потока (IAsyncEnumerable)
У меня есть метод, который принимает IAsyncEnumerable
в качестве аргумента, а также возвращает IAsyncEnumerable
. Он вызывает веб-метод для каждого элемента во входном потоке и передает результат в выходной поток. Мой вопрос: как я могу получить уведомление, если вызывающий мой метод прекратил перечисление выходного потока, чтобы я мог прекратить перечисление входного потока внутри моего метода? Похоже, я должен получать уведомления, потому что вызывающий абонент по умолчанию использует IAsyncEnumerator
это вытекает из моего метода. Есть ли какой-либо встроенный механизм, который генерирует такое уведомление для асинхронных методов, сгенерированных компилятором? Если нет, то какую альтернативу проще всего реализовать?
Пример. Веб-метод проверяет, действителен ли URL-адрес. Предоставляется бесконечный поток URL-адресов, но вызывающий перестает перечислять результаты, когда обнаруживается более двух недопустимых URL-адресов:
var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
Console.WriteLine($"Url {result.Url} is "
+ (result.IsValid ? "OK" : "Invalid!"));
if (!result.IsValid) invalidCount++;
if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);
Генератор URL-адресов. Один URL-адрес создается каждые 300 мсек.
private static async IAsyncEnumerable<string> GetMockUrls()
{
int index = 0;
while (true)
{
await Task.Delay(300);
yield return $"https://mock.com/{++index:0000}";
}
}
Валидатор URL-адресов. Существует требование, чтобы входной поток был тщательно пронумерован, чтобы два асинхронных рабочих процесса выполнялись параллельно. Первый рабочий процесс вставляет URL-адреса в очередь, а второй рабочий процесс выбирает URL-адреса один за другим и проверяет их. А BufferBlock
используется как асинхронная очередь.
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
_ = Task.Run(async () =>
{
await foreach (var url in urls)
{
Console.WriteLine($"Url {url} received");
await buffer.SendAsync(url);
}
buffer.Complete();
});
while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
{
yield return (url, await MockValidateUrl(url));
}
}
Уточнение: очередь является обязательной, и ее удаление не вариант. Это важный компонент этой проблемы.
Валидатор одиночного URL. В среднем процесс проверки длится 300 мсек.
private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
await Task.Delay(_random.Next(100, 600));
return _random.Next(0, 2) != 0;
}
Выход:
Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...
Проблема в том, что URL-адреса по-прежнему создаются и принимаются после того, как вызывающий / клиент завершил асинхронное перечисление. Я бы хотел это исправить, чтобы сообщения в консоли больше не появлялись после--Async enumeration finished--
.
2 ответа
редактировать
Обсуждение будет легче с подходящим примером. Проверка URL-адресов не так уж и дорога. Что, если вам нужно нажать, например, 100 URL-адресов и выбрать первые 3 ответа?
В этом случае имеют смысл и рабочий, и буфер.
Редактировать 2
Один из комментариев добавляет дополнительную сложность - задачи выполняются одновременно, а результаты должны выдаваться по мере их поступления.
Для начинающих, ValidateUrl
можно переписать как метод итератора:
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
await foreach (var url in urls)
{
Console.WriteLine($"Url {url} received");
var isValid=await MockValidateUrl(url);
yield return (url, isValid);
}
}
В рабочей задаче нет необходимости, поскольку все методы асинхронны. Метод итератора не будет выполняться, пока потребитель не запросит результат. Даже еслиMockValidateUrl
делает что-то дорогое, он может использовать Task.Run
сам или завернуться в Task.Run
. Однако это создало бы довольно много задач.
Для полноты картины можно добавить CancellationToken
а также ConfigureAwait(false)
:
public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
IAsyncEnumerable<string> urls,
[EnumeratorCancellation]CancellationToken token=default)
{
await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
{
var isValid=await MockValidateUrl(url).ConfigureAwait(false);
yield return (url,isValid);
}
}
В любом случае, как только вызывающий перестает повторять, ValidateUrls
остановится.
Буферизация
Буферизация - это проблема: как бы она ни была запрограммирована, воркер не остановится, пока буфер не заполнится. Размер буфера - это количество итераций, которые исполнитель должен выполнить, прежде чем поймет, что ему нужно остановиться. Это отличный случай для канала (да, еще раз!):
public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
IAsyncEnumerable<string> urls,CancellationToken token=default)
{
var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var url in urls.WithCancellation(token))
{
var isValid=await MockValidateUrl(url);
await writer.WriteAsync((url,isValid));
}
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader.ReadAllAsync(token);
}
Однако лучше передавать ChannelReaders вместо IAsyncEnumerables. По крайней мере, асинхронный перечислитель не создается, пока кто-нибудь не попытается прочитать из ChannelReader. Также проще создавать конвейеры как методы расширения:
public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var url in urls.ReadAllAsync(token))
{
var isValid=await MockValidateUrl(url);
await writer.WriteAsync((url,isValid));
}
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader;
}
Этот синтаксис позволяет плавно создавать конвейеры. Допустим, у нас есть этот вспомогательный метод для преобразования IEnumerables в каналы (или IAsyncEnumerables):
public static ChannelReader<T> AsChannel(
IEnumerable<T> items)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
foreach(var item in items)
{
channel.TryWrite(item);
}
return channel.Reader;
}
Мы можем написать:
var pipeline=urlList.AsChannel() //takes a list and writes it to a channel
.ValidateUrls();
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Use the items here
}
Параллельные вызовы с немедленным распространением
С каналами это легко сделать, хотя в это время исполнителю нужно запустить все задачи сразу. По сути, нам нужно несколько рабочих. Это не то, что можно сделать с помощью только IAsyncEnumerable.
Прежде всего, если бы мы хотели использовать, например, 5 одновременных задач для обработки входных данных, мы могли бы написать
var tasks = Enumerable.Range(0,5).
.Select(_ => Task.Run(async ()=>{
///
},token));
_ = Task.WhenAll(tasks)(t=>writer.Complete(t.Exception));
вместо того:
_ = Task.Run(async ()=>{
///
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
Достаточно использовать большое количество рабочих. Я не уверен, может ли IAsyncEnumerable использоваться несколькими воркерами, и я действительно не хочу узнавать.
Преждевременная отмена
Все вышеперечисленное работает, если клиент потребляет все результаты. Чтобы остановить обработку, например, после первых 5 результатов, нам понадобится CancellationToken:
var cts=new CancellationTokenSource();
var pipeline=urlList.AsChannel() //takes a list and writes it to a channel
.ValidateUrls(cts.Token);
int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Break after 3 iterations
if(i++>2)
{
break;
}
....
}
cts.Cancel();
Сам этот код можно извлечь в методе, который получает ChannelReader и, в данном случае, CancellationTokenSource:
static async LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
{
int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Break after 3 iterations
if(i++>2)
{
break;
}
....
}
cts.Cancel();
}
И конвейер становится:
var cts=new CancellationTokenSource();
var pipeline=urlList.AsChannel()
.ValidateUrls(cts.Token)
.LastStep(cts);
Думаю, мне следует ответить на свой вопрос, поскольку теперь у меня есть достаточно простое универсальное решение.
Обновление: я очищаю свой предыдущий ответ, потому что обнаружил гораздо более простое решение. На самом деле это до стыда просто. Все, что мне нужно сделать, это заключить податливую частьValidateUrls
итератор в try-finally
блок. Вfinally
блок будет выполняться в каждом случае, либо вызывающая сторона обычно завершает перечисление, либо ненормально break
или исключение. Вот как я могу получить уведомление, которое ищу, отменивCancellationTokenSource
на finally
:
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
var completionCTS = new CancellationTokenSource();
_ = Task.Run(async () =>
{
await foreach (var url in urls)
{
if (completionCTS.IsCancellationRequested) break;
Console.WriteLine($"Url {url} received");
await buffer.SendAsync(url);
}
buffer.Complete();
});
try
{
while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
{
yield return (url, await MockValidateUrl(url));
}
}
finally // This runs when the caller completes the enumeration
{
completionCTS.Cancel();
}
}
Вероятно, я должен отметить, что асинхронный итератор, не поддерживающий отмену, не является хорошей практикой. Без этого у вызывающего абонента нет простого способа остановить ожидание между потреблением одного значения и следующего. Итак, лучшая подпись для моего метода должна быть:
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
Затем токен может быть передан ожидаемым методам производящего цикла, OutputAvailableAsync
и MockValidateUrl
.
С точки зрения вызывающего абонента токен может быть передан напрямую или путем связывания метода расширения. WithCancellation
.
await foreach (var result in ValidateUrls(GetMockUrls()).WithCancellation(token))