ForEachAsyc с результатом
Я пытаюсь изменить Стивена Тауба ForEachAsync<T>
метод расширения в расширение, которое возвращает результат...
Расширение Стивена:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
Мой подход (не работает; задачи выполняются, но результат неверен)
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
return Task.WhenAll<TResult>(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
select Task.Run<TResult>(async () =
{
using (partition)
while (partition.MoveNext())
await body(partition.Current); //when I "return await" I get good results but only one per partition
return default(TResult);
}));
}
Я знаю, что каким-то образом должен возвращать (когда?) Результаты предыдущей части, но я еще не понял, как это сделать...
Обновление. Результат, который я получаю, - это просто степень градуса непараллелизма, равная нулю (наверное, из-за default(TResult)
) хотя все задачи выполняются. Я также пытался return await body(...)
и тогда результат был в порядке, но только degreeOfParalleslism
количество выполненных задач
2 ответа
Ваш запрос LINQ может иметь только то же количество результатов, что и количество разделов - вы просто проецируете каждый раздел в один результат.
Если вам не важен порядок, вам просто нужно собрать результаты каждого раздела в список, а затем сгладить их.
public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
var lists = await Task.WhenAll<List<TResult>>(
Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
.Select(partition => Task.Run<List<TResult>>(async () =>
{
var list = new List<TResult>();
using (partition)
{
while (partition.MoveNext())
{
list.Add(await body(partition.Current));
}
}
return list;
})));
return lists.SelectMany(list => list).ToArray();
}
(Я переименовал это из ForEachAsync
, как ForEach
звучит императивно (подходит для Func<T, Task>
в оригинале), тогда как это приносит результаты. foreach
цикл не имеет результата - это делает.)
Теперь, когдаAPI стал частью стандартных библиотек (.NET 6), имеет смысл реализовать вариант, возвращающий
Task<TResult[]>
, на основе этого API. Вот реализация:
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
this IEnumerable<TSource> source,
ParallelOptions options,
Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
var results = new List<TResult>();
if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
var withIndexes = source.Select((item, index) => (item, index));
return Parallel.ForEachAsync(withIndexes, options, async (entry, ct) =>
{
var (item, index) = entry;
var result = await body(item, ct).ConfigureAwait(false);
lock (results)
{
while (results.Count <= index) results.Add(default);
results[index] = result;
}
}).ContinueWith(t =>
{
if (t.IsCanceled) t.GetAwaiter().GetResult(); // Propagate the correct token
if (t.IsFaulted)
{
var tcs = new TaskCompletionSource<TResult[]>();
tcs.SetException(t.Exception.InnerExceptions);
return tcs.Task;
}
lock (results) return Task.FromResult(results.ToArray());
}, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
}
Эта реализация поддерживает все опции и функциональные возможности
Parallel.ForEachAsync
перегрузка, имеющая
IEnumerable<T>
в качестве . Его поведение в случае ошибок и отмены идентично. Результаты располагаются в том же порядке, что и связанные элементы в
source
последовательность.