Пул загрузочных подключений activemq для потребителей

Потребуется настроить пул подключений Spring Boot ActiveMQ? У меня только один потребитель в весеннем загрузочном приложении (как микро сервис), производители находятся в другом приложении. Меня немного смущает следующее: (извлечено из http://activemq.apache.org/spring-support.html)

Примечание: хотя PooledConnectionFactory разрешает создание коллекции активных потребителей, она не "объединяет" потребителей. Объединение в пул имеет смысл для соединений, сеансов и производителей, которые могут быть редко используемыми ресурсами, создавать их дорого и могут бездействовать при минимальных затратах. Потребители, с другой стороны, обычно просто создаются при запуске и уходят, обрабатывая входящие сообщения по мере их поступления. Когда потребитель завершает работу, предпочтительно выключить его, а не оставлять его без дела и вернуть его в пул для последующего повторного использования: это потому, что даже если потребитель простаивает, ActiveMQ будет продолжать доставлять сообщения в буфер предварительной выборки потребителя, где они будут удерживаться, пока потребитель снова не станет активным.

На той же странице я вижу это:
Вы можете использовать activemq-pool org.apache.activemq.pool.PooledConnectionFactory для эффективного объединения соединений и сеансов для вашей коллекции потребителей, или вы можете использовать Spring JMS org.springframework.jms.connection.CachingConnectionFactory для достижения того же самого. эффект

Я попытался CachingConnectionFactory (который может принимать ActiveMQConnectionFactory), где у него есть только несколько сеттеров для хранения cacheConsumers(логическое значение), cacheProducers(логическое значение), ничего не связанного с пулом соединения. Я знаю, что 1 соединение может дать вам несколько сеансов, тогда на каждый сеанс у вас будет несколько потребителей / производителей. Но мой вопрос для Потребителя, как мы объединяем, поскольку вышеприведенное утверждение говорит оставить его по умолчанию. Я сделал это одним способом:
@Bean public JmsListenerContainerFactory myFactory (ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all boot's default to this factory, including the message converter
    factory.setConcurrency("3-10");
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
}</em><br>

Динамическое масштабирование этой ссылки также предполагает это, но я не смог найти конкретного решения.

Если кто-то сталкивался с такой ситуацией, пожалуйста, дайте ваше предложение. Спасибо за чтение этого поста, и любая помощь очень ценится. Дополнительные детали для производства:

  • Этот потребитель будет получать ~500 сообщений в секунду.
  • Использование Spring Boot версии 1.5.8. RELEASE,
  • ActiveMQ 5.5 - это мой JMS

  • 1 ответ

    В activemq есть пакет org.apache.activemq.jms.pool, который предоставляет PooledConsumer. Ниже приведен код для этого. Пожалуйста, проверьте и посмотрите, работает ли он для вас. Я знаю, что это не весенний путь, но вы можете легко настроить свой метод опроса.

    PooledConnectionFactory pooledConFactory = null;
        PooledConnection pooledConnection = null;
        PooledSession pooledSession = null;
        PooledMessageConsumer pooledConsumer = null;
        Message message = null;
        try
        {
            // Get the connection object from PooledConnectionFactory
            pooledConFactory = ( PooledConnectionFactory ) this.jmsTemplateMap.getConnectionFactory();
            pooledConnection = ( PooledConnection ) pooledConFactory.createConnection();
            pooledConnection.start();
    
            // Create the PooledSession from pooledConnection object
            pooledSession = ( PooledSession ) pooledConnection.createSession( false, 1 );
    
            // Create the PooledMessageConsumer from session with given ack mode and destination
            pooledConsumer = ( PooledMessageConsumer ) pooledSession.
                    createConsumer( this.jmsTemplateMap.getDefaultDestination(), <messageFilter if any>);
    
            while ( true )
            {
                message = pooledConsumer.receiveNoWait();
                if ( message != null) 
                    break;
            }
    
        }
        catch ( JMSException ex )
        {
            LOGGER.error("JMS Exception occured, closing the session", ex );
        }
        return message;
    
    Другие вопросы по тегам