NServiceBus - Как получить отдельную очередь для каждого типа сообщений, на который подписчик получает подписку?

У меня следующая ситуация:

Таким образом, получатель подписывается на два вида событий: eventA и eventB. NServiceBus создает очередь для получателя (Receiver) и помещает сообщения типа eventA и eventB в одну и ту же очередь. Вопрос в том, могу ли я настроить NServiceBus для использования отдельных очередей (ReceiverEventA и ReceiverEventB) для каждого типа события для получателя? Или я могу иметь два получателя в одном процессе (и каждый получатель отдельную очередь). Дело в том, что EventA обрабатывается намного дольше, чем EventB, и они независимы - поэтому, если они будут в отдельных очередях, они могут обрабатываться одновременно.

Обновление: если я иду с наивным подходом, подобным этому, приемник не в состоянии начать с исключением нулевой ссылки:

 private static IBus GetBus<THandler, TEvent>()
    {                   
        var bus = Configure.With(new List<Type>
                                     {
                                         typeof(THandler), 
                                         typeof(TEvent),
                                         typeof(CompletionMessage)
                                     })
            .Log4Net()
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
            .UnicastBus()
            .LoadMessageHandlers()
            .ImpersonateSender(false);

        bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);

        return bus.CreateBus().Start();
    }

    [STAThread]
    static void Main()
    {
        Busses = new List<IBus>
                     {
                         GetBus<ItemEventHandlerA, ItemEventA>(),
                         GetBus<ItemEventHandlerB, ItemEventB>()
                     };          

        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);
        Application.Run(new TestForm());
    }

Трассировка стека исключений:

в NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler, TEvent в C:\Users\ Пользователь \Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs: строка 57
в NServiceBusTest2.WinFormsReceiver.Program.Main() в C:\Users\ Пользователь \Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs: строка 26
в System.AppDomain._nExecuteAssembly(сборка RuntimeAssembly, аргументы String[])
в System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assembly AssemblySecurity, String[] args) в Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
в System.Threading.ThreadHelper.ThreadStart_Context(состояние объекта)
в System.Threading.ExecutionContext.Run(ExecutionContext executeContext, обратный вызов ContextCallback, состояние объекта, логическое значение ignoreSyncCtx)
в System.Threading.ExecutionContext.Run(ExecutionContext executeContext, обратный вызов ContextCallback, состояние объекта)
в System.Threading.ThreadHelper.ThreadStart()

2 ответа

Решение

Я начал писать более подробное объяснение того, почему у вас НЕ должно быть двух отдельных процессов, в поддержку моего комментария к опубликованному ответу @stephenl. NServiceBus в основном обеспечивает одну входную очередь на процесс.

Поэтому обычно у вас есть два отдельных процесса. EventAService будет читать EventA из QForEventA, в отдельном процессе от EventBService, который будет читать EventB из QForEventB.

Затем я более внимательно посмотрел на ваш пример кода и понял, что вы находитесь в приложении Windows Forms. Duh! Теперь я чувствую себя немного глупо. Конечно, вы можете иметь только один процесс. Представьте, что после запуска Outlook вам также нужно было запустить MailService.exe для получения почты!

Таким образом, проблема в том, что у вас в приложении Windows Forms разное время обработки EventA и EventB. У меня нет никакого способа узнать, что это за работа, но это немного странно для клиентского приложения.

В большинстве случаев это сервис, требующий большой обработки, и любое сообщение, полученное клиентом, довольно легкое - что-то вроде "Entity X изменился, поэтому в следующий раз вам нужно будет загрузить его прямо из базы данных" и его обработка включает в себя просто удаление чего-то из кеша - конечно, это не длительный процесс.

Но, похоже, по какой-то причине обработка в ВАШЕМ клиенте занимает больше времени, что в приложении WinForms вызывает обеспокоенность по поводу маршалинга потока пользовательского интерфейса, блокировки потока пользовательского интерфейса и т. Д.

Я бы предложил систему, в которой вы не выполняете всю обработку в обработчике NServiceBus в своем приложении WinForms, а выполняете его в другом месте. Добавьте его в ThreadPool как рабочий элемент или что-то в этом роде. Или поместите долго работающие элементы в очередь и создайте фоновый поток для них со своей скоростью. Таким образом, все, что делает обработчик сообщений NServiceBus: "Да, получил сообщение. Большое спасибо". Тогда не должно иметь значения, обрабатываются ли сообщения NServiceBus по одному.

Обновить:

В комментариях OP спрашивает, что произойдет, если вы выбросите работу в ThreadPool после ее получения NServiceBus. Это, конечно, обратная сторона этого подхода - после того, как NServiceBus будет создан, вы будете самостоятельно - если он потерпит неудачу в ThreadPool, тогда вы сами должны создать свою собственную логику повторения или просто перехватить исключение, предупредить пользователя приложения WinForms и позволить ему умереть.

Очевидно, что это оптимально, но возникает вопрос - какой именно вид работы вы пытаетесь выполнить в приложении WinForms? Если надежность, которую предлагает NServiceBus (автоматические повторные попытки и очередь ошибок для вредоносных сообщений), является критически важной частью этой головоломки, то почему она вообще происходит в приложении WinForms? Эта логика, вероятно, должна быть выгружена в службу, внешнюю по отношению к приложению WinForms, где становится легко иметь отдельные очереди для каждого типа сообщений (путем развертывания отдельных служб), и тогда только части, влияющие на пользовательский интерфейс, когда-либо отправляются обратно клиенту WinForms. Когда сообщения в пользовательский интерфейс влияют только на пользовательский интерфейс, их обработка почти всегда тривиальна, и вам не потребуется разгрузка в ThreadPool, чтобы не отставать.

Если говорить непосредственно о ситуации, которую вы описали в выпуске GitHub, то это действительно похоже на тот сценарий, когда отдельные процессы для каждого типа сообщений являются именно предписанным решением. Я слышал, что это слишком сложно для развертывания и управления таким количеством процессов, но я думаю, вы обнаружите, что это не так плохо, как кажется. Есть даже преимущества - например, если вам необходимо повторно развернуть свой соединитель на Amazon.com, вам нужно только повторно развернуть конечную точку ТА, без каких-либо простоев для других или каких-либо опасений, что ошибки могут быть внесены в другие.

Чтобы упростить развертывание, мы надеемся, что вы используете сервер непрерывной интеграции, а затем подключитесь к таким инструментам, как DropkicK, которые помогают сценариям развертываний. Лично мой любимый инструмент развертывания - старый добрый Robocopy. Даже что-то простое: 1) NET STOP ServiceName, 2) ROBOCOPY, 3) NET START ServiceName достаточно эффективно.

Вам не нужно, вам просто нужно создать обработчик для каждого события в получателе. Например

public class EventAHandler : IHandleMessages<EventA>{}

public class EventBHandler : IHandleMessages<EventB>{}

Если вы хотите разделить очереди, вам нужно поместить каждый обработчик в отдельную конечную точку. Имеет ли это смысл?

Также я думаю, что ваша диаграмма требует немного работы. Я уверен, что это маркировка, но, как я понимаю, именно Host является фактическим издателем, а не конечными точками клиента (которых вы называете Publisher A и Publisher B).

ОБНОВЛЕНИЕ: это простой пример, но иллюстрирует, что я имею в виду - https://github.com/sliedig/sof9411638

Я расширил образец Full Duplex, который поставляется с NServiceBus, чтобы включить три дополнительных конечных точки, две из которых подписываются на события, публикуемые сервером, соответственно, и одна конечная точка, которая обрабатывает обе. НТН.

Другие вопросы по тегам