Метод потребления TPL BufferBlock не вызывается

Я хочу реализовать шаблон потребитель / производитель, используя BufferBlock, который работает непрерывно, как здесь, так и в коде.

Я пытался использовать ActionBlock как OP, но если буферный блок заполнен, а новые сообщения находятся в его очереди, то новые сообщения никогда не добавляются в очередь ConcurrentDictionary _queue.

В приведенном ниже коде метод ConsumeAsync никогда не вызывается при добавлении нового сообщения в буферный блок с помощью этого вызова: _messageBufferBlock.SendAsync(message)

Как я могу исправить приведенный ниже код, чтобы метод ConsumeAsync вызывался при каждом добавлении нового сообщения с помощью _messageBufferBlock.SendAsync(message)?

    public class PriorityMessageQueue 
    {
        private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>();
        private volatile BufferBlock<MyMessage> _messageBufferBlock;
        private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor
        private int _dictionaryKey;

        public PriorityMessageQueue()
        {
            _initializingTask = Init();
        }

        public async Task<bool> EnqueueAsync(MyMessage message)
        {
            return await _messageBufferBlock.SendAsync(message);
        }

        private async Task<bool> ConsumeAsync()
        {
            try
            {
                // This code does not fire when a new message is added to the buffereblock
                while (await _messageBufferBlock.OutputAvailableAsync())
                {
                    // A message object is never received from the bufferblock
                    var message = await _messageBufferBlock.ReceiveAsync();


                }

                return true;
            }
            catch (Exception ex)
            {
                return false;
            }
        }

        private async Task<bool> Init()
        {
            var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 50
            };

            var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg =>
            {
                SetMessagePriority(msg);
            }, executionDataflowBlockOptions);

            _messageBufferBlock = new BufferBlock<MyMessage>();
            _messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50});

            return await ConsumeAsync();
        }
    }

РЕДАКТИРОВАТЬ Я удалил весь дополнительный код и добавил комментарии.

1 ответ

Решение

Я все еще не совсем уверен, чего вы пытаетесь достичь, но я постараюсь указать вам правильное направление. Большая часть кода в примере не является строго необходимой.

Мне нужно знать, когда приходит новое сообщение

Если это ваше единственное требование, то я предполагаю, что вам просто нужно запускать некоторый произвольный код всякий раз, когда передается новое сообщение. Самый простой способ сделать это в потоке данных - это использовать TransformBlock и установите этот блок как начальный получатель в вашем конвейере. Каждый блок имеет свой собственный буфер, поэтому, если вам не нужен другой буфер, вы можете его пропустить.

public class PriorityMessageQueue {        
    private TransformBlock<MyMessage, MyMessage> _messageReciever;

    public PriorityMessageQueue() {
        var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 50
        };

        var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg => {
            SetMessagePriority(msg);
        }, executionDataflowBlockOptions);

        _messageReciever = new TransformBlock<MyMessage, MyMessage>(msg => NewMessageRecieved(msg), executionDataflowBlockOptions);
        _messageReciever.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public async Task<bool> EnqueueAsync(MyMessage message) {
        return await _messageReciever.SendAsync(message);
    }

    private MyMessage NewMessageRecieved(MyMessage message) {
        //do something when a new message arrives

        //pass the message along in the pipeline
        return message;
    }

    private void SetMessagePriority(MyMessage message) {
        //Handle a message
    }
}

Конечно, другой вариант, который у вас есть, это сделать все, что вам нужно, немедленно в течение EnqueAsync перед возвратом задания из SendAsync но TransformBlock дает вам дополнительную гибкость.

Другие вопросы по тегам