Как реализовать эффективный WhenEach, который передает IAsyncEnumerable результатов задачи?
Я пытаюсь обновить свой набор инструментов новыми инструментами, предлагаемыми C# 8, и один метод, который кажется особенно полезным, - это версия Task.WhenAll
который возвращает IAsyncEnumerable
. Этот метод должен передавать результаты задачи, как только они становятся доступными, поэтому назовите егоWhenAll
не имеет особого смысла. WhenEach
звучит более уместно. Сигнатура метода:
public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);
Этот метод можно использовать так:
var tasks = new Task<int>[]
{
ProcessAsync(1, 300),
ProcessAsync(2, 500),
ProcessAsync(3, 400),
ProcessAsync(4, 200),
ProcessAsync(5, 100),
};
await foreach (int result in WhenEach(tasks))
{
Console.WriteLine($"Processed: {result}");
}
static async Task<int> ProcessAsync(int result, int delay)
{
await Task.Delay(delay);
return result;
}
Ожидаемый результат:
Обработано: 5
Обработано: 4
Обработано: 1
Обработано: 3
Обработано: 2
Мне удалось написать базовую реализацию, используя метод Task.WhenAny
в цикле, но с этим подходом есть проблема:
public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
Task<TResult>[] tasks)
{
var hashSet = new HashSet<Task<TResult>>(tasks);
while (hashSet.Count > 0)
{
var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
yield return await task.ConfigureAwait(false);
hashSet.Remove(task);
}
}
Проблема в производительности. Реализация изTask.WhenAny
создает защитную копию предоставленного списка задач, поэтому повторный вызов его в цикле приводит к сложности вычислений O(n²). Моя наивная реализация изо всех сил пытается обработать 10000 задач. На моей машине накладные расходы составляют почти 10 секунд. Я бы хотел, чтобы этот метод был почти таким же производительным, как и встроенныйTask.WhenAll
, который легко справляется с сотнями тысяч задач. Как я мог улучшитьWhenEach
способ заставить его работать прилично?
5 ответов
Используя код из этой статьи, вы можете реализовать следующее:
public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToList();
var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
var results = new Task<Task<T>>[buckets.Length];
for (int i = 0; i < buckets.Length; i++)
{
buckets[i] = new TaskCompletionSource<Task<T>>();
results[i] = buckets[i].Task;
}
int nextTaskIndex = -1;
Action<Task<T>> continuation = completed =>
{
var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
bucket.TrySetResult(completed);
};
foreach (var inputTask in inputTasks)
inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
return results;
}
Тогда измените свой WhenEach
позвонить в Interleaved
код
public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
foreach (var bucket in Interleaved(tasks))
{
var t = await bucket;
yield return await t;
}
}
Тогда вы можете позвонить своему WhenEach
по обыкновению
await foreach (int result in WhenEach(tasks))
{
Console.WriteLine($"Processed: {result}");
}
Я провел элементарный тест с 10 тыс. Задач и показал в 5 раз лучшую скорость.
Вы можете использовать канал как асинхронную очередь. Каждая задача может писать в канал по завершении. Элементы в канале будут возвращены как IAsyncEnumerable через ChannelReader.ReadAllAsync.
IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
var continuations=inputTasks.Select(t=>t.ContinueWith(x=>
writer.TryWrite(x.Result)));
_ = Task.WhenAll(continuations)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader.ReadAllAsync();
}
Когда все задачи выполнены writer.Complete()
вызывается закрыть канал.
Чтобы проверить это, этот код создает задачи с уменьшающимися задержками. Это должно вернуть индексы в обратном порядке:
var tasks=Enumerable.Range(1,4)
.Select(async i=>
{
await Task.Delay(300*(5-i));
return i;
});
await foreach(var i in Interleave(tasks))
{
Console.WriteLine(i);
}
Производит:
4
3
2
1
Просто для удовольствия, используя System.Reactive
а также System.Interactive.Async
:
public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
Task<TResult>[] tasks)
=> Observable.Merge(tasks.Select(t => t.ToObservable())).ToAsyncEnumerable()
Я добавляю еще один ответ на этот вопрос, потому что есть пара вопросов, которые необходимо решить.
- Рекомендуется, чтобы методы, создающие асинхронно-перечисляемые последовательности, имели
CancellationToken
параметр. Это позволяетWithCancellation
конфигурация в циклах. - Когда асинхронная операция прикрепляет продолжения к задачам, рекомендуется очищать эти продолжения после завершения операции. Поэтому, если, например, вызывающая сторона метода решает преждевременно выйти из цикла (используя
break
,return
и т.д.), или если цикл завершается преждевременно из-за исключения, мы не хотим оставлять кучу мертвых продолжений, привязанных к задачам. Это может быть особенно важно, если метод вызывается повторно в цикле (как частьRetry
функциональность, например).
Приведенная ниже реализация решает эти две проблемы. Он основан наChannel<Task<TResult>>
. Теперь каналы стали неотъемлемой частью платформы .NET, поэтому нет причин избегать их в пользу более сложных
TaskCompletionSource
решения на основе.
public async static IAsyncEnumerable<TResult> WhenEach<TResult>(
Task<TResult>[] tasks,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
var channel = Channel.CreateUnbounded<Task<TResult>>();
using var completionCts = new CancellationTokenSource();
var continuations = new List<Task>(tasks.Length);
try
{
int pendingCount = tasks.Length;
foreach (var task in tasks)
{
if (task == null) throw new ArgumentException(
$"The tasks argument included a null value.", nameof(tasks));
continuations.Add(task.ContinueWith(t =>
{
bool accepted = channel.Writer.TryWrite(t);
Debug.Assert(accepted);
if (Interlocked.Decrement(ref pendingCount) == 0)
channel.Writer.Complete();
}, completionCts.Token, TaskContinuationOptions.ExecuteSynchronously |
TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default));
}
await foreach (var task in channel.Reader.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
{
yield return await task.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
}
}
finally
{
completionCts.Cancel();
try { await Task.WhenAll(continuations).ConfigureAwait(false); }
catch (OperationCanceledException) { } // Ignore
}
}
The
finally
блок заботится об отмене прикрепленных продолжений и ожидает их завершения перед выходом.
The
ThrowIfCancellationRequested
внутри
await foreach
Цикл может показаться излишним, но на самом деле он необходим из-за особенностей поведения
ReadAllAsync
метод, который объясняется здесь .
Примечание :
OperationCanceledException
в блоке finally подавляется неэффективным
try
/
catch
блокировать. Перехват исключений стоит дорого . Более эффективная реализация подавила бы ошибку, ожидая продолжения со специализированным
SuppressException
ожидающий, как тот, что показан в этом ответе, и специальная обработка
IsCanceled
кейс. Для целей этого ответа исправление этой неэффективности, вероятно, излишне. Маловероятно, что
WhenEach
метод будет когда-либо использоваться в узком цикле.
Мне очень понравилось решение, предоставленное Panagiotis, но я все же хотел, чтобы исключения возникали по мере их возникновения, как в решении JohanP.
Для этого мы можем немного изменить это, чтобы попытаться закрыть канал в продолжениях при сбое задачи:
public IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
if (inputTasks == null)
{
throw new ArgumentNullException(nameof(inputTasks), "Task list must not be null.");
}
var channel = Channel.CreateUnbounded<T>();
var channelWriter = channel.Writer;
var inputTaskContinuations = inputTasks.Select(inputTask => inputTask.ContinueWith(completedInputTask =>
{
// Check whether the task succeeded or not
if (completedInputTask.Status == TaskStatus.RanToCompletion)
{
// Write the result to the channel on successful completion
channelWriter.TryWrite(completedInputTask.Result);
}
else
{
// Complete the channel on failure to immediately communicate the failure to the caller and prevent additional results from being returned
var taskException = completedInputTask.Exception?.InnerException ?? completedInputTask?.Exception;
channelWriter.TryComplete(taskException);
}
}));
// Ensure the writer is closed after the tasks are all complete, and propagate any exceptions from the continuations
_ = Task.WhenAll(inputTaskContinuations).ContinueWith(completedInputTaskContinuationsTask => channelWriter.TryComplete(completedInputTaskContinuationsTask.Exception));
// Return the async enumerator of the channel so results are yielded to the caller as they're available
return channel.Reader.ReadAllAsync();
}
Очевидным недостатком этого является то, что первая обнаруженная ошибка завершит перечисление и предотвратит возврат любых других, возможно успешных, результатов. Это компромисс, приемлемый для моего варианта использования, но может быть неприемлемым для других.