NServiceBus - Как получить отдельную очередь для каждого типа сообщений, на который подписчик получает подписку?
У меня следующая ситуация:
Таким образом, получатель подписывается на два вида событий: eventA и eventB. NServiceBus создает очередь для получателя (Receiver) и помещает сообщения типа eventA и eventB в одну и ту же очередь. Вопрос в том, могу ли я настроить NServiceBus для использования отдельных очередей (ReceiverEventA и ReceiverEventB) для каждого типа события для получателя? Или я могу иметь два получателя в одном процессе (и каждый получатель отдельную очередь). Дело в том, что EventA обрабатывается намного дольше, чем EventB, и они независимы - поэтому, если они будут в отдельных очередях, они могут обрабатываться одновременно.
Обновление: если я иду с наивным подходом, подобным этому, приемник не в состоянии начать с исключением нулевой ссылки:
private static IBus GetBus<THandler, TEvent>()
{
var bus = Configure.With(new List<Type>
{
typeof(THandler),
typeof(TEvent),
typeof(CompletionMessage)
})
.Log4Net()
.DefaultBuilder()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.PurgeOnStartup(false)
.UnicastBus()
.LoadMessageHandlers()
.ImpersonateSender(false);
bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);
return bus.CreateBus().Start();
}
[STAThread]
static void Main()
{
Busses = new List<IBus>
{
GetBus<ItemEventHandlerA, ItemEventA>(),
GetBus<ItemEventHandlerB, ItemEventB>()
};
Application.EnableVisualStyles();
Application.SetCompatibleTextRenderingDefault(false);
Application.Run(new TestForm());
}
Трассировка стека исключений:
в NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler, TEvent в C:\Users\ Пользователь \Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs: строка 57
в NServiceBusTest2.WinFormsReceiver.Program.Main() в C:\Users\ Пользователь \Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs: строка 26
в System.AppDomain._nExecuteAssembly(сборка RuntimeAssembly, аргументы String[])
в System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assembly AssemblySecurity, String[] args) в Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
в System.Threading.ThreadHelper.ThreadStart_Context(состояние объекта)
в System.Threading.ExecutionContext.Run(ExecutionContext executeContext, обратный вызов ContextCallback, состояние объекта, логическое значение ignoreSyncCtx)
в System.Threading.ExecutionContext.Run(ExecutionContext executeContext, обратный вызов ContextCallback, состояние объекта)
в System.Threading.ThreadHelper.ThreadStart()
2 ответа
Я начал писать более подробное объяснение того, почему у вас НЕ должно быть двух отдельных процессов, в поддержку моего комментария к опубликованному ответу @stephenl. NServiceBus в основном обеспечивает одну входную очередь на процесс.
Поэтому обычно у вас есть два отдельных процесса. EventAService будет читать EventA из QForEventA, в отдельном процессе от EventBService, который будет читать EventB из QForEventB.
Затем я более внимательно посмотрел на ваш пример кода и понял, что вы находитесь в приложении Windows Forms. Duh! Теперь я чувствую себя немного глупо. Конечно, вы можете иметь только один процесс. Представьте, что после запуска Outlook вам также нужно было запустить MailService.exe для получения почты!
Таким образом, проблема в том, что у вас в приложении Windows Forms разное время обработки EventA и EventB. У меня нет никакого способа узнать, что это за работа, но это немного странно для клиентского приложения.
В большинстве случаев это сервис, требующий большой обработки, и любое сообщение, полученное клиентом, довольно легкое - что-то вроде "Entity X изменился, поэтому в следующий раз вам нужно будет загрузить его прямо из базы данных" и его обработка включает в себя просто удаление чего-то из кеша - конечно, это не длительный процесс.
Но, похоже, по какой-то причине обработка в ВАШЕМ клиенте занимает больше времени, что в приложении WinForms вызывает обеспокоенность по поводу маршалинга потока пользовательского интерфейса, блокировки потока пользовательского интерфейса и т. Д.
Я бы предложил систему, в которой вы не выполняете всю обработку в обработчике NServiceBus в своем приложении WinForms, а выполняете его в другом месте. Добавьте его в ThreadPool как рабочий элемент или что-то в этом роде. Или поместите долго работающие элементы в очередь и создайте фоновый поток для них со своей скоростью. Таким образом, все, что делает обработчик сообщений NServiceBus: "Да, получил сообщение. Большое спасибо". Тогда не должно иметь значения, обрабатываются ли сообщения NServiceBus по одному.
Обновить:
В комментариях OP спрашивает, что произойдет, если вы выбросите работу в ThreadPool после ее получения NServiceBus. Это, конечно, обратная сторона этого подхода - после того, как NServiceBus будет создан, вы будете самостоятельно - если он потерпит неудачу в ThreadPool, тогда вы сами должны создать свою собственную логику повторения или просто перехватить исключение, предупредить пользователя приложения WinForms и позволить ему умереть.
Очевидно, что это оптимально, но возникает вопрос - какой именно вид работы вы пытаетесь выполнить в приложении WinForms? Если надежность, которую предлагает NServiceBus (автоматические повторные попытки и очередь ошибок для вредоносных сообщений), является критически важной частью этой головоломки, то почему она вообще происходит в приложении WinForms? Эта логика, вероятно, должна быть выгружена в службу, внешнюю по отношению к приложению WinForms, где становится легко иметь отдельные очереди для каждого типа сообщений (путем развертывания отдельных служб), и тогда только части, влияющие на пользовательский интерфейс, когда-либо отправляются обратно клиенту WinForms. Когда сообщения в пользовательский интерфейс влияют только на пользовательский интерфейс, их обработка почти всегда тривиальна, и вам не потребуется разгрузка в ThreadPool, чтобы не отставать.
Если говорить непосредственно о ситуации, которую вы описали в выпуске GitHub, то это действительно похоже на тот сценарий, когда отдельные процессы для каждого типа сообщений являются именно предписанным решением. Я слышал, что это слишком сложно для развертывания и управления таким количеством процессов, но я думаю, вы обнаружите, что это не так плохо, как кажется. Есть даже преимущества - например, если вам необходимо повторно развернуть свой соединитель на Amazon.com, вам нужно только повторно развернуть конечную точку ТА, без каких-либо простоев для других или каких-либо опасений, что ошибки могут быть внесены в другие.
Чтобы упростить развертывание, мы надеемся, что вы используете сервер непрерывной интеграции, а затем подключитесь к таким инструментам, как DropkicK, которые помогают сценариям развертываний. Лично мой любимый инструмент развертывания - старый добрый Robocopy. Даже что-то простое: 1) NET STOP ServiceName, 2) ROBOCOPY, 3) NET START ServiceName достаточно эффективно.
Вам не нужно, вам просто нужно создать обработчик для каждого события в получателе. Например
public class EventAHandler : IHandleMessages<EventA>{}
public class EventBHandler : IHandleMessages<EventB>{}
Если вы хотите разделить очереди, вам нужно поместить каждый обработчик в отдельную конечную точку. Имеет ли это смысл?
Также я думаю, что ваша диаграмма требует немного работы. Я уверен, что это маркировка, но, как я понимаю, именно Host является фактическим издателем, а не конечными точками клиента (которых вы называете Publisher A и Publisher B).
ОБНОВЛЕНИЕ: это простой пример, но иллюстрирует, что я имею в виду - https://github.com/sliedig/sof9411638
Я расширил образец Full Duplex, который поставляется с NServiceBus, чтобы включить три дополнительных конечных точки, две из которых подписываются на события, публикуемые сервером, соответственно, и одна конечная точка, которая обрабатывает обе. НТН.