Как успешно управлять MassTransitStateMachine через InMemoryTestHarness?

Следующие до: Как написать модульные тесты MassTransitStateMachine?

Вот простой тестовый класс (с использованием MS Test) для простого конечного автомата под названием ProcedureStateMachine (примечание: для нас это не настоящая машина состояния производства... просто эксперимент, с которым я привык играть MassTransitStateMachine Некоторое время назад... это казалось удобным и самостоятельным местом для экспериментов с модульным тестированием):

[TestClass]
public class ProcedureStateMachineTests
{
    private ProcedureStateMachine _machine;
    private InMemoryTestHarness _harness;
    private StateMachineSagaTestHarness<ProcedureContext, ProcedureStateMachine> _saga;

    [TestInitialize]
    public void SetUp()
    {
        _machine = new ProcedureStateMachine();
        _harness = new InMemoryTestHarness();
        _saga = _harness.StateMachineSaga<ProcedureContext, ProcedureStateMachine>(_machine);

        _harness.Start().Wait();
    }

    [TestCleanup]
    public void TearDown()
    {
        _harness.Stop().Wait();
    }

    [TestMethod]
    public async Task That_Can_Start()
    {
        // Arrange
        // Act
        await _harness.InputQueueSendEndpoint.Send(new BeginProcessing
        {
            ProcedureId = Guid.NewGuid(),
            Steps = new List<string> {"A", "B", "C" }
        });

        // Assert
        var sagaContext = _saga.Created.First();
        sagaContext.Saga.RemainingSteps.ShouldHaveCountOf(2);
    }
}

А вот и сам класс конечного автомата:

public class ProcedureStateMachine : MassTransitStateMachine<ProcedureContext>
{
    public State Processing { get; private set; }
    public State Cancelling { get; private set; }
    public State CompleteOk { get; private set; }
    public State CompleteError { get; private set; }
    public State CompleteCancelled { get; private set; }

    public Event<BeginProcessing> Begin { get; private set; }
    public Event<StepCompleted> StepDone { get; private set; }
    public Event<CancelProcessing> Cancel { get; private set; }
    public Event<FinalizeProcessing> Finalize { get; private set; }

    public ProcedureStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => Begin);
        Event(() => StepDone);
        Event(() => Cancel);
        Event(() => Finalize);

        BeforeEnterAny(binder => binder
            .ThenAsync(context => Console.Out.WriteLineAsync(
                $"ENTERING STATE [{context.Data.Name}]")));

        Initially(
            When(Begin)
                .Then(context =>
                {
                    context.Instance.RemainingSteps = new Queue<string>(context.Data.Steps);
                })
                .ThenAsync(context => Console.Out.WriteLineAsync(
                    $"EVENT [{nameof(Begin)}]: Procedure [{context.Data.ProcedureId}] Steps [{string.Join(",", context.Data.Steps)}]"))
                .Publish(context => new ExecuteStep
                {
                    ProcedureId = context.Instance.CorrelationId,
                    StepId = context.Instance.RemainingSteps.Dequeue()
                })
                .Publish(context => new SomeFunMessage
                {
                    CorrelationId = context.Data.CorrelationId,
                    TheMessage = $"Procedure [{context.Data.CorrelationId} has begun..."
                })
                .TransitionTo(Processing)
            );

        During(Processing,
            When(StepDone)
                .Then(context =>
                {
                    if (null == context.Instance.AccumulatedResults)
                    {
                        context.Instance.AccumulatedResults = new List<StepResult>();
                    }
                    context.Instance.AccumulatedResults.Add(
                        new StepResult
                        {
                            CorrelationId = context.Instance.CorrelationId,
                            StepId = context.Data.StepId,
                            WhatHappened = context.Data.WhatHappened
                        });
                })
                .ThenAsync(context => Console.Out.WriteLineAsync(
                    $"EVENT [{nameof(StepDone)}]: Procedure [{context.Data.ProcedureId}] Step [{context.Data.StepId}] Result [{context.Data.WhatHappened}] RemainingSteps [{string.Join(",", context.Instance.RemainingSteps)}]"))
                .If(context => !context.Instance.RemainingSteps.Any(),
                    binder => binder.TransitionTo(CompleteOk))
                .If(context => context.Instance.RemainingSteps.Any(),
                    binder => binder.Publish(context => new ExecuteStep
                    {
                        ProcedureId = context.Instance.CorrelationId,
                        StepId = context.Instance.RemainingSteps.Dequeue()
                    })),
            When(Cancel)
                .Then(context =>
                {
                    context.Instance.RemainingSteps.Clear();
                })
                .ThenAsync(context => Console.Out.WriteLineAsync(
                    $"EVENT [{nameof(Cancel)}]: Procedure [{context.Data.ProcedureId}] will be cancelled with following steps remaining [{string.Join(",", context.Instance.RemainingSteps)}]"))
                .TransitionTo(Cancelling)
            );

        During(Cancelling,
            When(StepDone)
                .Then(context =>
                {
                    context.Instance.SomeStringValue = "Booo... we cancelled...";
                })
                .ThenAsync(context => Console.Out.WriteLineAsync(
                    $"EVENT [{nameof(StepDone)}]: Procedure [{context.Data.ProcedureId}] Step [{context.Data.StepId}] completed while cancelling."))
                .TransitionTo(CompleteCancelled));

        During(CompleteOk, When(Finalize).Finalize());
        During(CompleteCancelled, When(Finalize).Finalize());
        During(CompleteError, When(Finalize).Finalize());

        // The "SetCompleted*" thing is what triggers purging of the state context info from the store (eg. Redis)...  without that, the 
        // old completed state keys will gradually accumulate and dominate the Redis store.
        SetCompletedWhenFinalized();
    }
}

При отладке этого теста _harness имеет BeginProcessing сообщение в его Sent коллекция, но ничего нет в _saga.Created коллекция. Кажется, мне не хватает какой-то сантехники, чтобы жгут проводов управлял конечным автоматом при отправке сообщений?

====

Удаление .Wait() звонки из SetUp() а также TearDown() и обновление теста до следующего НЕ меняет поведение:

    [TestMethod]
    public async Task That_Can_Start()
    {
        try
        {
            await _harness.Start();
            // Arrange

            // Act
            await _harness.InputQueueSendEndpoint.Send(new BeginProcessing
            {
                ProcedureId = Guid.NewGuid(),
                Steps = new List<string> {"A", "B", "C"}
            });

            // Assert
            var sagaContext = _saga.Created.First();
            sagaContext.Saga.RemainingSteps.ShouldHaveCountOf(3);
        }
        finally
        {
            await _harness.Stop();
        }
    }

1 ответ

Оказывается, что тестовый код, как показано выше, страдал от состояния гонки между _harness.InputQueueSendEndpoint.Send операции и некоторые асинхронные (помимо того, что ожидают на Send ждет) поведение в StateMachineSagaTestHarness, В результате фаза "Утверждение" тестового кода выполнялась до того, как сага была создана и ей было разрешено обрабатывать отправленное сообщение.

Копаться в SagaTestHarness Немного кода, я нашел несколько вспомогательных методов, которые я смог использовать, чтобы дождаться выполнения определенных условий в саге. Методы:

/// <summary>
/// Waits until a saga exists with the specified correlationId
/// </summary>
/// <param name="sagaId"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<Guid?> Exists(Guid sagaId, TimeSpan? timeout = null)

/// <summary>
/// Waits until at least one saga exists matching the specified filter
/// </summary>
/// <param name="filter"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<IList<Guid>> Match(Expression<Func<TSaga, bool>> filter, TimeSpan? timeout = null)

/// <summary>
/// Waits until the saga matching the specified correlationId does NOT exist
/// </summary>
/// <param name="sagaId"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<Guid?> NotExists(Guid sagaId, TimeSpan? timeout = null)

Поэтому я остановился на использовании таких вещей, как await _saga.Match(s => null != s.RemainingSteps); и так, чтобы эффективно дублировать мои более поздние утверждения и ждать, пока тайм-аут (по умолчанию 30 секунд) или условие, заявленное позже, станет истинным (и, следовательно, безопасным для утверждения)... в зависимости от того, что наступит раньше.

Это пока отвяжет меня, пока я не подумаю о лучшем способе узнать, когда жгут "захвачен" и готов к допросу.

Другие вопросы по тегам