BroadcastBlock недостающие элементы

У меня есть список номеров проектов, которые мне нужно обработать. Проект может содержать около 8000 элементов, и мне нужно получить данные для каждого элемента в проекте, а затем поместить эти данные в список серверов. Может кто-нибудь, пожалуйста, скажите мне следующее..

1) У меня 1000 элементов в iR, но только 998 были записаны на серверы. Потерял ли я предметы с помощью broadCastBlock? 2) Я правильно делаю ожидание на всех actionBlocks? 3) Как мне сделать вызов базы данных асинхронным?

Вот код базы данных

    public  MemcachedDTO GetIR(MemcachedDTO dtoItem)
    {

        string[] Tables = new string[] { "iowa", "la" };
        using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["test"].ConnectionString))
        {
            using (SqlCommand command = new SqlCommand("test", connection))
            {
                DataSet Result = new DataSet();
                command.CommandType = CommandType.StoredProcedure;

                command.Parameters.Add("@ProjectId", SqlDbType.VarChar);
                command.Parameters["@ProjectId"].Value = dtoItem.ProjectId;


                connection.Open();
                Result.EnforceConstraints = false;
                Result.Load(command.ExecuteReader(CommandBehavior.CloseConnection), LoadOption.OverwriteChanges, Tables);
                dtoItem.test = Result;
            }
        }
        return dtoItem;
    }

Обновление: я обновил код ниже. Он просто зависает, когда я его запускаю, и записывает только 1/4 данных на сервер? Подскажите пожалуйста, что я делаю не так?

      public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
    {
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            }, new ExecutionDataflowBlockOptions
            {
                CancellationToken = options.CancellationToken
            });

        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targetsList)
            {
                if (task.Exception != null)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            }
        });

        return block;
    }

    [HttpGet]
    public async Task< HttpResponseMessage> ReloadItem(string projectQuery)
    {
        try
        {

            var linkCompletion = new ExecutionDataflowBlockOptions
            {
                 MaxDegreeOfParallelism = 2
            };
             var cts = new CancellationTokenSource();
            var dbOptions = new DataflowBlockOptions { CancellationToken = cts.Token };


            IList<string> projectIds = projectQuery.Split(',').ToList();
            IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();

            var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
                dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

            List<ActionBlock<MemcachedDTO>> actionList = new List<ActionBlock<MemcachedDTO>>();


            List<MemcachedDTO> dtoList = new List<MemcachedDTO>();

            foreach (string pid in projectIds)
            {
                IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
                dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
                dtoList.AddRange(dtoTemp);
            }
            foreach (string s in serverList)
            {
                var action = new ActionBlock<MemcachedDTO>(
                async dto => await PostEachServerAsync(dto, s, "setitemcache"));
                actionList.Add(action);
            }
            var bBlock = CreateGuaranteedBroadcastBlock(actionList, dbOptions);

            foreach (MemcachedDTO d in dtoList)
            {
                await iR.SendAsync(d);
            }

            iR.Complete();
            iR.LinkTo(bBlock);
            await Task.WhenAll(actionList.Select(action => action.Completion).ToList());

            return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" });
        }
        catch (Exception ex)
        {
            return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
        }
    }

1 ответ

1) У меня 1000 элементов в iR, но только 998 были записаны на серверы. Потерял ли я предметы с помощью broadCastBlock?

Да, в приведенном ниже коде BoundedCapacity к одному, если в любое время ваш BroadcastBlock не может пройти мимо предмета, он его бросит. Дополнительно BroadcastBlock будет только размножаться Completion к одному TargetBlock, не использовать PropagateCompletion=true Вот. Если вы хотите, чтобы все блоки завершились, вы должны обработать Completion вручную. Это можно сделать, установив ContinueWith на BroadcastBlock пройти Completion для всех подключенных целей.

var action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, s, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = 1 });
broadcast.LinkTo(action, linkCompletion);
actionList.Add(action);

Вариант: вместо BroadcastBlock использовать правильно ограниченный BufferBlock, Когда ваши нижестоящие блоки привязаны к одному предмету, они не могут получить дополнительные предметы, пока не закончат обработку того, что имеют. Это позволит BufferBlock предлагать свои вещи другому, возможно, праздному, ActionBlock,

Когда вы добавляете элементы в удушенный поток, то есть поток с BoundedCapacity меньше, чем Unbounded, Вы должны использовать SendAsync метод или, по крайней мере, обрабатывать возврат Post, Я бы порекомендовал просто использовать SendAsync:

foreach (MemcachedDTO d in dtoList)
{
    await iR.SendAsync(d);
}

Это заставит подпись вашего метода стать:

public async Task<HttpResponseMessage> ReloadItem(string projectQuery)

2) Я правильно делаю ожидание на всех actionBlocks?

Предыдущее изменение позволит вам снять блокировку Wait призвать в пользу await Task.WhenAlll

iR.Complete();
actionList.ForEach(x => x.Completion.Wait());

To:

iR.Complete();
await bufferBlock.Completion.ContinueWith(tsk => actionList.ForEach(x => x.Complete());
await Task.WhenAll(actionList.Select(action => action.Completion).ToList());

3) Как мне сделать вызов базы данных асинхронным?

Я собираюсь оставить это открытым, потому что это должен быть отдельный вопрос, не связанный с TPL-Dataflow, но вкратце async Api для доступа к вашей БД и async будет естественно расти через вашу кодовую базу. Это должно начать вас.

BufferBlock против BroadcastBlock

После перечитывания вашего предыдущего вопроса и ответа от @VMAtm. Кажется, вы хотите, чтобы каждый элемент был отправлен на все пять серверов, в этом случае вам понадобится BroadcastBlock, Вы бы использовали BufferBlock распределять сообщения относительно равномерно по гибкому пулу серверов, каждый из которых может обработать сообщение. Тем не менее, вам все равно нужно будет взять на себя управление распространением завершения и сбоев для всех подключенных ActionBlocks в ожидании завершения BroadcastBlock,

Предотвратить выпадение сообщений BroadcastBlock

В общем у вас два варианта, установите свой ActionBlocks быть несвязанным, что является их значением по умолчанию:

new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = Unbounded });

Или транслируйте сообщения самостоятельно из любого разнообразия вашей собственной конструкции. Вот пример реализации от @ i3arnon. И еще один из @svick

Другие вопросы по тегам