Многоадресный блок TPL Dataflow

Мне нужно многоадресно передать объект в несколько путей

      producer
         |
      multicast
     |        |
 Process1   Process2
     |        |
   Writedb   WriteFile

широковещательный блок мало помогает, он выполняет только последние действия для обоих процессов1, процесс 2, если процесс 2 выполняется с опозданием, он не сможет получать сообщения.

DB Writer и запись файла имеют разные данные.

Вот следующий фрагмент кода.

class Program
{
    public static void Main()
    {
        var broadCastBlock = new BroadcastBlock<int>(i => i);

        var transformBlock1 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("1 transformblock called: {0}", i);
            //Thread.Sleep(4);
            return string.Format("1_ {0},", i);
        });

        var transformBlock2 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("2 transformblock called: {0}", i);
            Thread.Sleep(100);
            return string.Format("2_ {0},", i);
        });

        var processorBlockT1 = new ActionBlock<string>(i => Console.WriteLine("processBlockT1 {0}", i));
        var processorBlockT2 = new ActionBlock<string>(i => Console.WriteLine("processBlockT2 {0}", i));

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlockT1, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlockT2, new DataflowLinkOptions { PropagateCompletion = true });

        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //completion handling

        broadCastBlock.Completion.ContinueWith(x =>
        {
            Console.WriteLine("Braodcast block Completed");
            transformBlock1.Complete();
            transformBlock2.Complete();
            Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ =>
            {
                processorBlockT1.Complete();
                processorBlockT2.Complete();
            });
        });


        transformBlock1.Completion.ContinueWith(x => Console.WriteLine("Transform1 completed"));
        transformBlock2.Completion.ContinueWith(x => Console.WriteLine("Transform2 completed"));
        processorBlockT1.Completion.ContinueWith(x => Console.WriteLine("processblockT1 completed"));
        processorBlockT2.Completion.ContinueWith(x => Console.WriteLine("processblockT2 completed"));


        //mark completion
        broadCastBlock.Complete();
        Task.WhenAll(processorBlockT1.Completion, processorBlockT2.Completion).ContinueWith(_ => Console.WriteLine("completed both tasks")).Wait();


        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

Какой лучший способ гарантированной доставки по трансляции. т. е. многоадресная.

я должен просто вставить два буфера на обоих концах и затем использовать их, чтобы буферы всегда собирали то, что когда-либо поступало, и тогда процесс мог бы занять некоторое время, чтобы обработать их все?

0 ответов

В BroadcastBlock гарантирует, что все сообщения будут предлагаться всем связанным блокам. Так что это именно то, что вам нужно. Что вы должны исправить, так это то, как вы кормитеBroadcastBlock с сообщениями:

for (int i = 1; i <= numElements; i++)
{
    broadCastBlock.SendAsync(i); // Don't do this!
}

В SendAsync метод ожидается. У вас никогда не должно быть более одного ожидающегоSendAsyncоперации, нацеленные на один и тот же блок. Это не только нарушает все гарантии порядка принимаемых сообщений, но и крайне неэффективно использует память. Весь смысл использования ограниченных блоков заключается в управлении использованием памяти путем ограничения размера внутренних буферов блоков. Выпуская несколько не ожидаемыхSendAsync команды, которые вы обойдете это добровольное ограничение, создав внешний динамический буфер неполных Tasks, где каждая задача весит сотни байтов, для распространения сообщений, имеющих лишь часть этого веса. Эти сообщения можно было бы гораздо более эффективно буферизовать изнутри, если бы блоки вообще не были ограниченными.

for (int i = 1; i <= numElements; i++)
{
    await broadCastBlock.SendAsync(i); // Now it's OK
}
Другие вопросы по тегам