BroadcastBlock недостающие элементы
У меня есть список номеров проектов, которые мне нужно обработать. Проект может содержать около 8000 элементов, и мне нужно получить данные для каждого элемента в проекте, а затем поместить эти данные в список серверов. Может кто-нибудь, пожалуйста, скажите мне следующее..
1) У меня 1000 элементов в iR, но только 998 были записаны на серверы. Потерял ли я предметы с помощью broadCastBlock? 2) Я правильно делаю ожидание на всех actionBlocks? 3) Как мне сделать вызов базы данных асинхронным?
Вот код базы данных
public MemcachedDTO GetIR(MemcachedDTO dtoItem)
{
string[] Tables = new string[] { "iowa", "la" };
using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["test"].ConnectionString))
{
using (SqlCommand command = new SqlCommand("test", connection))
{
DataSet Result = new DataSet();
command.CommandType = CommandType.StoredProcedure;
command.Parameters.Add("@ProjectId", SqlDbType.VarChar);
command.Parameters["@ProjectId"].Value = dtoItem.ProjectId;
connection.Open();
Result.EnforceConstraints = false;
Result.Load(command.ExecuteReader(CommandBehavior.CloseConnection), LoadOption.OverwriteChanges, Tables);
dtoItem.test = Result;
}
}
return dtoItem;
}
Обновление: я обновил код ниже. Он просто зависает, когда я его запускаю, и записывает только 1/4 данных на сервер? Подскажите пожалуйста, что я делаю не так?
public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
var targetsList = targets.ToList();
var block = new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
}, new ExecutionDataflowBlockOptions
{
CancellationToken = options.CancellationToken
});
block.Completion.ContinueWith(task =>
{
foreach (var target in targetsList)
{
if (task.Exception != null)
target.Fault(task.Exception);
else
target.Complete();
}
});
return block;
}
[HttpGet]
public async Task< HttpResponseMessage> ReloadItem(string projectQuery)
{
try
{
var linkCompletion = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2
};
var cts = new CancellationTokenSource();
var dbOptions = new DataflowBlockOptions { CancellationToken = cts.Token };
IList<string> projectIds = projectQuery.Split(',').ToList();
IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();
var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });
List<ActionBlock<MemcachedDTO>> actionList = new List<ActionBlock<MemcachedDTO>>();
List<MemcachedDTO> dtoList = new List<MemcachedDTO>();
foreach (string pid in projectIds)
{
IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
dtoList.AddRange(dtoTemp);
}
foreach (string s in serverList)
{
var action = new ActionBlock<MemcachedDTO>(
async dto => await PostEachServerAsync(dto, s, "setitemcache"));
actionList.Add(action);
}
var bBlock = CreateGuaranteedBroadcastBlock(actionList, dbOptions);
foreach (MemcachedDTO d in dtoList)
{
await iR.SendAsync(d);
}
iR.Complete();
iR.LinkTo(bBlock);
await Task.WhenAll(actionList.Select(action => action.Completion).ToList());
return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" });
}
catch (Exception ex)
{
return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
}
}
1 ответ
1) У меня 1000 элементов в iR, но только 998 были записаны на серверы. Потерял ли я предметы с помощью broadCastBlock?
Да, в приведенном ниже коде BoundedCapacity
к одному, если в любое время ваш BroadcastBlock
не может пройти мимо предмета, он его бросит. Дополнительно BroadcastBlock
будет только размножаться Completion
к одному TargetBlock
, не использовать PropagateCompletion=true
Вот. Если вы хотите, чтобы все блоки завершились, вы должны обработать Completion
вручную. Это можно сделать, установив ContinueWith
на BroadcastBlock
пройти Completion
для всех подключенных целей.
var action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, s, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = 1 });
broadcast.LinkTo(action, linkCompletion);
actionList.Add(action);
Вариант: вместо BroadcastBlock
использовать правильно ограниченный BufferBlock
, Когда ваши нижестоящие блоки привязаны к одному предмету, они не могут получить дополнительные предметы, пока не закончат обработку того, что имеют. Это позволит BufferBlock
предлагать свои вещи другому, возможно, праздному, ActionBlock
,
Когда вы добавляете элементы в удушенный поток, то есть поток с BoundedCapacity
меньше, чем Unbounded
, Вы должны использовать SendAsync
метод или, по крайней мере, обрабатывать возврат Post
, Я бы порекомендовал просто использовать SendAsync
:
foreach (MemcachedDTO d in dtoList)
{
await iR.SendAsync(d);
}
Это заставит подпись вашего метода стать:
public async Task<HttpResponseMessage> ReloadItem(string projectQuery)
2) Я правильно делаю ожидание на всех actionBlocks?
Предыдущее изменение позволит вам снять блокировку Wait
призвать в пользу await Task.WhenAlll
iR.Complete();
actionList.ForEach(x => x.Completion.Wait());
To:
iR.Complete();
await bufferBlock.Completion.ContinueWith(tsk => actionList.ForEach(x => x.Complete());
await Task.WhenAll(actionList.Select(action => action.Completion).ToList());
3) Как мне сделать вызов базы данных асинхронным?
Я собираюсь оставить это открытым, потому что это должен быть отдельный вопрос, не связанный с TPL-Dataflow
, но вкратце async
Api для доступа к вашей БД и async
будет естественно расти через вашу кодовую базу. Это должно начать вас.
BufferBlock против BroadcastBlock
После перечитывания вашего предыдущего вопроса и ответа от @VMAtm. Кажется, вы хотите, чтобы каждый элемент был отправлен на все пять серверов, в этом случае вам понадобится BroadcastBlock
, Вы бы использовали BufferBlock
распределять сообщения относительно равномерно по гибкому пулу серверов, каждый из которых может обработать сообщение. Тем не менее, вам все равно нужно будет взять на себя управление распространением завершения и сбоев для всех подключенных ActionBlocks
в ожидании завершения BroadcastBlock
,
Предотвратить выпадение сообщений BroadcastBlock
В общем у вас два варианта, установите свой ActionBlocks
быть несвязанным, что является их значением по умолчанию:
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = Unbounded });
Или транслируйте сообщения самостоятельно из любого разнообразия вашей собственной конструкции. Вот пример реализации от @ i3arnon. И еще один из @svick