Как остановить распространение асинхронного потока (IAsyncEnumerable)

У меня есть метод, который принимает IAsyncEnumerable в качестве аргумента, а также возвращает IAsyncEnumerable. Он вызывает веб-метод для каждого элемента во входном потоке и передает результат в выходной поток. Мой вопрос: как я могу получить уведомление, если вызывающий мой метод прекратил перечисление выходного потока, чтобы я мог прекратить перечисление входного потока внутри моего метода? Похоже, я должен получать уведомления, потому что вызывающий абонент по умолчанию использует IAsyncEnumerator это вытекает из моего метода. Есть ли какой-либо встроенный механизм, который генерирует такое уведомление для асинхронных методов, сгенерированных компилятором? Если нет, то какую альтернативу проще всего реализовать?

Пример. Веб-метод проверяет, действителен ли URL-адрес. Предоставляется бесконечный поток URL-адресов, но вызывающий перестает перечислять результаты, когда обнаруживается более двух недопустимых URL-адресов:

var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
    Console.WriteLine($"Url {result.Url} is "
        + (result.IsValid ? "OK" : "Invalid!"));
    if (!result.IsValid) invalidCount++;
    if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);

Генератор URL-адресов. Один URL-адрес создается каждые 300 мсек.

private static async IAsyncEnumerable<string> GetMockUrls()
{
    int index = 0;
    while (true)
    {
        await Task.Delay(300);
        yield return $"https://mock.com/{++index:0000}";
    }
}

Валидатор URL-адресов. Существует требование, чтобы входной поток был тщательно пронумерован, чтобы два асинхронных рабочих процесса выполнялись параллельно. Первый рабочий процесс вставляет URL-адреса в очередь, а второй рабочий процесс выбирает URL-адреса один за другим и проверяет их. А BufferBlock используется как асинхронная очередь.

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
    {
        yield return (url, await MockValidateUrl(url));
    }
}

Уточнение: очередь является обязательной, и ее удаление не вариант. Это важный компонент этой проблемы.

Валидатор одиночного URL. В среднем процесс проверки длится 300 мсек.

private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
    await Task.Delay(_random.Next(100, 600));
    return _random.Next(0, 2) != 0;
}

Выход:

Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...

Проблема в том, что URL-адреса по-прежнему создаются и принимаются после того, как вызывающий / клиент завершил асинхронное перечисление. Я бы хотел это исправить, чтобы сообщения в консоли больше не появлялись после--Async enumeration finished--.

2 ответа

редактировать

Обсуждение будет легче с подходящим примером. Проверка URL-адресов не так уж и дорога. Что, если вам нужно нажать, например, 100 URL-адресов и выбрать первые 3 ответа?

В этом случае имеют смысл и рабочий, и буфер.

Редактировать 2

Один из комментариев добавляет дополнительную сложность - задачи выполняются одновременно, а результаты должны выдаваться по мере их поступления.


Для начинающих, ValidateUrl можно переписать как метод итератора:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    await foreach (var url in urls)
    {
        Console.WriteLine($"Url {url} received");
        var isValid=await MockValidateUrl(url);
        yield return (url, isValid);
    }
}

В рабочей задаче нет необходимости, поскольку все методы асинхронны. Метод итератора не будет выполняться, пока потребитель не запросит результат. Даже еслиMockValidateUrl делает что-то дорогое, он может использовать Task.Run сам или завернуться в Task.Run. Однако это создало бы довольно много задач.

Для полноты картины можно добавить CancellationToken а также ConfigureAwait(false):

public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
       IAsyncEnumerable<string> urls, 
       [EnumeratorCancellation]CancellationToken token=default)
{
    await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
    {
        var isValid=await MockValidateUrl(url).ConfigureAwait(false);
        yield return (url,isValid);
    }
}

В любом случае, как только вызывающий перестает повторять, ValidateUrls остановится.

Буферизация

Буферизация - это проблема: как бы она ни была запрограммирована, воркер не остановится, пока буфер не заполнится. Размер буфера - это количество итераций, которые исполнитель должен выполнить, прежде чем поймет, что ему нужно остановиться. Это отличный случай для канала (да, еще раз!):

public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        IAsyncEnumerable<string> urls,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.WithCancellation(token))
                {
                    var isValid=await MockValidateUrl(url);
                    await writer.WriteAsync((url,isValid));
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader.ReadAllAsync(token);
}

Однако лучше передавать ChannelReaders вместо IAsyncEnumerables. По крайней мере, асинхронный перечислитель не создается, пока кто-нибудь не попытается прочитать из ChannelReader. Также проще создавать конвейеры как методы расширения:

public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
        this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.ReadAllAsync(token))
                {
                    var isValid=await MockValidateUrl(url);
                    await writer.WriteAsync((url,isValid));
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader;
}

Этот синтаксис позволяет плавно создавать конвейеры. Допустим, у нас есть этот вспомогательный метод для преобразования IEnumerables в каналы (или IAsyncEnumerables):

public static ChannelReader<T> AsChannel(
         IEnumerable<T> items)
{
    var channel=Channel.CreateUnbounded();        
    var writer=channel.Writer;
    foreach(var item in items)
    {
        channel.TryWrite(item);
    }
    return channel.Reader;
}

Мы можем написать:

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls();

await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
   //Use the items here
}

Параллельные вызовы с немедленным распространением

С каналами это легко сделать, хотя в это время исполнителю нужно запустить все задачи сразу. По сути, нам нужно несколько рабочих. Это не то, что можно сделать с помощью только IAsyncEnumerable.

Прежде всего, если бы мы хотели использовать, например, 5 одновременных задач для обработки входных данных, мы могли бы написать

    var tasks = Enumerable.Range(0,5).
                  .Select(_ => Task.Run(async ()=>{
                                 /// 
                             },token));
    _ = Task.WhenAll(tasks)(t=>writer.Complete(t.Exception));        

вместо того:

    _ = Task.Run(async ()=>{
        /// 
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        

Достаточно использовать большое количество рабочих. Я не уверен, может ли IAsyncEnumerable использоваться несколькими воркерами, и я действительно не хочу узнавать.

Преждевременная отмена

Все вышеперечисленное работает, если клиент потребляет все результаты. Чтобы остановить обработку, например, после первых 5 результатов, нам понадобится CancellationToken:

var cts=new CancellationTokenSource();

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(cts.Token);

int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
    //Break after 3 iterations
    if(i++>2)
    {
        break;
    }
    ....
}

cts.Cancel();

Сам этот код можно извлечь в методе, который получает ChannelReader и, в данном случае, CancellationTokenSource:

static async LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
    {
    int i=0;
    await foreach(var (url,isValid) in pipeline.ReadAllAsync())
    {
        //Break after 3 iterations
        if(i++>2)
        {
            break;
        }
        ....
    }

    cts.Cancel();        
}

И конвейер становится:

var cts=new CancellationTokenSource();

var pipeline=urlList.AsChannel()     
                    .ValidateUrls(cts.Token)
                    .LastStep(cts);

Думаю, мне следует ответить на свой вопрос, поскольку теперь у меня есть достаточно простое универсальное решение.

Обновление: я очищаю свой предыдущий ответ, потому что обнаружил гораздо более простое решение. На самом деле это до стыда просто. Все, что мне нужно сделать, это заключить податливую частьValidateUrls итератор в try-finallyблок. Вfinally блок будет выполняться в каждом случае, либо вызывающая сторона обычно завершает перечисление, либо ненормально breakили исключение. Вот как я могу получить уведомление, которое ищу, отменивCancellationTokenSource на finally:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    var completionCTS = new CancellationTokenSource();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            if (completionCTS.IsCancellationRequested) break;
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    try
    {
        while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
        {
            yield return (url, await MockValidateUrl(url));
        }
    }
    finally // This runs when the caller completes the enumeration
    {
        completionCTS.Cancel();
    }
}

Вероятно, я должен отметить, что асинхронный итератор, не поддерживающий отмену, не является хорошей практикой. Без этого у вызывающего абонента нет простого способа остановить ожидание между потреблением одного значения и следующего. Итак, лучшая подпись для моего метода должна быть:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{

Затем токен может быть передан ожидаемым методам производящего цикла, OutputAvailableAsync и MockValidateUrl.

С точки зрения вызывающего абонента токен может быть передан напрямую или путем связывания метода расширения. WithCancellation.

await foreach (var result in ValidateUrls(GetMockUrls()).WithCancellation(token))
Другие вопросы по тегам