Использование BufferBlock в качестве наблюдаемого без использования элементов

Мы используем BufferBlocks для создания небольшого инструмента моделирования, в котором мы хотим найти области, выполнение которых занимает много времени. Производители и потребители блоков, по сути, будут спать в течение x времени, а затем отправят сообщение в другой блок. Мы решили использовать шаблон Observer. Однако я вижу некоторое поведение, которого не ожидал. Всякий раз, когда вызывается метод OnNext наблюдателей, BufferBlock пуст (Count == 0). Это проблематично, так как я хочу, чтобы только 1 наблюдатель мог получить значение из очереди. Есть ли способ изменить это поведение? Если нет, как мне справиться с потреблением из BufferBlocks?

В настоящее время я хочу иметь возможность сделать что-то похожее на публикацию сообщений, и все наблюдатели попытаются получить их:

      public void OnNext(Object value)
{
    var res =this.AsConsumer().ConsumeQueue.ReceiveAsync().Result;
    Thread.Sleep(this.TimeToConsume );
    ProduceQueue.Post(someOtherValue);
}

Я написал несколько тестов, чтобы показать поведение BufferBlock.

      [Test]
public void 
WhenObservingMocks_CallsOnNextForAllMocks()
{
    var firstObserver = new Mock<IObserver<int>>();
    var secondObserver = new Mock<IObserver<int>>();
    var block = new BufferBlock<int>();

    block.AsObservable().Subscribe(firstObserver.Object);
    block.AsObservable().Subscribe(secondObserver.Object);

    block.Post(2);
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    firstObserver.Verify(e => e.OnNext(It.IsAny<int>()), Times.Once); 
    secondObserver.Verify(e => e.OnNext(It.IsAny<int>()), Times.Once);
}

[Test]
public void 
    WhenHavingObservers_DoesConsumesTheElementFromQueue()
{
    var firstObserver = new Mock<IObserver<int>>();
    var secondObserver = new Mock<IObserver<int>>();
    var block = new BufferBlock<int>();

    block.AsObservable().Subscribe(firstObserver.Object);
    block.AsObservable().Subscribe(secondObserver.Object);

    block.Post(2);
    Assert.Zero(block.Count);
}

[Test]
public void 
    WhenPostingOnce_CanOnlyReceiveOnce()
{
    var block = new BufferBlock<int>();
    block.Post(2);
    Assert.True(block.TryReceive(out int _));
    Assert.False(block.TryReceive(out int _));
}

0 ответов

Другие вопросы по тегам