ForEachAsyc с результатом

Я пытаюсь изменить Стивена Тауба ForEachAsync<T> метод расширения в расширение, которое возвращает результат...

Расширение Стивена:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

Мой подход (не работает; задачи выполняются, но результат неверен)

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
        select Task.Run<TResult>(async () = 
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); //when I "return await" I get good results but only one per partition 
            return default(TResult);
        }));
}

Я знаю, что каким-то образом должен возвращать (когда?) Результаты предыдущей части, но я еще не понял, как это сделать...

Обновление. Результат, который я получаю, - это просто степень градуса непараллелизма, равная нулю (наверное, из-за default(TResult)) хотя все задачи выполняются. Я также пытался return await body(...) и тогда результат был в порядке, но только degreeOfParalleslism количество выполненных задач

2 ответа

Решение

Ваш запрос LINQ может иметь только то же количество результатов, что и количество разделов - вы просто проецируете каждый раздел в один результат.

Если вам не важен порядок, вам просто нужно собрать результаты каждого раздела в список, а затем сгладить их.

public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
    var lists = await Task.WhenAll<List<TResult>>(
        Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
            .Select(partition => Task.Run<List<TResult>>(async () =>
                    {
                        var list = new List<TResult>();
                        using (partition)
                        {
                            while (partition.MoveNext())
                            {
                                list.Add(await body(partition.Current));
                            }
                        }
                        return list;
                   })));
     return lists.SelectMany(list => list).ToArray();
}

(Я переименовал это из ForEachAsync, как ForEach звучит императивно (подходит для Func<T, Task> в оригинале), тогда как это приносит результаты. foreach цикл не имеет результата - это делает.)

Теперь, когдаAPI стал частью стандартных библиотек (.NET 6), имеет смысл реализовать вариант, возвращающий Task<TResult[]>, на основе этого API. Вот реализация:

      public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    ParallelOptions options,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    var results = new List<TResult>();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    var withIndexes = source.Select((item, index) => (item, index));
    return Parallel.ForEachAsync(withIndexes, options, async (entry, ct) =>
    {
        var (item, index) = entry;
        var result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            while (results.Count <= index) results.Add(default);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        if (t.IsCanceled) t.GetAwaiter().GetResult(); // Propagate the correct token
        if (t.IsFaulted)
        {
            var tcs = new TaskCompletionSource<TResult[]>();
            tcs.SetException(t.Exception.InnerExceptions);
            return tcs.Task;
        }
        lock (results) return Task.FromResult(results.ToArray());
    }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
}

Эта реализация поддерживает все опции и функциональные возможности Parallel.ForEachAsyncперегрузка, имеющая IEnumerable<T>в качестве . Его поведение в случае ошибок и отмены идентично. Результаты располагаются в том же порядке, что и связанные элементы в sourceпоследовательность.

Другие вопросы по тегам