Вложенности ждут в Parallel.ForEach
В приложении метро мне нужно выполнить несколько вызовов WCF. Необходимо выполнить значительное количество вызовов, поэтому мне нужно выполнять их в параллельном цикле. Проблема заключается в том, что параллельный цикл завершается до завершения всех вызовов WCF.
Как бы вы реорганизовали это, чтобы работать так, как ожидалось?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
foreach ( var customer in customers )
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
11 ответов
Вся идея позади Parallel.ForEach()
является то, что у вас есть набор потоков, и каждый поток обрабатывает часть коллекции. Как вы заметили, это не работает с async
-await
где вы хотите освободить поток на время асинхронного вызова.
Вы можете "исправить" это, заблокировав ForEach()
темы, но это побеждает весь смысл async
-await
,
Что вы можете сделать, это использовать поток данных TPL вместо Parallel.ForEach()
, который поддерживает асинхронный Task
набухать.
В частности, ваш код может быть написан с использованием TransformBlock
который превращает каждый идентификатор в Customer
с использованием async
лямбда. Этот блок может быть настроен для параллельного выполнения. Вы бы связали этот блок с ActionBlock
что пишет каждый Customer
на консоль. После настройки блочной сети вы можете Post()
каждый идентификатор TransformBlock
,
В коде:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Хотя вы, вероятно, хотите ограничить параллелизм TransformBlock
до некоторой маленькой константы. Кроме того, вы можете ограничить емкость TransformBlock
и добавлять элементы к нему асинхронно, используя SendAsync()
Например, если коллекция слишком большая.
Дополнительным преимуществом по сравнению с вашим кодом (если он работал) является то, что запись начнется, как только закончится отдельный элемент, а не будет ждать завершения всей обработки.
Ответ Свика (как обычно) превосходен.
Тем не менее, я считаю Dataflow более полезным, когда у вас есть большие объемы данных для передачи. Или когда вам нужно async
-совместимая очередь
В вашем случае более простое решение - просто использовать async
Стиль параллелизма:
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
ICustomerRepo repo = new CustomerRepo();
return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
Использование DataFlow, как предлагает svick, может быть излишним, и ответ Стивена не предоставляет средств для контроля параллелизма операции. Однако это может быть достигнуто довольно просто:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
//observe exceptions here
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
ToArray()
вызовы можно оптимизировать, используя массив вместо списка и заменяя выполненные задачи, но я сомневаюсь, что это будет иметь большое значение в большинстве сценариев. Пример использования по вопросу ОП:
RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
Пользователь EDIT Fellow SO и мастер TPL Eli Arbel указали мне на соответствующую статью от Стивена Туба. Как обычно, его реализация элегантна и эффективна:
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
Вы можете сэкономить усилия с новым пакетом AsyncEnumerator NuGet, который не существовал 4 года назад, когда вопрос был первоначально опубликован. Это позволяет контролировать степень параллелизма:
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
},
maxDegreeOfParallelism: 10);
Отказ от ответственности: я являюсь автором библиотеки AsyncEnumerator, которая имеет открытый исходный код и лицензируется в рамках MIT, и я публикую это сообщение просто для того, чтобы помочь сообществу.
Обернуть Parallel.Foreach
в Task.Run()
и вместо await
использование ключевых слов [yourasyncmethod].Result
(вам нужно выполнить задачу Task.Run, чтобы не блокировать поток пользовательского интерфейса)
Что-то вроде этого:
var yourForeachTask = Task.Run(() =>
{
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = repo.GetCustomer(i).Result;
customers.Add(cust);
});
});
await yourForeachTask;
Метод расширения для этого, который использует SemaphoreSlim, а также позволяет установить максимальную степень параллелизма
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Пример использования:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
Это должно быть довольно эффективно и проще, чем заставить работать весь поток данных TPL:
var customers = await ids.SelectAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
var results = new List<TResult>();
var activeTasks = new HashSet<Task<TResult>>();
foreach (var item in source)
{
activeTasks.Add(selector(item));
if (activeTasks.Count >= maxDegreesOfParallelism)
{
var completed = await Task.WhenAny(activeTasks);
activeTasks.Remove(completed);
results.Add(completed.Result);
}
}
results.AddRange(await Task.WhenAll(activeTasks));
return results;
}
Я немного опоздал на вечеринку, но вы можете рассмотреть возможность использования GetAwaiter.GetResult() для запуска вашего асинхронного кода в контексте синхронизации, но в параллельном порядке, как показано ниже;
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
// Run this in thread which Parallel library occupied.
var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
customers.Add(cust);
});
После введения нескольких вспомогательных методов вы сможете запускать параллельные запросы с помощью этого простого синтаксиса:
const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
.Split(DegreeOfParallelism)
.SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
.ConfigureAwait(false);
Что здесь происходит, мы разбиваем исходную коллекцию на 10 частей (.Split(DegreeOfParallelism)
), затем выполните 10 задач, каждая из которых обрабатывает свои элементы одну за другой (.SelectManyAsync(...)
) и объединить их обратно в один список.
Стоит упомянуть, что есть более простой подход:
double[] result2 = await Enumerable.Range(0, 1000000)
.Select(async i => await CalculateAsync(i).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
Но для этого нужна мера предосторожности: если у вас есть слишком большая исходная коллекция, она запишет Task
для каждого элемента сразу, что может привести к значительному снижению производительности.
Методы расширения, используемые в приведенных выше примерах, выглядят следующим образом:
public static class CollectionExtensions
{
/// <summary>
/// Splits collection into number of collections of nearly equal size.
/// </summary>
public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
{
if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
List<T> source = src.ToList();
var sourceIndex = 0;
for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
{
var list = new List<T>();
int itemsLeft = source.Count - targetIndex;
while (slicesCount * list.Count < itemsLeft)
{
list.Add(source[sourceIndex++]);
}
yield return list;
}
}
/// <summary>
/// Takes collection of collections, projects those in parallel and merges results.
/// </summary>
public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
this IEnumerable<IEnumerable<T>> source,
Func<T, Task<TResult>> func)
{
List<TResult>[] slices = await source
.Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
return slices.SelectMany(s => s);
}
/// <summary>Runs selector and awaits results.</summary>
public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
{
List<TResult> result = new List<TResult>();
foreach (TSource source1 in source)
{
TResult result1 = await selector(source1).ConfigureAwait(false);
result.Add(result1);
}
return result;
}
/// <summary>Wraps tasks with Task.WhenAll.</summary>
public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
{
return Task.WhenAll<TResult>(source);
}
}
Вот простая универсальная реализация метода, основанная на из библиотеки TPL Dataflow , теперь встроенной в платформу .NET 5:
public static Task ForEachAsync<T>(this IEnumerable<T> source,
Func<T, Task> action, int dop)
{
// Arguments validation omitted
var block = new ActionBlock<T>(action,
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = dop });
try
{
foreach (var item in source) block.Post(item);
block.Complete();
}
catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
return block.Completion;
}
Это решение с нетерпением перечисляет поставляемые
IEnumerable
, и немедленно отправляет все его элементы в. Так что это не очень подходит для перечисляемых с огромным количеством элементов. Ниже представлен более сложный подход, который лениво перечисляет источник и отправляет его элементы в
ActionBlock
по одному:
public static async Task ForEachAsync<T>(this IEnumerable<T> source,
Func<T, Task> action, int dop)
{
// Arguments validation omitted
var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions()
{ MaxDegreeOfParallelism = dop, BoundedCapacity = dop });
try
{
foreach (var item in source)
if (!await block.SendAsync(item).ConfigureAwait(false)) break;
block.Complete();
}
catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
try { await block.Completion.ConfigureAwait(false); }
catch { block.Completion.Wait(); } // Propagate AggregateException
}
Эти два метода имеют разное поведение в случае исключений. Первый распространяет объект, содержащий исключения, прямо в его
InnerExceptions
свойство. Второй распространяет объект, содержащий еще один
AggregateException
за исключениями. Лично я считаю поведение второго метода более удобным на практике, потому что его ожидание автоматически устраняет уровень вложенности, и поэтому я могу просто
catch (AggregateException aex)
и справиться с
aex.InnerExceptions
внутри блока. Первый метод требует сохранения
Task
прежде, чем ждать, чтобы я мог получить доступ к
task.Exception.InnerExceptions
внутри
catch
блокировать. Дополнительные сведения о распространении исключений из асинхронных методов см . Здесь .
Обе реализации корректно обрабатывают любые ошибки, которые могут возникнуть во время перечисления
source
. В
ForEachAsync
не завершается до завершения всех ожидающих операций. Никакие задачи не остаются незамеченными (в режиме «запустил и забыл»).
¹ Первая реализация исключает async и await .
Легкий родной способ без TPL:
int totalThreads = 0; int maxThreads = 3;
foreach (var item in YouList)
{
while (totalThreads >= maxThreads) await Task.Delay(500);
Interlocked.Increment(ref totalThreads);
MyAsyncTask(item).ContinueWith((res) => Interlocked.Decrement(ref totalThreads));
}
вы можете проверить это решение с помощью следующей задачи:
async static Task MyAsyncTask(string item)
{
await Task.Delay(2500);
Console.WriteLine(item);
}