Трубопровод NServiceBus с дистрибьюторами
Я строю конвейер обработки с NServiceBus, но у меня проблемы с конфигурацией дистрибьюторов, чтобы сделать каждый шаг в процессе масштабируемым. Вот некоторая информация:
- Конвейер будет иметь основной процесс, который говорит "ОК, время для запуска" для WorkItem, который затем запустит процесс, как потоковая диаграмма.
- Каждый шаг в блок-схеме может быть вычислительно дорогим, поэтому я хочу иметь возможность масштабировать каждый шаг. Это говорит мне, что для каждого шага нужен Дистрибьютор.
- Я хочу иметь возможность подключить дополнительные мероприятия к событиям позже. Это говорит мне, что мне нужно публиковать () сообщения, когда это будет сделано, а не отправлять () их.
- Процесс может нуждаться в ветвлении в зависимости от условия. Это говорит мне о том, что процесс должен иметь возможность публиковать более одного типа сообщений.
- Процесс может потребоваться присоединиться к вилкам. Я полагаю, что я должен использовать саги для этого.
Надеюсь, эти предположения хороши, иначе у меня больше проблем, чем я думал.
Для простоты давайте забудем о разветвлении или присоединении и рассмотрим простой конвейер с этапом A, за которым следует этап B, и заканчивающимся этапом C. Каждый этап получает свой собственный распределитель и может иметь много узлов, обрабатывающих сообщения.
- Рабочие NodeA содержат процессор IHandleMessages и публикуют EventA
- Работники NodeB содержат процессор IHandleMessages и публикуют событие B
- Рабочие NodeC содержат процессор IHandleMessages, и затем конвейер завершен.
Вот соответствующие части файлов конфигурации, где # обозначает номер рабочего (т.е. есть входные очереди NodeA.1 и NodeA.2):
NodeA:
<MsmqTransportConfig InputQueue="NodeA.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeA.Distrib.Control" DistributorDataAddress="NodeA.Distrib.Data" >
<MessageEndpointMappings>
</MessageEndpointMappings>
</UnicastBusConfig>
NodeB:
<MsmqTransportConfig InputQueue="NodeB.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeB.Distrib.Control" DistributorDataAddress="NodeB.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventA, Messages" Endpoint="NodeA.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
NodeC:
<MsmqTransportConfig InputQueue="NodeC.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeC.Distrib.Control" DistributorDataAddress="NodeC.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventB, Messages" Endpoint="NodeB.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
А вот соответствующие части конфигов дистрибьютора:
Distributor A:
<add key="DataInputQueue" value="NodeA.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeA.Distrib.Control"/>
<add key="StorageQueue" value="NodeA.Distrib.Storage"/>
Distributor B:
<add key="DataInputQueue" value="NodeB.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeB.Distrib.Control"/>
<add key="StorageQueue" value="NodeB.Distrib.Storage"/>
Distributor C:
<add key="DataInputQueue" value="NodeC.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeC.Distrib.Control"/>
<add key="StorageQueue" value="NodeC.Distrib.Storage"/>
Я тестирую с использованием 2 экземпляров каждого узла, и проблема, кажется, возникает в середине на узле B. Есть в основном две вещи, которые могут произойти:
- Оба экземпляра Узла B сообщают, что он подписывается на EventA, а также что NodeC.Distrib.Data@MYCOMPUTER подписывается на Event B, который публикует Узел B. В этом случае все прекрасно работает.
- Оба экземпляра узла B сообщают, что он подписывается на EventA, однако один работник говорит, что NodeC.Distrib.Data@MYCOMPUTER подписывается ДВАЖДЫ, а другой работник не упоминает об этом.
Во втором случае, который, по-видимому, контролируется только тем, как распространитель направляет сообщения подписки, если узел "переигрывателя" обрабатывает EventA, все в порядке. Если "underachiever" обрабатывает EventA, то публикация Event B не имеет подписчиков, и рабочий процесс прекращается.
Итак, мои вопросы:
- Возможна ли такая установка?
- Конфигурация правильная? Трудно найти какие-либо примеры конфигурации с дистрибьюторами, кроме простой одноуровневой настройки издателя /2-работника.
- Будет ли более разумным иметь один центральный посреднический процесс, который выполняет все операции, не требующие большого вычислительного трафика, и отправляет сообщения процессам, находящимся за дистрибьюторами, когда задача выполняется долго и должна быть сбалансирована по нагрузке?
- Тогда узлы с балансировкой нагрузки могут просто ответить центральному брокеру, что кажется проще.
- С другой стороны, это, кажется, противоречит децентрализации, которая является сильной стороной NServiceBus.
- И если это ответ, а событие done долгосрочного процесса - ответ, как сохранить публикацию, которая обеспечивает последующую расширяемость опубликованных событий?
1 ответ
Проблема в том, что ваши узлы не видят список подписчиков друг друга. Причина, по которой вы столкнулись с этой проблемой, заключается в том, что вы испытываете производственный сценарий (масштабирование) в профиле NServiceBus по умолчанию (lite), который не поддерживает масштабирование, но делает разработку на одной машине очень продуктивной.
Чтобы решить эту проблему, запустите хост NServiceBus, используя рабочий профиль, как описано на этой странице:
http://docs.particular.net/nservicebus/hosting/nservicebus-host/profiles
Это позволит различным узлам совместно использовать один и тот же список подписчиков.
Помимо этого, ваша конфигурация прямо на.