Как написать объект после TransformBlock?
У меня есть список объектов, которые мне нужно перебирать параллельно. Вот что мне нужно сделать:
foreach (var r in results)
{
r.SomeList = await apiHelper.Get(r.Id);
}
Так как я хочу parellelize это, я попытался использовать Parallel.ForEach(), но он не ждет, пока все действительно завершится, так как apiHelper.Get() делает ожидание внутри себя.
Parallel.ForEach(
results,
async (r) =>
{
r.SomeList = await apiHelper.Get(r.Id);
});
Поэтому я искал в Интернете и обнаружил следующее: в Parallel.ForEach ожидают вложения
Теперь я очень новичок в TPL (20 минут), и я могу упустить что-то очевидное. Как мне идти вперед?
var getBlock = new TransformBlock<string, List<Something>>(
async i =>
{
var c = await apiHelper.Get(i);
return c;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
foreach (var r in results)
{
r.SomeList = getBlock.Post(r.Id); // ERROR: Can't convert boolean to list.
}
getBlock.Complete();
3 ответа
Вот пример использования класса ActionBlock в библиотеке потоков данных TPL.
Это в основном дает вам параллель, и async
и его довольно легко понять
Пример потока данных
public static async Task DoWorkLoads(List<Something> results)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 50
};
var block = new ActionBlock<Something>(MyMethodAsync, options);
foreach (var result in results)
block.Post(result );
block.Complete();
await block.Completion;
}
...
public async Task MyMethodAsync(Something result)
{
result.SomeList = await apiHelper.Get(result.Id);
}
Очевидно, вам понадобится проверка ошибок, и добавьте перец и соль по вкусу
Кроме того, предполагая, apiHelper
потокобезопасен
Возможно, стоит подумать об использовании Microsoft Reactive Framework вместо этого.
Вот код:
var query =
from r in results.ToObservable()
from l in Observable.FromAsync(() => apiHelper.Get(r.Id))
select new { r, l };
query
.Subscribe(x => x.r.SomeList = x.l);
Готово. Параллельно и асинхронно.
Просто NuGet "System.Reactive" и добавьте using System.Reactiive.Linq;
,
Для вызова API асинхронно и параллельно вам не нужны Reactive или Dataflow. Единственное осложнение в том, что у вас есть, это то, что вы мутируете объект r
установив его свойство с результатом вызова API. Тем не менее, то, что вы хотите, довольно просто:
Это:
foreach (var r in results)
{
r.SomeList = await apiHelper.Get(r.Id);
}
становится:
var tasks = results.Select(async r => { r.SomeList = await apiHelper.Get(r.Id); });
await Task.WhenAll(tasks);
Если предположить, apiHelper.Get
на самом деле является неблокирующим и асинхронным, то каждый элемент в results
будет иметь асинхронный и параллельный вызов API.