TPL Dataflow Complete Pipeline, когда условие соответствует
Я думал, что это очень простой подход, но я пока не нашел ни одного примера. У меня есть один производитель и один потребитель, и я хочу закончить конвейер, когда было обработано как минимум x объектов. Дополнительно мне нужно знать, какие объекты были получены.
Вот как я это делаю:
public class BlockTester
{
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
_worker = new TransformBlock<int, int>(s => s + s);
var buffer = new BufferBlock<int>();
var consumeTask = Consume(buffer);
_worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});
foreach (var value in Enumerable.Range(0,100))
{
_worker.Post(value);
}
_worker.Complete();
await buffer.Completion;
if(buffer.TryReceiveAll(out var received))
{
Console.WriteLine(string.Join(", ", received));
}
}
public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
{
var received = new List<int>();
while (await buffer.OutputAvailableAsync())
{
var current = buffer.Receive();
received.Add(current);
if (current > 25)
{
_worker.Complete();
}
}
return received;
}
}
Я немного запутался насчет буфера. Попробуйте все. В чем разница между ожиданием задачи потребления и TryReceiveAll? Почему TryReceiveAll ложно в моем сценарии? Я думаю, что все еще что-то не так с моим подходом для достижения моих целей
1 ответ
Ваш Consume
метод должен быть ActionBlock
, Там нет необходимости использовать OutputAvailableAsync
или же TryRecieveAll
, Заменить BufferBlock
с ActionBlock
и сделать обработку в пределах ActionBlock
, Непонятно, зачем вам TransformBlock
либо если у вас нет более одного шага в процессе.
public class BlockTester
{
//Could be removed
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
//Could be removed
_worker = new TransformBlock<int, int>(s => s + s);
var processor = new ActionBlock<int>(x => ProcessMessage(x));
_worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var value in Enumerable.Range(0, 100))
{
_worker.Post(value);
}
//_worker.Complete();
await processor.Completion;
}
private static int itemsRecieved = 0;
public static void ProcessMessage(int x)
{
Interlocked.Increment(ref itemsRecieved);
if (itemsRecieved > 25) _worker.Complete();
//process the message
//log the message etc.
}
}
Или со сложным объектом сообщения:
public class Message { }
public class BlockTester
{
//Could be removed
private static TransformBlock<Message, Message> _worker;
public static async Task StartAsync()
{
//Could be removed
_worker = new TransformBlock<Message, Message>(s => s);
var processor = new ActionBlock<Message>(x => ProcessMessage(x));
_worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
{
_worker.Post(value);
}
//_worker.Complete();
await processor.Completion;
}
private static ConcurrentBag<Message> itemsRecieved = new ConcurrentBag<Message>();
public static void ProcessMessage(Message x)
{
itemsRecieved.Add(x);
if (itemsRecieved.Count > 25) _worker.Complete();
//process the message
//log the message etc.
}
}
Изменить Чтобы ответить на оригинальный вопрос:
Почему
TryReceiveAll
вернуть ложь:
Потому что к тому времени TryReceiveAll
это побежал BufferBlock
"завершено". Для завершения блока он должен содержать 0 элементов в своем выходном буфере. Consume
метод вытаскивал все элементы до того, как блоку позволили завершить, и вы, наконец, позвоните TryRecieveAll
на пустом блоке.