Маршрутизация к виртуальным направлениям внутри брокера ActiveMQ
У меня есть конфигурация activemq, в которой у меня есть виртуальное место назначения и обычная тема
Я хочу направить все сообщения JMS к месту назначения (VirtualTopic.Notifications) в 2 очереди (VirtualTopic.SMS, VirtualTopic.EMAIL) на основе их JMSType в заголовке сообщения.
И я хочу, чтобы обычная тема (VirtualTopic.gps) работала как обычно.
Это моя конфигурация activemq.xml. Здесь создаются Consumer.SMS.VirtualTopic и Consumer.EMAIL.VirtualTopic.
<destinations>
<queue physicalName="Consumer.SMS.VirtualTopic" />
<queue physicalName="Consumer.EMAIL.VirtualTopic" />
</destinations>
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
<forwardTo>
<filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
<filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
При этом потребитель и тема (VirtualTopic.gps) создаются из кода на стороне сервера.
private static MessageProducer getTopicProducer(String topicName) throws JMSException {
MessageProducer producer = topicProducers.get(topicName);
if (producer == null) {
logger.info("Creating message producer for Topic : {}", topicName);
Destination destination = session.createTopic(topicName);
List<String> queueNames = PropertyReader
.getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
if (queueNames != null) {
for (String queueName : queueNames) {
Queue virtualQueue = session.createQueue(queueName);
MessageConsumer con = session.createConsumer(virtualQueue);
con.close();
}
}
producer = session.createProducer(destination);
topicProducers.put(topicName, producer);
}
return producer;
}
Все сообщения в VirtualTopic. Уведомления направляются в 2 разные очереди, и потребители могут получать сообщения из соответствующих очередей.
Но проблема в том, что все сообщения, отправляемые на VirtualTopic.gps, фильтруются, и потребители не могут использовать сообщения gps.
2 ответа
Большое спасибо, Хассен..
Добавление этой строки <virtualTopic name=">" selectorAware="false" />
к activemq.xml сделали свое дело.
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="VirtualTopic.Notifications"
forwardOnly="false">
<forwardTo>
<filteredDestination selector="JMSType = 'SMS'"
queue="Consumer.SMS.VirtualTopic" />
<filteredDestination selector="JMSType ='EMAIL'"
queue="Consumer.EMAIL.VirtualTopic" />
</forwardTo>
</compositeQueue>
<virtualTopic name=">" selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
В следующем примере показано, как настроить элемент в конфигурации XML, чтобы при отправке сообщения в MY.QUEUE оно действительно пересылалось в физическую очередь FOO и в раздел BAR.
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <queue physicalName="FOO" /> <topic physicalName="BAR" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
По умолчанию подписчики не могут получать сообщения напрямую из составной очереди или темы - это только логическая конструкция. Учитывая приведенную выше конфигурацию, подписчики могут использовать только сообщения из FOO и BAR; но не MY.QUEUE. Это поведение может быть изменено для реализации сценариев использования, таких как наблюдение за очередью, путем отправки одних и тех же сообщений в тему уведомлений (прослушивание по проводам) путем установки необязательного атрибута forwardOnly со значением false.
<compositeQueue name="IncomingOrders" forwardOnly="false"> <forwardTo> <topic physicalName="Notifications" /> </forwardTo> </compositeQueue>
Все сообщения, отправленные в IncomingOrders, будут скопированы и отправлены в Уведомления, прежде чем они будут помещены в физическую очередь IncomingOrders для потребления подписчиками.
посмотрите здесь http://activemq.apache.org/virtual-destinations.html
с вашей реальной конфигурацией вы можете использовать только из SMS и электронной почты очереди, если вы хотите использовать из уведомлений, вам нужно установить forwardOnly="false"
ОБНОВЛЕНИЕ: попробуйте этот код:
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;
public class SimpleSenderConsumerVirtualTopic {
public static void main(String[] args) throws JMSException {
Connection conn = null;
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
conn = cf.createConnection( );
ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
ActiveMQSession.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
.createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
conn.start();
ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("VirtualTopic.gps test");
producer.send(msg);
msg = null;
while ((msg = (ActiveMQTextMessage) consumer.receive(5000)) != null) {
System.out.println("Received message is: " + msg.getText());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (Exception e) {
}
}
}
}
}
И добавить это:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
<forwardTo>
<filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
<filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
</forwardTo>
</compositeQueue>
<virtualTopic name=">" selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>