Использование саги NServicebus для сериализации выполнения длительных обработчиков конечных точек
Мы пытаемся сериализовать обработку списка бизнес-объектов с помощью Saga.
Прямо сейчас, без саги, мы просто перебираем список объектов и запускаем bus.Send(new ProcessBusinessObejct(obj))
асинхронное выполнение обработчиков. Таким образом, обработка происходит более или менее параллельно, в зависимости от этого параметра, я считаю:
endpointConfiguration.LimitMessageProcessingConcurrencyTo( 4 );
Это сработало нормально, но количество параллельных обработчиков теперь сильно зависит от базы данных.
Было бы нормально запускать эти обработчики последовательно, т. Е. Переходить к следующему только тогда, когда текущий процесс завершился (не удалось или завершился успешно). Мы не хотим устанавливать параллелизм равным 1, это повлияет на все обработчики в конечной точке.
Идея состоит в том, чтобы использовать шаблон Scatter/Gather и Saga для отслеживания количества объектов и обновления конечного автомата с помощью счетчика (общее количество, счетчик неудач, счетчик успеха) и, наконец, запускать событие, когда список сделано / пусто.
Проблема в
А) Я не уверен, как отслеживать список в саге. SagaData нужен список для хранения всех объектов? Затем удалите экземпляр, когда обработчик сообщает о завершении обработки. Сага не поддерживает иерархические данные и, следовательно, не список или список. Я считаю, что это все еще имеет место в NSB v7.
И Б) Это использование саги осуществимо или излишне, или есть намного более простой способ сделать это?
Мы используем Sql Server персистентность и транспорт и NSB 7.
Любой вклад очень ценится!
1 ответ
Я думаю, что вы собираетесь это сделать. Имейте в виду, что в зависимости от используемого вами уровня персистентности вам может потребоваться отделить фактический импорт от обновления состояния саги. Я писал об этом здесь.
Данные Saga также могут хранить список, но я думаю, что в большинстве сценариев вы можете избежать подсчетов. Еще одно важное замечание (хотя оно и должно быть очевидным) заключается в том, что если сообщение не обрабатывается и не попадает в очередь ошибок (например, неперехваченное исключение в ImportData), вся сага останется незавершенной, пока это сообщение не будет повторено и обработано.
public class MySaga : Saga<MySagaData>
: IAmStartedByMessages<StartTheProcess>,
IHandleMessages<ImportData>,
IHandleMessages<ImportFinished>
{
public async Task Handle(StartTheProcess message, IMessageHandlerContext context)
{
Data.ObjectsToImport = message.ObjectCount;
Data.JobID = Guid.NewGuid(); //To generate a correlation ID to connect future messages back to this saga instance
foreach(var id in message.ObjectIdsToImport)
{
await context.SendLocal(new ImportData
{
JobID = Data.JobID //You need this to correlate messages back to the saga
//Anything else you need to pass on to ImportData
ObjectIdToImport = id
}
});
}
public async Task Handle(ImportData message, IMessageHandlerContext context)
{
//import the data and increment the counter
var result = ImportData(message.ObjectIdToImport);
if(result == Result.Success)
{
Data.SuccessImport++;
}
else
{
Data.FailedImport++;
}
await CheckIfFinished(context);
}
public async Task Handle(ImportFinished message, IMessageHandlerContext context)
{
//do any post cleanups or Mark as complete
MarkAsComplete();
return Task.CompletedTask;
}
private async Task CheckIfFinished(IMessageHandlerContext context)
{
if(Data.SuccessImport + Data.FailedImport == Data.ObjectsToImport)
{
//Everything is done
context.SendLocal(new ImportFinished { JobID = Data.JobID });
}
}
}