Многоадресный блок TPL Dataflow
Мне нужно многоадресно передать объект в несколько путей
producer
|
multicast
| |
Process1 Process2
| |
Writedb WriteFile
широковещательный блок мало помогает, он выполняет только последние действия для обоих процессов1, процесс 2, если процесс 2 выполняется с опозданием, он не сможет получать сообщения.
DB Writer и запись файла имеют разные данные.
Вот следующий фрагмент кода.
class Program
{
public static void Main()
{
var broadCastBlock = new BroadcastBlock<int>(i => i);
var transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 transformblock called: {0}", i);
//Thread.Sleep(4);
return string.Format("1_ {0},", i);
});
var transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 transformblock called: {0}", i);
Thread.Sleep(100);
return string.Format("2_ {0},", i);
});
var processorBlockT1 = new ActionBlock<string>(i => Console.WriteLine("processBlockT1 {0}", i));
var processorBlockT2 = new ActionBlock<string>(i => Console.WriteLine("processBlockT2 {0}", i));
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlockT1, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlockT2, new DataflowLinkOptions { PropagateCompletion = true });
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//completion handling
broadCastBlock.Completion.ContinueWith(x =>
{
Console.WriteLine("Braodcast block Completed");
transformBlock1.Complete();
transformBlock2.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ =>
{
processorBlockT1.Complete();
processorBlockT2.Complete();
});
});
transformBlock1.Completion.ContinueWith(x => Console.WriteLine("Transform1 completed"));
transformBlock2.Completion.ContinueWith(x => Console.WriteLine("Transform2 completed"));
processorBlockT1.Completion.ContinueWith(x => Console.WriteLine("processblockT1 completed"));
processorBlockT2.Completion.ContinueWith(x => Console.WriteLine("processblockT2 completed"));
//mark completion
broadCastBlock.Complete();
Task.WhenAll(processorBlockT1.Completion, processorBlockT2.Completion).ContinueWith(_ => Console.WriteLine("completed both tasks")).Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
Какой лучший способ гарантированной доставки по трансляции. т. е. многоадресная.
я должен просто вставить два буфера на обоих концах и затем использовать их, чтобы буферы всегда собирали то, что когда-либо поступало, и тогда процесс мог бы занять некоторое время, чтобы обработать их все?
0 ответов
В BroadcastBlock
гарантирует, что все сообщения будут предлагаться всем связанным блокам. Так что это именно то, что вам нужно. Что вы должны исправить, так это то, как вы кормитеBroadcastBlock
с сообщениями:
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i); // Don't do this!
}
В SendAsync
метод ожидается. У вас никогда не должно быть более одного ожидающегоSendAsync
операции, нацеленные на один и тот же блок. Это не только нарушает все гарантии порядка принимаемых сообщений, но и крайне неэффективно использует память. Весь смысл использования ограниченных блоков заключается в управлении использованием памяти путем ограничения размера внутренних буферов блоков. Выпуская несколько не ожидаемыхSendAsync
команды, которые вы обойдете это добровольное ограничение, создав внешний динамический буфер неполных Task
s, где каждая задача весит сотни байтов, для распространения сообщений, имеющих лишь часть этого веса. Эти сообщения можно было бы гораздо более эффективно буферизовать изнутри, если бы блоки вообще не были ограниченными.
for (int i = 1; i <= numElements; i++)
{
await broadCastBlock.SendAsync(i); // Now it's OK
}