Как реализовать эффективный WhenEach, который передает IAsyncEnumerable результатов задачи?

Я пытаюсь обновить свой набор инструментов новыми инструментами, предлагаемыми C# 8, и один метод, который кажется особенно полезным, - это версия Task.WhenAll который возвращает IAsyncEnumerable. Этот метод должен передавать результаты задачи, как только они становятся доступными, поэтому назовите егоWhenAll не имеет особого смысла. WhenEachзвучит более уместно. Сигнатура метода:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);

Этот метод можно использовать так:

var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}

Ожидаемый результат:

Обработано: 5
Обработано: 4
Обработано: 1
Обработано: 3
Обработано: 2

Мне удалось написать базовую реализацию, используя метод Task.WhenAny в цикле, но с этим подходом есть проблема:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
{
    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    {
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    }
}

Проблема в производительности. Реализация изTask.WhenAnyсоздает защитную копию предоставленного списка задач, поэтому повторный вызов его в цикле приводит к сложности вычислений O(n²). Моя наивная реализация изо всех сил пытается обработать 10000 задач. На моей машине накладные расходы составляют почти 10 секунд. Я бы хотел, чтобы этот метод был почти таким же производительным, как и встроенныйTask.WhenAll, который легко справляется с сотнями тысяч задач. Как я мог улучшитьWhenEach способ заставить его работать прилично?

5 ответов

Решение

Используя код из этой статьи, вы можете реализовать следующее:

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
   var inputTasks = tasks.ToList();

   var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
   var results = new Task<Task<T>>[buckets.Length];
   for (int i = 0; i < buckets.Length; i++)
   {
       buckets[i] = new TaskCompletionSource<Task<T>>();
       results[i] = buckets[i].Task;
   }

   int nextTaskIndex = -1;
   Action<Task<T>> continuation = completed =>
   {
       var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
       bucket.TrySetResult(completed);
   };

   foreach (var inputTask in inputTasks)
       inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

   return results;
}

Тогда измените свой WhenEach позвонить в Interleaved код

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
    foreach (var bucket in Interleaved(tasks))
    {
        var t = await bucket;
        yield return await t;
    }
}

Тогда вы можете позвонить своему WhenEach по обыкновению

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

Я провел элементарный тест с 10 тыс. Задач и показал в 5 раз лучшую скорость.

Вы можете использовать канал как асинхронную очередь. Каждая задача может писать в канал по завершении. Элементы в канале будут возвращены как IAsyncEnumerable через ChannelReader.ReadAllAsync.

IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    var continuations=inputTasks.Select(t=>t.ContinueWith(x=>
                                           writer.TryWrite(x.Result)));
    _ = Task.WhenAll(continuations)
            .ContinueWith(t=>writer.Complete(t.Exception));

    return channel.Reader.ReadAllAsync();
}

Когда все задачи выполнены writer.Complete() вызывается закрыть канал.

Чтобы проверить это, этот код создает задачи с уменьшающимися задержками. Это должно вернуть индексы в обратном порядке:

var tasks=Enumerable.Range(1,4)
                    .Select(async i=>
                    { 
                      await Task.Delay(300*(5-i));
                      return i;
                    });

await foreach(var i in Interleave(tasks))
{
     Console.WriteLine(i);

}

Производит:

4
3
2
1

Просто для удовольствия, используя System.Reactive а также System.Interactive.Async:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
    => Observable.Merge(tasks.Select(t => t.ToObservable())).ToAsyncEnumerable()

Я добавляю еще один ответ на этот вопрос, потому что есть пара вопросов, которые необходимо решить.

  1. Рекомендуется, чтобы методы, создающие асинхронно-перечисляемые последовательности, имели CancellationTokenпараметр. Это позволяетWithCancellationконфигурация в циклах.
  2. Когда асинхронная операция прикрепляет продолжения к задачам, рекомендуется очищать эти продолжения после завершения операции. Поэтому, если, например, вызывающая сторона метода решает преждевременно выйти из цикла (используя break, returnи т.д.), или если цикл завершается преждевременно из-за исключения, мы не хотим оставлять кучу мертвых продолжений, привязанных к задачам. Это может быть особенно важно, если метод вызывается повторно в цикле (как часть Retryфункциональность, например).

Приведенная ниже реализация решает эти две проблемы. Он основан наChannel<Task<TResult>>. Теперь каналы стали неотъемлемой частью платформы .NET, поэтому нет причин избегать их в пользу более сложных TaskCompletionSourceрешения на основе.

      public async static IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    var channel = Channel.CreateUnbounded<Task<TResult>>();
    using var completionCts = new CancellationTokenSource();
    var continuations = new List<Task>(tasks.Length);
    try
    {
        int pendingCount = tasks.Length;
        foreach (var task in tasks)
        {
            if (task == null) throw new ArgumentException(
                $"The tasks argument included a null value.", nameof(tasks));
            continuations.Add(task.ContinueWith(t =>
            {
                bool accepted = channel.Writer.TryWrite(t);
                Debug.Assert(accepted);
                if (Interlocked.Decrement(ref pendingCount) == 0)
                    channel.Writer.Complete();
            }, completionCts.Token, TaskContinuationOptions.ExecuteSynchronously |
                TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default));
        }

        await foreach (var task in channel.Reader.ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            yield return await task.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    finally
    {
        completionCts.Cancel();
        try { await Task.WhenAll(continuations).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Ignore
    }
}

The finallyблок заботится об отмене прикрепленных продолжений и ожидает их завершения перед выходом.

The ThrowIfCancellationRequestedвнутри await foreachЦикл может показаться излишним, но на самом деле он необходим из-за особенностей поведения ReadAllAsyncметод, который объясняется здесь .


Примечание : OperationCanceledExceptionв блоке finally подавляется неэффективным try/ catchблокировать. Перехват исключений стоит дорого . Более эффективная реализация подавила бы ошибку, ожидая продолжения со специализированным SuppressExceptionожидающий, как тот, что показан в этом ответе, и специальная обработка IsCanceledкейс. Для целей этого ответа исправление этой неэффективности, вероятно, излишне. Маловероятно, что WhenEachметод будет когда-либо использоваться в узком цикле.

Мне очень понравилось решение, предоставленное Panagiotis, но я все же хотел, чтобы исключения возникали по мере их возникновения, как в решении JohanP.

Для этого мы можем немного изменить это, чтобы попытаться закрыть канал в продолжениях при сбое задачи:

public IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    if (inputTasks == null)
    {
        throw new ArgumentNullException(nameof(inputTasks), "Task list must not be null.");
    }

    var channel = Channel.CreateUnbounded<T>();
    var channelWriter = channel.Writer;
    var inputTaskContinuations = inputTasks.Select(inputTask => inputTask.ContinueWith(completedInputTask =>
    {
        // Check whether the task succeeded or not
        if (completedInputTask.Status == TaskStatus.RanToCompletion)
        {
            // Write the result to the channel on successful completion
            channelWriter.TryWrite(completedInputTask.Result);
        }
        else
        {
            // Complete the channel on failure to immediately communicate the failure to the caller and prevent additional results from being returned
            var taskException = completedInputTask.Exception?.InnerException ?? completedInputTask?.Exception;
            channelWriter.TryComplete(taskException);
        }
    }));

    // Ensure the writer is closed after the tasks are all complete, and propagate any exceptions from the continuations
    _ = Task.WhenAll(inputTaskContinuations).ContinueWith(completedInputTaskContinuationsTask => channelWriter.TryComplete(completedInputTaskContinuationsTask.Exception));

    // Return the async enumerator of the channel so results are yielded to the caller as they're available
    return channel.Reader.ReadAllAsync();
}

Очевидным недостатком этого является то, что первая обнаруженная ошибка завершит перечисление и предотвратит возврат любых других, возможно успешных, результатов. Это компромисс, приемлемый для моего варианта использования, но может быть неприемлемым для других.

Другие вопросы по тегам