Spring Integration: TaskExecutor и MaxConcurrentConsumers в AmqpInboundChannelAdapter
Приложение My Spring Integration использует сообщения из RabbitMQ, преобразует их в сообщение SOAP и выполняет запрос веб-службы.
Можно получить много (10 - 50) сообщений в секунду из очереди. Или после перезапуска приложения в очереди RabbitMQ может быть много тысяч сообщений.
Каков наилучший из возможных сценариев для обработки до 10 сообщений в параллельных потоках (упорядочение сообщений желательно иметь, но не обязательно, если веб-служба отвечает с ошибкой в работе, то сообщение с ошибкой следует повторить, пока оно не выполнится).
Прослушиватель Amqp не должен потреблять больше сообщений из очереди, так как неиспользуемые потоки доступны в исполнителе задач. Я мог бы определить ThreadExecutor в канале, как это:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue);
}
IntegrationFlow integrationFlow = IntegrationFlows
.from(amqpInboundChannelAdapter)
.channel(c -> c.executor(exportFlowsExecutor))
.transform(businessObjectToSoapRequestTransformer)
.handle(webServiceOutboundGatewayFactory.getObject())
.get();
Или достаточно определить исполнителя задачи в AmqpInboundChannelAdapter следующим образом и не определять исполнителя задачи каналов в определении потока:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.taskExecutor(taskExecutor));
}
Или, может быть, определить исполнителя задач для канала, подобного варианту 1, но дополнительно установить maxConcurrentConsumers на канальном адаптере следующим образом:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.maxConcurrentConsumers(10));
}
1 ответ
Лучшая практика для настройки concurrency
на ListenerContainer
и пусть все последующие процессы происходят в этих потоках из контейнера. Таким образом, вы получите естественное обратное давление, когда больше сообщений не будет опрошено из очереди, потому что поток занят. А с другой стороны, не будет потери сообщений только потому, что с ExecutorChannel
после контейнера слушателя вы собираетесь освободить поток опроса, и текущее сообщение будет подтверждено как использованное, но вы можете потерпеть неудачу в нисходящем направлении.