Использование саги NServicebus для сериализации выполнения длительных обработчиков конечных точек

Мы пытаемся сериализовать обработку списка бизнес-объектов с помощью Saga.

Прямо сейчас, без саги, мы просто перебираем список объектов и запускаем bus.Send(new ProcessBusinessObejct(obj)) асинхронное выполнение обработчиков. Таким образом, обработка происходит более или менее параллельно, в зависимости от этого параметра, я считаю:

endpointConfiguration.LimitMessageProcessingConcurrencyTo( 4 );

Это сработало нормально, но количество параллельных обработчиков теперь сильно зависит от базы данных.

Было бы нормально запускать эти обработчики последовательно, т. Е. Переходить к следующему только тогда, когда текущий процесс завершился (не удалось или завершился успешно). Мы не хотим устанавливать параллелизм равным 1, это повлияет на все обработчики в конечной точке.

Идея состоит в том, чтобы использовать шаблон Scatter/Gather и Saga для отслеживания количества объектов и обновления конечного автомата с помощью счетчика (общее количество, счетчик неудач, счетчик успеха) и, наконец, запускать событие, когда список сделано / пусто.

Проблема в

А) Я не уверен, как отслеживать список в саге. SagaData нужен список для хранения всех объектов? Затем удалите экземпляр, когда обработчик сообщает о завершении обработки. Сага не поддерживает иерархические данные и, следовательно, не список или список. Я считаю, что это все еще имеет место в NSB v7.

И Б) Это использование саги осуществимо или излишне, или есть намного более простой способ сделать это?

Мы используем Sql Server персистентность и транспорт и NSB 7.

Любой вклад очень ценится!

1 ответ

Я думаю, что вы собираетесь это сделать. Имейте в виду, что в зависимости от используемого вами уровня персистентности вам может потребоваться отделить фактический импорт от обновления состояния саги. Я писал об этом здесь.

Данные Saga также могут хранить список, но я думаю, что в большинстве сценариев вы можете избежать подсчетов. Еще одно важное замечание (хотя оно и должно быть очевидным) заключается в том, что если сообщение не обрабатывается и не попадает в очередь ошибок (например, неперехваченное исключение в ImportData), вся сага останется незавершенной, пока это сообщение не будет повторено и обработано.

public class MySaga : Saga<MySagaData>
   : IAmStartedByMessages<StartTheProcess>,
     IHandleMessages<ImportData>,
     IHandleMessages<ImportFinished>
{
    public async Task Handle(StartTheProcess message, IMessageHandlerContext context)
    {
        Data.ObjectsToImport = message.ObjectCount;
        Data.JobID = Guid.NewGuid(); //To generate a correlation ID to connect future messages back to this saga instance

        foreach(var id in message.ObjectIdsToImport)
        {
            await context.SendLocal(new ImportData
            {
                JobID = Data.JobID //You need this to correlate messages back to the saga
                //Anything else you need to pass on to ImportData
                ObjectIdToImport = id
            }
        });
    }

    public async Task Handle(ImportData message, IMessageHandlerContext context)
    {
        //import the data and increment the counter
        var result = ImportData(message.ObjectIdToImport);
        if(result == Result.Success)
        {
            Data.SuccessImport++;
        }
        else
        {
            Data.FailedImport++;
        }

        await CheckIfFinished(context);
    }

    public async Task Handle(ImportFinished message, IMessageHandlerContext context)
    {
        //do any post cleanups or Mark as complete 
        MarkAsComplete();
        return Task.CompletedTask;
    }

    private async Task CheckIfFinished(IMessageHandlerContext context)
    {
        if(Data.SuccessImport + Data.FailedImport == Data.ObjectsToImport)
        {
            //Everything is done
            context.SendLocal(new ImportFinished { JobID = Data.JobID });
        }
    }
}
Другие вопросы по тегам