Использование BufferBlock в качестве наблюдаемого без использования элементов
Мы используем BufferBlocks для создания небольшого инструмента моделирования, в котором мы хотим найти области, выполнение которых занимает много времени. Производители и потребители блоков, по сути, будут спать в течение x времени, а затем отправят сообщение в другой блок. Мы решили использовать шаблон Observer. Однако я вижу некоторое поведение, которого не ожидал. Всякий раз, когда вызывается метод OnNext наблюдателей, BufferBlock пуст (Count == 0). Это проблематично, так как я хочу, чтобы только 1 наблюдатель мог получить значение из очереди. Есть ли способ изменить это поведение? Если нет, как мне справиться с потреблением из BufferBlocks?
В настоящее время я хочу иметь возможность сделать что-то похожее на публикацию сообщений, и все наблюдатели попытаются получить их:
public void OnNext(Object value)
{
var res =this.AsConsumer().ConsumeQueue.ReceiveAsync().Result;
Thread.Sleep(this.TimeToConsume );
ProduceQueue.Post(someOtherValue);
}
Я написал несколько тестов, чтобы показать поведение BufferBlock.
[Test]
public void
WhenObservingMocks_CallsOnNextForAllMocks()
{
var firstObserver = new Mock<IObserver<int>>();
var secondObserver = new Mock<IObserver<int>>();
var block = new BufferBlock<int>();
block.AsObservable().Subscribe(firstObserver.Object);
block.AsObservable().Subscribe(secondObserver.Object);
block.Post(2);
Thread.Sleep(TimeSpan.FromMilliseconds(50));
firstObserver.Verify(e => e.OnNext(It.IsAny<int>()), Times.Once);
secondObserver.Verify(e => e.OnNext(It.IsAny<int>()), Times.Once);
}
[Test]
public void
WhenHavingObservers_DoesConsumesTheElementFromQueue()
{
var firstObserver = new Mock<IObserver<int>>();
var secondObserver = new Mock<IObserver<int>>();
var block = new BufferBlock<int>();
block.AsObservable().Subscribe(firstObserver.Object);
block.AsObservable().Subscribe(secondObserver.Object);
block.Post(2);
Assert.Zero(block.Count);
}
[Test]
public void
WhenPostingOnce_CanOnlyReceiveOnce()
{
var block = new BufferBlock<int>();
block.Post(2);
Assert.True(block.TryReceive(out int _));
Assert.False(block.TryReceive(out int _));
}