Метод потребления TPL BufferBlock не вызывается
Я хочу реализовать шаблон потребитель / производитель, используя BufferBlock, который работает непрерывно, как здесь, так и в коде.
Я пытался использовать ActionBlock как OP, но если буферный блок заполнен, а новые сообщения находятся в его очереди, то новые сообщения никогда не добавляются в очередь ConcurrentDictionary _queue.
В приведенном ниже коде метод ConsumeAsync никогда не вызывается при добавлении нового сообщения в буферный блок с помощью этого вызова: _messageBufferBlock.SendAsync(message)
Как я могу исправить приведенный ниже код, чтобы метод ConsumeAsync вызывался при каждом добавлении нового сообщения с помощью _messageBufferBlock.SendAsync(message)
?
public class PriorityMessageQueue
{
private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>();
private volatile BufferBlock<MyMessage> _messageBufferBlock;
private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor
private int _dictionaryKey;
public PriorityMessageQueue()
{
_initializingTask = Init();
}
public async Task<bool> EnqueueAsync(MyMessage message)
{
return await _messageBufferBlock.SendAsync(message);
}
private async Task<bool> ConsumeAsync()
{
try
{
// This code does not fire when a new message is added to the buffereblock
while (await _messageBufferBlock.OutputAvailableAsync())
{
// A message object is never received from the bufferblock
var message = await _messageBufferBlock.ReceiveAsync();
}
return true;
}
catch (Exception ex)
{
return false;
}
}
private async Task<bool> Init()
{
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg =>
{
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageBufferBlock = new BufferBlock<MyMessage>();
_messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50});
return await ConsumeAsync();
}
}
РЕДАКТИРОВАТЬ Я удалил весь дополнительный код и добавил комментарии.
1 ответ
Я все еще не совсем уверен, чего вы пытаетесь достичь, но я постараюсь указать вам правильное направление. Большая часть кода в примере не является строго необходимой.
Мне нужно знать, когда приходит новое сообщение
Если это ваше единственное требование, то я предполагаю, что вам просто нужно запускать некоторый произвольный код всякий раз, когда передается новое сообщение. Самый простой способ сделать это в потоке данных - это использовать TransformBlock
и установите этот блок как начальный получатель в вашем конвейере. Каждый блок имеет свой собственный буфер, поэтому, если вам не нужен другой буфер, вы можете его пропустить.
public class PriorityMessageQueue {
private TransformBlock<MyMessage, MyMessage> _messageReciever;
public PriorityMessageQueue() {
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg => {
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageReciever = new TransformBlock<MyMessage, MyMessage>(msg => NewMessageRecieved(msg), executionDataflowBlockOptions);
_messageReciever.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public async Task<bool> EnqueueAsync(MyMessage message) {
return await _messageReciever.SendAsync(message);
}
private MyMessage NewMessageRecieved(MyMessage message) {
//do something when a new message arrives
//pass the message along in the pipeline
return message;
}
private void SetMessagePriority(MyMessage message) {
//Handle a message
}
}
Конечно, другой вариант, который у вас есть, это сделать все, что вам нужно, немедленно в течение EnqueAsync
перед возвратом задания из SendAsync
но TransformBlock
дает вам дополнительную гибкость.