Как написать объект после TransformBlock?

У меня есть список объектов, которые мне нужно перебирать параллельно. Вот что мне нужно сделать:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

Так как я хочу parellelize это, я попытался использовать Parallel.ForEach(), но он не ждет, пока все действительно завершится, так как apiHelper.Get() делает ожидание внутри себя.

Parallel.ForEach(
                results,
                async (r) =>
                {
                    r.SomeList = await apiHelper.Get(r.Id);
                });

Поэтому я искал в Интернете и обнаружил следующее: в Parallel.ForEach ожидают вложения

Теперь я очень новичок в TPL (20 минут), и я могу упустить что-то очевидное. Как мне идти вперед?

        var getBlock = new TransformBlock<string, List<Something>>(
            async i =>
            {
                var c = await apiHelper.Get(i);
                return c;
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        foreach (var r in results)
        {
            r.SomeList = getBlock.Post(r.Id);  // ERROR: Can't convert boolean to list.
        }

        getBlock.Complete();

3 ответа

Решение

Вот пример использования класса ActionBlock в библиотеке потоков данных TPL.

Это в основном дает вам параллель, и async и его довольно легко понять

Пример потока данных

public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result );

   block.Complete();
   await block.Completion;

}

...

public async Task MyMethodAsync(Something result)
{       
   result.SomeList = await apiHelper.Get(result.Id);
}

Очевидно, вам понадобится проверка ошибок, и добавьте перец и соль по вкусу

Кроме того, предполагая, apiHelper потокобезопасен

Возможно, стоит подумать об использовании Microsoft Reactive Framework вместо этого.

Вот код:

var query =
    from r in results.ToObservable()
    from l in Observable.FromAsync(() => apiHelper.Get(r.Id))
    select new { r, l };

query
    .Subscribe(x => x.r.SomeList = x.l);

Готово. Параллельно и асинхронно.

Просто NuGet "System.Reactive" и добавьте using System.Reactiive.Linq;,

Для вызова API асинхронно и параллельно вам не нужны Reactive или Dataflow. Единственное осложнение в том, что у вас есть, это то, что вы мутируете объект r установив его свойство с результатом вызова API. Тем не менее, то, что вы хотите, довольно просто:

Это:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

становится:

var tasks = results.Select(async r => { r.SomeList = await apiHelper.Get(r.Id); });
await Task.WhenAll(tasks);

Если предположить, apiHelper.Get на самом деле является неблокирующим и асинхронным, то каждый элемент в results будет иметь асинхронный и параллельный вызов API.

Другие вопросы по тегам