Маршрутизация к виртуальным направлениям внутри брокера ActiveMQ

У меня есть конфигурация activemq, в которой у меня есть виртуальное место назначения и обычная тема

Я хочу направить все сообщения JMS к месту назначения (VirtualTopic.Notifications) в 2 очереди (VirtualTopic.SMS, VirtualTopic.EMAIL) на основе их JMSType в заголовке сообщения.

И я хочу, чтобы обычная тема (VirtualTopic.gps) работала как обычно.

Это моя конфигурация activemq.xml. Здесь создаются Consumer.SMS.VirtualTopic и Consumer.EMAIL.VirtualTopic.

    <destinations>
        <queue physicalName="Consumer.SMS.VirtualTopic" />
        <queue physicalName="Consumer.EMAIL.VirtualTopic" />
    </destinations>

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

При этом потребитель и тема (VirtualTopic.gps) создаются из кода на стороне сервера.

    private static MessageProducer getTopicProducer(String topicName) throws JMSException {
    MessageProducer producer = topicProducers.get(topicName);

    if (producer == null) {
        logger.info("Creating message producer for Topic : {}", topicName);
        Destination destination = session.createTopic(topicName);

        List<String> queueNames = PropertyReader
                .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
        if (queueNames != null) {
            for (String queueName : queueNames) { 
                Queue virtualQueue = session.createQueue(queueName);
                MessageConsumer con = session.createConsumer(virtualQueue);
                con.close();
            }
        }

        producer = session.createProducer(destination);
        topicProducers.put(topicName, producer);
    }

    return producer;
    }

Все сообщения в VirtualTopic. Уведомления направляются в 2 разные очереди, и потребители могут получать сообщения из соответствующих очередей.

Но проблема в том, что все сообщения, отправляемые на VirtualTopic.gps, фильтруются, и потребители не могут использовать сообщения gps.

2 ответа

Решение

Большое спасибо, Хассен..

Добавление этой строки <virtualTopic name=">" selectorAware="false" /> к activemq.xml сделали свое дело.

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeQueue name="VirtualTopic.Notifications"
                    forwardOnly="false">
                    <forwardTo>
                        <filteredDestination selector="JMSType = 'SMS'"
                            queue="Consumer.SMS.VirtualTopic" />
                        <filteredDestination selector="JMSType ='EMAIL'"
                            queue="Consumer.EMAIL.VirtualTopic" />
                    </forwardTo>
                </compositeQueue>
                <virtualTopic name=">" selectorAware="false" />
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>

В следующем примере показано, как настроить элемент в конфигурации XML, чтобы при отправке сообщения в MY.QUEUE оно действительно пересылалось в физическую очередь FOO и в раздел BAR.

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <compositeQueue name="MY.QUEUE">
        <forwardTo>
          <queue physicalName="FOO" />
          <topic physicalName="BAR" />
        </forwardTo>
      </compositeQueue>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>

По умолчанию подписчики не могут получать сообщения напрямую из составной очереди или темы - это только логическая конструкция. Учитывая приведенную выше конфигурацию, подписчики могут использовать только сообщения из FOO и BAR; но не MY.QUEUE. Это поведение может быть изменено для реализации сценариев использования, таких как наблюдение за очередью, путем отправки одних и тех же сообщений в тему уведомлений (прослушивание по проводам) путем установки необязательного атрибута forwardOnly со значением false.

<compositeQueue name="IncomingOrders" forwardOnly="false">
    <forwardTo>
        <topic physicalName="Notifications" />
    </forwardTo>
</compositeQueue>

Все сообщения, отправленные в IncomingOrders, будут скопированы и отправлены в Уведомления, прежде чем они будут помещены в физическую очередь IncomingOrders для потребления подписчиками.

посмотрите здесь http://activemq.apache.org/virtual-destinations.html

с вашей реальной конфигурацией вы можете использовать только из SMS и электронной почты очереди, если вы хотите использовать из уведомлений, вам нужно установить forwardOnly="false"

ОБНОВЛЕНИЕ: попробуйте этот код:

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;

public class SimpleSenderConsumerVirtualTopic {

    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
            conn = cf.createConnection( );
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.AUTO_ACKNOWLEDGE);
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                    .createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
            MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
            conn.start();
            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("VirtualTopic.gps test");
            producer.send(msg);
            msg = null;
            while ((msg = (ActiveMQTextMessage) consumer.receive(5000)) != null) {
                System.out.println("Received message is: " + msg.getText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }
    }
}

И добавить это:

<destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
          <virtualTopic name=">"  selectorAware="false" />
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
Другие вопросы по тегам