Обработка перехода в состояние для нескольких событий
У меня есть MassTransitStateMachine, который организует процесс, который включает в себя создание нескольких событий.
После того, как все события завершены, я хочу, чтобы состояние перешло в фазу "очистки".
Вот соответствующая декларация состояния и функция фильтра:
During(ImportingData,
When(DataImported)
// When we get a data imported event, mark this source as done.
.Then(MarkImportCompletedForLocation),
When(DataImported, IsAllDataImported)
// Once all are done, we can transition to cleaning up...
.Then(CleanUpSources)
.TransitionTo(CleaningUp)
);
...snip...
private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
{
return ctx.Instance.Locations.Values.All(x => x);
}
Поэтому, пока состояние ImportingData, я ожидаю получить несколько событий DataImported. Каждое событие помечает свое местоположение как выполненное, чтобы метод IsAllDataImported мог определить, следует ли нам переходить в следующее состояние.
Однако, если последние два события DataImported прибывают в одно и то же время, обработчик для перехода к фазе CleaningUp срабатывает дважды, и я в итоге пытаюсь выполнить очистку дважды.
Я мог бы решить это в своем собственном коде, но я ожидал, что конечный автомат справится с этим. Я делаю что-то не так, или мне просто нужно справиться с раздором?
2 ответа
Решение, предложенное Крисом, не будет работать в моей ситуации, потому что у меня есть несколько событий одного и того же типа. Мне нужно переходить только тогда, когда все эти события прибыли. Конструкция CompositeEvent не работает для этого варианта использования.
Моим решением было поднять новое событие AllDataImported во время метода MarkImportCompletedForLocation. Этот метод теперь обрабатывает определение того, все ли субимпорты завершены потокобезопасным способом.
Итак, мое определение состояния машины таково:
During(ImportingData,
When(DataImported)
// When we get a data imported event, mark the URI in the locations list as done.
.Then(MarkImportCompletedForLocation),
When(AllDataImported)
// Once all are done, we can transition to cleaning up...
.TransitionTo(CleaningUp)
.Then(CleanUpSources)
);
Метод IsAllDataImported больше не нужен в качестве фильтра.
У государства саги есть свойство Locations:
public Dictionary<Uri, bool> Locations { get; set; }
И метод MarkImportCompletedForLocation определяется следующим образом:
private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
{
lock (ctx.Instance.Locations)
{
ctx.Instance.Locations[ctx.Data.ImportSource] = true;
if (ctx.Instance.Locations.Values.All(x => x))
{
var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
}
}
}
(Я только что написал это, чтобы понять, как будет работать общий поток; я признаю, что метод MarkImportCompletedForLocation должен быть более защищенным, проверяя наличие ключей в словаре.)
Вы можете использовать составное событие, чтобы накапливать несколько событий в последующее событие, которое срабатывает при срабатывании зависимых событий. Это определяется с помощью:
CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);
During(ImportingData,
When(DataImported)
.Then(context => { do something with data }),
When(MoreDataImported)
.Then(context => { do smoething with more data}),
When(AllDataImported)
.Then(context => { okay, have all data now}));
Затем в вашем состоянии конечного автомата:
class DataImportSagaState :
SagaStateMachineInstance
{
public int ImportStatus { get; set; }
}
Это должно решить проблему, которую вы пытаетесь решить, так что попробуйте. Обратите внимание, что порядок событий не имеет значения, они могут поступать в любом порядке, поскольку состояние, в котором были получены события, находится в свойстве ImportStatus экземпляра.
Данные об отдельных событиях не сохраняются, поэтому вам нужно будет захватить их в экземпляр состояния самостоятельно, используя .Then()
методы.