Spring - ActiveMQ - Durable Subscription - закрытие соединения и повторная подписка для получения автономных сообщений

Я хочу реализовать решение в Spring-JMS с ActiveMQ, где я хочу создать долговременную подписку на тему. Цель состоит в том, что если подписчик на некоторое время закрывает подписку и снова создает подписку длительного пользования с тем же идентификатором клиента и именем подписки, подписчик должен получать все сообщения, которые были доставлены во время закрытия подписки.

Я хочу реализовать следующую логику, упомянутую в URL-адресе ORACLE для надежных подписок: https://docs.oracle.com/cd/E19798-01/821-1841/bncgd/index.html

введите описание изображения здесь

Но я не могу выполнить это с помощью Spring-JMS. По URL мне нужно получить экземпляр messageConsumer и вызвать close() для этого метода, чтобы временно прекратить получать сообщения из темы. Но я не уверен, как это получить.

Следующее - моя конфигурация. Пожалуйста, дайте мне знать, как изменить конфигурацию, чтобы выполнить это.

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">


<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:userName="admin"
    p:password="admin" 
    p:brokerURL="tcp://127.0.0.1:61616"
    primary="true"
    ></bean>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="adiTopic"/>
    <property name="messageListener" ref="adiListener"/>
</bean>

<bean id="configTemplate" class="org.springframework.jms.core.JmsTemplate" 
        p:connectionFactory-ref="connectionFactory"
        p:defaultDestination-ref="adiTopic" primary="true"
        p:pubSubDomain="true">
</bean>

<bean id="adiTopic" class="org.apache.activemq.command.ActiveMQTopic" p:physicalName="gcaa.adi.topic"></bean>

<bean id="adiListener" class="com.gcaa.asset.manager.impl.AdiListener"></bean>

1 ответ

Решение

Почему не звонит DefaultMessageListenerContainer.stop(); остановить контейнер и потребителей?

Вы можете ввести jmsContainer к другому компоненту и закройте его, когда захотите, и позже вызовите start().

все сообщения, отправленные брокеру, когда ваш потребитель длительного пользования находится в автономном режиме, будут сохраняться до его повторного подключения.

чтобы сделать подписку на товары длительного пользования необходимо добавить это jmsContainer боб

    <property name="subscriptionDurable" value="true" />
    <property name="cacheLevel" value="1" />

Вы можете добавить subscriptionName или будет использоваться имя класса указанного прослушивателя сообщений.

Вы можете добавить clientID к connectionFactory

    <property name="clientID" value="${jms.clientId}" />

или использовать

<bean class="org.springframework.jms.connection.SingleConnectionFactory" id="singleConnectionFactory"> <constructor-arg ref="connectionFactory" /> <property name="reconnectOnException" value="true" /> <property name="clientId" value="${jms.clientId}" /> </bean>

и обновить jmsContainer

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1"> <property name="connectionFactory" ref="singleConnectionFactory" /> <property name="destination" ref="adiTopic" /> <property name="messageListener" ref="adiListener" /> <property name="subscriptionDurable" value="true" /> <property name="cacheLevel" value="1" /> </bean>

ОБНОВИТЬ:

если твой adiListener инвентарь org.springframework.jms.listener.SessionAwareMessageListener это должен определить метод onMessage(M message, Session session) и когда у вас есть сеанс, вы можете позвонить javax.jms.Session.unsubscribe(String subscriptionName)

subscriptionName определено выше и может быть внедрено в этот bean-компонент, или может использоваться имя класса указанного прослушивателя сообщений.

Другие вопросы по тегам