Пул загрузочных подключений activemq для потребителей
Потребуется настроить пул подключений Spring Boot ActiveMQ? У меня только один потребитель в весеннем загрузочном приложении (как микро сервис), производители находятся в другом приложении. Меня немного смущает следующее: (извлечено из http://activemq.apache.org/spring-support.html)
Примечание: хотя PooledConnectionFactory разрешает создание коллекции активных потребителей, она не "объединяет" потребителей. Объединение в пул имеет смысл для соединений, сеансов и производителей, которые могут быть редко используемыми ресурсами, создавать их дорого и могут бездействовать при минимальных затратах. Потребители, с другой стороны, обычно просто создаются при запуске и уходят, обрабатывая входящие сообщения по мере их поступления. Когда потребитель завершает работу, предпочтительно выключить его, а не оставлять его без дела и вернуть его в пул для последующего повторного использования: это потому, что даже если потребитель простаивает, ActiveMQ будет продолжать доставлять сообщения в буфер предварительной выборки потребителя, где они будут удерживаться, пока потребитель снова не станет активным.
На той же странице я вижу это:
Вы можете использовать activemq-pool org.apache.activemq.pool.PooledConnectionFactory для эффективного объединения соединений и сеансов для вашей коллекции потребителей, или вы можете использовать Spring JMS org.springframework.jms.connection.CachingConnectionFactory для достижения того же самого. эффект
Я попытался CachingConnectionFactory (который может принимать ActiveMQConnectionFactory), где у него есть только несколько сеттеров для хранения cacheConsumers(логическое значение), cacheProducers(логическое значение), ничего не связанного с пулом соединения. Я знаю, что 1 соединение может дать вам несколько сеансов, тогда на каждый сеанс у вас будет несколько потребителей / производителей. Но мой вопрос для Потребителя, как мы объединяем, поскольку вышеприведенное утверждение говорит оставить его по умолчанию. Я сделал это одним способом:
@Bean public JmsListenerContainerFactory myFactory (ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
factory.setConcurrency("3-10");
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
return factory;
}</em><br>
Динамическое масштабирование этой ссылки также предполагает это, но я не смог найти конкретного решения.
Если кто-то сталкивался с такой ситуацией, пожалуйста, дайте ваше предложение. Спасибо за чтение этого поста, и любая помощь очень ценится. Дополнительные детали для производства:
1 ответ
В activemq есть пакет org.apache.activemq.jms.pool, который предоставляет PooledConsumer. Ниже приведен код для этого. Пожалуйста, проверьте и посмотрите, работает ли он для вас. Я знаю, что это не весенний путь, но вы можете легко настроить свой метод опроса.
PooledConnectionFactory pooledConFactory = null;
PooledConnection pooledConnection = null;
PooledSession pooledSession = null;
PooledMessageConsumer pooledConsumer = null;
Message message = null;
try
{
// Get the connection object from PooledConnectionFactory
pooledConFactory = ( PooledConnectionFactory ) this.jmsTemplateMap.getConnectionFactory();
pooledConnection = ( PooledConnection ) pooledConFactory.createConnection();
pooledConnection.start();
// Create the PooledSession from pooledConnection object
pooledSession = ( PooledSession ) pooledConnection.createSession( false, 1 );
// Create the PooledMessageConsumer from session with given ack mode and destination
pooledConsumer = ( PooledMessageConsumer ) pooledSession.
createConsumer( this.jmsTemplateMap.getDefaultDestination(), <messageFilter if any>);
while ( true )
{
message = pooledConsumer.receiveNoWait();
if ( message != null)
break;
}
}
catch ( JMSException ex )
{
LOGGER.error("JMS Exception occured, closing the session", ex );
}
return message;