Как успешно управлять MassTransitStateMachine через InMemoryTestHarness?
Следующие до: Как написать модульные тесты MassTransitStateMachine?
Вот простой тестовый класс (с использованием MS Test) для простого конечного автомата под названием ProcedureStateMachine
(примечание: для нас это не настоящая машина состояния производства... просто эксперимент, с которым я привык играть MassTransitStateMachine
Некоторое время назад... это казалось удобным и самостоятельным местом для экспериментов с модульным тестированием):
[TestClass]
public class ProcedureStateMachineTests
{
private ProcedureStateMachine _machine;
private InMemoryTestHarness _harness;
private StateMachineSagaTestHarness<ProcedureContext, ProcedureStateMachine> _saga;
[TestInitialize]
public void SetUp()
{
_machine = new ProcedureStateMachine();
_harness = new InMemoryTestHarness();
_saga = _harness.StateMachineSaga<ProcedureContext, ProcedureStateMachine>(_machine);
_harness.Start().Wait();
}
[TestCleanup]
public void TearDown()
{
_harness.Stop().Wait();
}
[TestMethod]
public async Task That_Can_Start()
{
// Arrange
// Act
await _harness.InputQueueSendEndpoint.Send(new BeginProcessing
{
ProcedureId = Guid.NewGuid(),
Steps = new List<string> {"A", "B", "C" }
});
// Assert
var sagaContext = _saga.Created.First();
sagaContext.Saga.RemainingSteps.ShouldHaveCountOf(2);
}
}
А вот и сам класс конечного автомата:
public class ProcedureStateMachine : MassTransitStateMachine<ProcedureContext>
{
public State Processing { get; private set; }
public State Cancelling { get; private set; }
public State CompleteOk { get; private set; }
public State CompleteError { get; private set; }
public State CompleteCancelled { get; private set; }
public Event<BeginProcessing> Begin { get; private set; }
public Event<StepCompleted> StepDone { get; private set; }
public Event<CancelProcessing> Cancel { get; private set; }
public Event<FinalizeProcessing> Finalize { get; private set; }
public ProcedureStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => Begin);
Event(() => StepDone);
Event(() => Cancel);
Event(() => Finalize);
BeforeEnterAny(binder => binder
.ThenAsync(context => Console.Out.WriteLineAsync(
$"ENTERING STATE [{context.Data.Name}]")));
Initially(
When(Begin)
.Then(context =>
{
context.Instance.RemainingSteps = new Queue<string>(context.Data.Steps);
})
.ThenAsync(context => Console.Out.WriteLineAsync(
$"EVENT [{nameof(Begin)}]: Procedure [{context.Data.ProcedureId}] Steps [{string.Join(",", context.Data.Steps)}]"))
.Publish(context => new ExecuteStep
{
ProcedureId = context.Instance.CorrelationId,
StepId = context.Instance.RemainingSteps.Dequeue()
})
.Publish(context => new SomeFunMessage
{
CorrelationId = context.Data.CorrelationId,
TheMessage = $"Procedure [{context.Data.CorrelationId} has begun..."
})
.TransitionTo(Processing)
);
During(Processing,
When(StepDone)
.Then(context =>
{
if (null == context.Instance.AccumulatedResults)
{
context.Instance.AccumulatedResults = new List<StepResult>();
}
context.Instance.AccumulatedResults.Add(
new StepResult
{
CorrelationId = context.Instance.CorrelationId,
StepId = context.Data.StepId,
WhatHappened = context.Data.WhatHappened
});
})
.ThenAsync(context => Console.Out.WriteLineAsync(
$"EVENT [{nameof(StepDone)}]: Procedure [{context.Data.ProcedureId}] Step [{context.Data.StepId}] Result [{context.Data.WhatHappened}] RemainingSteps [{string.Join(",", context.Instance.RemainingSteps)}]"))
.If(context => !context.Instance.RemainingSteps.Any(),
binder => binder.TransitionTo(CompleteOk))
.If(context => context.Instance.RemainingSteps.Any(),
binder => binder.Publish(context => new ExecuteStep
{
ProcedureId = context.Instance.CorrelationId,
StepId = context.Instance.RemainingSteps.Dequeue()
})),
When(Cancel)
.Then(context =>
{
context.Instance.RemainingSteps.Clear();
})
.ThenAsync(context => Console.Out.WriteLineAsync(
$"EVENT [{nameof(Cancel)}]: Procedure [{context.Data.ProcedureId}] will be cancelled with following steps remaining [{string.Join(",", context.Instance.RemainingSteps)}]"))
.TransitionTo(Cancelling)
);
During(Cancelling,
When(StepDone)
.Then(context =>
{
context.Instance.SomeStringValue = "Booo... we cancelled...";
})
.ThenAsync(context => Console.Out.WriteLineAsync(
$"EVENT [{nameof(StepDone)}]: Procedure [{context.Data.ProcedureId}] Step [{context.Data.StepId}] completed while cancelling."))
.TransitionTo(CompleteCancelled));
During(CompleteOk, When(Finalize).Finalize());
During(CompleteCancelled, When(Finalize).Finalize());
During(CompleteError, When(Finalize).Finalize());
// The "SetCompleted*" thing is what triggers purging of the state context info from the store (eg. Redis)... without that, the
// old completed state keys will gradually accumulate and dominate the Redis store.
SetCompletedWhenFinalized();
}
}
При отладке этого теста _harness
имеет BeginProcessing
сообщение в его Sent
коллекция, но ничего нет в _saga.Created
коллекция. Кажется, мне не хватает какой-то сантехники, чтобы жгут проводов управлял конечным автоматом при отправке сообщений?
====
Удаление .Wait()
звонки из SetUp()
а также TearDown()
и обновление теста до следующего НЕ меняет поведение:
[TestMethod]
public async Task That_Can_Start()
{
try
{
await _harness.Start();
// Arrange
// Act
await _harness.InputQueueSendEndpoint.Send(new BeginProcessing
{
ProcedureId = Guid.NewGuid(),
Steps = new List<string> {"A", "B", "C"}
});
// Assert
var sagaContext = _saga.Created.First();
sagaContext.Saga.RemainingSteps.ShouldHaveCountOf(3);
}
finally
{
await _harness.Stop();
}
}
1 ответ
Оказывается, что тестовый код, как показано выше, страдал от состояния гонки между _harness.InputQueueSendEndpoint.Send
операции и некоторые асинхронные (помимо того, что ожидают на Send
ждет) поведение в StateMachineSagaTestHarness
, В результате фаза "Утверждение" тестового кода выполнялась до того, как сага была создана и ей было разрешено обрабатывать отправленное сообщение.
Копаться в SagaTestHarness
Немного кода, я нашел несколько вспомогательных методов, которые я смог использовать, чтобы дождаться выполнения определенных условий в саге. Методы:
/// <summary>
/// Waits until a saga exists with the specified correlationId
/// </summary>
/// <param name="sagaId"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<Guid?> Exists(Guid sagaId, TimeSpan? timeout = null)
/// <summary>
/// Waits until at least one saga exists matching the specified filter
/// </summary>
/// <param name="filter"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<IList<Guid>> Match(Expression<Func<TSaga, bool>> filter, TimeSpan? timeout = null)
/// <summary>
/// Waits until the saga matching the specified correlationId does NOT exist
/// </summary>
/// <param name="sagaId"></param>
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<Guid?> NotExists(Guid sagaId, TimeSpan? timeout = null)
Поэтому я остановился на использовании таких вещей, как await _saga.Match(s => null != s.RemainingSteps);
и так, чтобы эффективно дублировать мои более поздние утверждения и ждать, пока тайм-аут (по умолчанию 30 секунд) или условие, заявленное позже, станет истинным (и, следовательно, безопасным для утверждения)... в зависимости от того, что наступит раньше.
Это пока отвяжет меня, пока я не подумаю о лучшем способе узнать, когда жгут "захвачен" и готов к допросу.