Параллельно. Для SendAsync в BufferBlock в Async Transform?
Я узнаю о TPL Dataflow
Библиотека. Пока это именно то, что я искал.
Я создал простой класс (ниже), который выполняет следующие функции
- После исполнения
ImportPropertiesForBranch
Я иду в сторонний API и получить список свойств - Список xml возвращается и десериализуется в массив данных свойств (id, конечная точка API, lastupdated). Есть около 400+ объектов недвижимости (как в домах).
- Затем я использую
Parallel.For
вSendAsync
данные собственности в мойpropertyBufferBlock
propertyBufferBlock
связан сpropertyXmlBlock
(который сам по себе являетсяTransformBlock
).propertyXmlBlock
затем (асинхронно) возвращается к API (используя конечную точку API, указанную в данных свойства) и извлекает свойство xml для десериализации.- Как только XML возвращается и становится доступным, мы можем десериализовать
- Позже добавлю еще
TransformBlock
s, чтобы сохранить его в хранилище данных.
Итак, мои вопросы:
- Есть ли потенциальные узкие места или области кода, которые могут быть неприятными? Я знаю, что я не включил обработку ошибок или отмену (это будет впереди).
- Это нормально для
await
асинхронные вызовы внутриTransformBlock
или это узкое место? - Хотя код работает, меня беспокоит буферизация и асинхронность
Parallel.For
,BufferBlock
и асинхронный вTransformBlock
, Я не уверен, что это лучший способ, и я, возможно, перепутал некоторые концепции.
Любые рекомендации, улучшения и советы приветствуются.
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;
namespace My.ImportService
{
public class ImportService
{
private readonly IApiService _apiService;
private readonly IXmlService _xmlService;
private readonly IRepositoryService _repositoryService;
public ImportService(IApiService apiService,
IXmlService xmlService,
IRepositoryService repositoryService)
{
_apiService = apiService;
_xmlService = xmlService;
_repositoryService = repositoryService;
ConstructPipeline();
}
private BufferBlock<propertiesProperty> propertyBufferBlock;
private TransformBlock<propertiesProperty, string> propertyXmlBlock;
private TransformBlock<string, propertyType> propertyDeserializeBlock;
private ActionBlock<propertyType> propertyCompleteBlock;
public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
{
var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);
if (string.IsNullOrEmpty(propertyListXml))
return false;
var properties = _xmlService.DeserializePropertyList(propertyListXml);
if (properties?.property == null || properties.property.Length == 0)
return false;
// limited to the first 20 for testing
Parallel.For(0, 20,
new ParallelOptions {MaxDegreeOfParallelism = 3},
i => propertyBufferBlock.SendAsync(properties.property[i]));
propertyBufferBlock.Complete();
await propertyCompleteBlock.Completion;
return true;
}
private void ConstructPipeline()
{
propertyBufferBlock = GetPropertyBuffer();
propertyXmlBlock = GetPropertyXmlBlock();
propertyDeserializeBlock = GetPropertyDeserializeBlock();
propertyCompleteBlock = GetPropertyCompleteBlock();
propertyBufferBlock.LinkTo(
propertyXmlBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyXmlBlock.LinkTo(
propertyDeserializeBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyDeserializeBlock.LinkTo(
propertyCompleteBlock,
new DataflowLinkOptions {PropagateCompletion = true});
}
private BufferBlock<propertiesProperty> GetPropertyBuffer()
{
return new BufferBlock<propertiesProperty>();
}
private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
{
return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
{
Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
return propertyXml;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
{
return new TransformBlock<string, propertyType>(xmlAsString =>
{
Debug.WriteLine($"deserializing");
var propertyType = _xmlService.DeserializeProperty(xmlAsString);
return propertyType;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private ActionBlock<propertyType> GetPropertyCompleteBlock()
{
return new ActionBlock<propertyType>(propertyType =>
{
Debug.WriteLine($"complete {propertyType.id}");
Debug.WriteLine(propertyType.address.display);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
}
}
2 ответа
Есть ли потенциальные узкие места или области кода, которые могут быть неприятными?
В целом ваш подход выглядит хорошо, и потенциальная проблема заключается в том, что вы ограничиваете параллельную обработку ваших блоков MaxDegreeOfParallelism = 1
, Основываясь на описании проблемы, каждый элемент может быть обработан независимо от других, поэтому вы можете обрабатывать несколько элементов одновременно.
Это нормально ждать асинхронных звонков внутри
TransformBlock
или это узкое место?
Это прекрасно, потому что TPL DataFlow поддерживает асинхронные операции.
Хотя код работает, меня беспокоит буферизация и асинхронность
Parallel.For
,BufferBlock
и асинхронный вTransformBlock
, Я не уверен, что это лучший способ, и я, возможно, перепутал некоторые концепции.
Одна потенциальная проблема в вашем коде, которая может заставить вас выстрелить себе в ногу, - это вызвать асинхронный метод в Parallel.For
а потом звонит propertyBufferBlock.Complete();
, Проблема здесь в том, что Parallel.For
не поддерживает асинхронные действия и способ вызова propertyBufferBlock.SendAsync
и двигаться дальше до завершения возвращенного задания. Что означает, что к тому времени Parallel.For
Выход Некоторые операции могут все еще находиться в рабочем состоянии, а элементы еще не добавлены в блок буфера. А если позвонишь то позвони propertyBufferBlock.Complete();
эти ожидающие элементы выдают исключение, и элементы не будут добавлены к обработке. Вы получите ненаблюдаемое исключение.
Вы могли бы использовать ForEachAsync
сформируйте этот пост в блоге, чтобы убедиться, что все элементы добавляются в блок до его завершения. Но если вы все еще ограничиваете обработку одной операцией, вы можете просто добавлять элементы по одному. Я не уверен как propertyBufferBlock.SendAsync
реализован, но может случиться так, что он будет внутренне ограничен добавлением одного элемента за раз, поэтому параллельное добавление не будет иметь никакого смысла.
Вы на самом деле делаете что-то неправильно:
i => propertyBufferBlock.SendAsync(properties.property[i])
Вам нужно await
метод, в противном случае вы создаете слишком много одновременных задач.
Также эта строка:
MaxDegreeOfParallelism = 1
ограничит выполнение ваших блоков последующим выполнением, что может ухудшить вашу производительность.
Как вы сказали в комментариях, вы переключились на синхронный метод Post
и имеют ограниченную емкость блоков, установив BoundedCapacity
, Этот вариант следует использовать с осторожностью, так как вам необходимо проверить его возвращаемое значение, в котором указано, было ли сообщение принято или нет.
Что касается вашего беспокойства в ожидании async
методы внутри блоков - это абсолютно нормально, и должно быть сделано как в других случаях async
использование метода.