Почему мой подписчик Akka.NET Stream не получает сообщения?

Я пытаюсь написать простой поток потоков Akka.NET. Источник является IActorRef, Раковина ISubscriber, Я использую TestKit, чтобы реализовать его в качестве модульного теста:

[Fact]
public void AkkaStreams_ActorSourcePublisherSink_Works()
{
    using (var materializer = Sys.Materializer())
    {
        var probe = CreateTestProbe();
        var source = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew);
        var subscriber = new Mock<ISubscriber<HandlerErrorEvent>>();
        var sink = Sink.FromSubscriber<HandlerErrorEvent>(subscriber.Object);
        var graph = source.ToMaterialized(sink, Keep.Both);
        var (actor, publisher) = graph.Run(materializer);

        subscriber.Verify(s => s.OnSubscribe(It.IsAny<ISubscription>()));

        var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
        actor.Tell(evnt, ActorRefs.Nobody);

        base.AwaitCondition(() =>
        {
            try
            {
                subscriber.Verify(s => s.OnNext(It.IsAny<HandlerErrorEvent>()));
                return true;
            }
            catch(MockException)
            {
                return false;
            }
        });
    }
}

Начальный Verify позвонить на OnSubscribe метод проходит нормально, но подписчик Mock никогда не получает вызов OnNext,

Что я делаю неправильно?

Работает как netcoreapp2.0, Рекомендации:

"Akka.TestKit.Xunit2" Version="1.3.2"
"Microsoft.NET.Test.Sdk" Version="15.5.0"
"Moq" Version="4.8.0-rc1"
"xunit" Version="2.3.1"
"xunit.runner.visualstudio" Version="2.3.1"
"dotnet-xunit" Version="2.3.1"

1 ответ

Решение

Ваш ISubscriber<> mock не соответствует спецификации Reactive Streams. В нем говорится, что для того, чтобы получить любое сообщение после подписки, подписчик должен сначала сообщить о спросе, используя метод ISubscription.Request(long).

В общем, если вы используете тестовый набор Akka.Streams, вам не нужно подделывать подписки. Просто скачайте Akka.Streams.TestKit, чтобы получить методы расширения для Akka.Streams. Таким образом, вы сможете создать поддельного абонента, просто позвонив this.CreateManualSubscriberProbe<HandlerErrorEvent>(); внутри вашего TestKit учебный класс. Он содержит десятки методов, которые вы можете использовать для утверждения.

Пример:

public class ExampleTest : TestKit
{
    [Fact]
    public void Select_should_map_output()
    {
        using (var materializer = Sys.Materializer())
        {
            // create test probe for subscriptions
            var probe = this.CreateManualSubscriberProbe<int>();

            // create flow materialized as publisher
            var publisher = Source.From(new[] { 1, 2, 3 })
                .Select(i => i + 1)
                .RunWith(Sink.AsPublisher<int>(fanout: false), materializer);

            // subscribe probe and receive subscription
            publisher.Subscribe(probe);
            var subscription = probe.ExpectSubscription();

            // request number of elements to receive, here drain source utill the end
            subscription.Request(4); 

            // validate assertions
            probe.ExpectNext(2);
            probe.ExpectNext(3);
            probe.ExpectNext(4);

            // since source had finite number of 3 elements, expect it to complete
            probe.ExpectComplete();
        }
    }
}
Другие вопросы по тегам