Реализация долговременной подписки на ServiceMix
В настоящее время я использую ServiceMix для маршрутизации сообщений с использованием библиотек QPID.
Мой рабочий конфиг ниже:
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.osgi.org/xmlns/blueprint/v1.0.0
http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
<bean id="amqp" class="org.apache.camel.component.amqp.AMQPComponent">
<property name="connectionFactory">
<bean class="org.apache.qpid.jms.JmsConnectionFactory">
<property name="remoteURI" value="failover:amqps://remote-broker:9551?transport.trustStoreLocation=/vault/QA_UM_A.jks />
</bean>
</property>
</bean>
<bean id="kubeActivemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://10.100.10.13:61616" />
</bean>
<route>
<from uri="amqp:queue:///queue-1" />
<to uri="kubeActivemq:local-queue?jmsMessageType=Text" />
</route>
</camelContext>
</blueprint>
Вышеуказанный конфиг работает.
Теперь мне нужно реализовать долговременную подписку, чтобы в случае сбоя servicemix он мог восстанавливать все сообщения, когда он возвращается (те, что отправлены, когда servicemix не работает).
На основании документации я реализовал маршрут:
<route>
<from uri="amqp:queue:///queue-1?clientId=1&durableSubscriptionName=bar1" />
<to uri="kubeActivemq:queue:///local-queue" />
</route>
Но эта реализация дает ОШИБКУ:
2018-05-28 13:28:58,421 | ERROR | mix-7.0.0/deploy | BlueprintCamelContext | 40 - org.apache.camel.camel-blueprint - 2.16.4 | Error occurred during starting Camel: CamelContext(camel-1) due A durable subscription requires a topic (pub-sub domain)
java.lang.IllegalArgumentException: A durable subscription requires a topic (pub-sub domain)
at org.springframework.jms.listener.AbstractMessageListenerContainer.validateConfiguration(AbstractMessageListenerContainer.java:435)
at org.springframework.jms.listener.AbstractJmsListeningContainer.afterPropertiesSet(AbstractJmsListeningContainer.java:157)
at org.apache.camel.component.jms.JmsConsumer.prepareAndStartListenerContainer(JmsConsumer.java:163)
at org.apache.camel.component.jms.JmsConsumer.doStart(JmsConsumer.java:155)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:3234)
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:3528)
at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:3464)
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3394)
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3162)
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3018)
.
.
.
Я что-то пропустил здесь? Разве названия тем должны совпадать с именами очередей?
1 ответ
Ваш маршрут инструктирует Camel о подключении к очереди, и вам нужно изменить его, чтобы использовать "тему" для долговременных подписок. (Очереди подписки по умолчанию долговечны)
AMQP: очереди: /// очередь 1...
А для долговременных подписок нужна тема
AMQP: тема: /// тема-1...