Обработка подстановочных знаков в CompositeQueues - XML-файл конфигурации ActiveMQ

Я хотел бы попросить совета относительно темы, которую я имею с ActiveMQ.

Я использую ActiveMQ 5.15. Я пытаюсь изменить файл конфигурации xml для добавления виртуального места назначения, используя CompositeQueues, которые перенаправляют в другую очередь / тему. Из документации ActiveMQ для этого компонента, схема выглядит следующим образом:

<compositeQueue name="IncomingOrders"> 
   <forwardTo>
    <topic physicalName="Notifications" />
   </forwardTo>
 </compositeQueue>

Я был в состоянии пересылать сообщения из существующих очередей, которые названы как, например, request.typeA.classC. Однако у меня есть несколько очередей, которые используют один и тот же префикс request.typeA. и поэтому я намерен использовать групповые символы, чтобы не указывать составную очередь для каждой существующей очереди с этим префиксом и упростить ее обслуживание.

Мне нужно что-то вроде этого:

<compositeQueue name="request.typeA.>"> 
   <forwardTo>
    <topic physicalName="Notifications" />
   </forwardTo>
 </compositeQueue>

Однако этот фрагмент кода не работает, и я подозреваю, что это потому, что он просто не поддерживается (по крайней мере, пока). Я попытался успешно использовать подстановочные знаки в свойстве PhysicalName, но не в имени.

  • У меня есть одно предварительное условие: я должен хранить разные очереди, использующие один и тот же префикс (не могу объединить их в одну).

  • Еще одно предварительное условие - я не могу динамически создавать новые очереди / темы с помощью кода (из-за разрешений сервера). Вот почему я заинтересован в изменении XML-файла конфигурации.

Поэтому мне интересно, знает ли кто-нибудь из вас, возможно ли использовать подстановочные знаки в свойстве name (я не читал никаких доказательств этого в документации), и если да, то как я мог это сделать. Если вы точно знаете, что это невозможно сделать с текущей версией ActiveMQ, я хотел бы поблагодарить вас за подтверждение этого.

Я также был бы признателен за другие альтернативы / советы, которые вы могли бы предложить для той же цели, которую я намереваюсь, и при соблюдении предварительных условий, которые я упоминал ранее. Я также читал о зеркальных очередях, однако это настройка, которая затрагивает все существующие очереди (я просто заинтересован в этом в их небольшом подмножестве) и может оказать значительное влияние на производительность.

Заранее большое спасибо за ваше время и наилучшие пожелания.

0 ответов

Наконец, я нашел обходной путь, который позволил мне создать MirroredQueues только для подмножества очередей с заданным префиксом.

Что я сделал, так это создал свой собственный DestinationInterceptor, чтобы создать только зеркальную очередь только для тех очередей, которые меня интересовали, и исключить остальные (поскольку реализация MirroredQueue по умолчанию отражает все очереди, созданные в системе).

Как я это сделал. Я скопировал реализацию класса MirroredQueue.java из библиотеки в новый класс CustomMirroredQueue и добавил в класс новый атрибут, называемый зеркальным отображением. Я изменил реализацию перехвата (конечный пункт назначения) из интерфейса DestinationInterceptor, принимая во внимание этот новый атрибут в if-заявлении (я создал вспомогательный метод для этого, называемый isPrefixMirrored):

/*
* This method is responsible for intercepting all the queues/topics that are created in the system.
* In this particular case we are interested only in the queues, in order we can mirror *some* of them and get
* a copy of the messages that are sent to them (With the topics this mirroring is not necessary since we would just
* subscribe to that topic for receiving the same message).
* */
public Destination intercept(final Destination destination) {
    if (destination.getActiveMQDestination().isQueue()) {
        if (isPrefixMirrored(destination) && (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues())) {
            try {
                //we create a mirrored queue for that destination
                final Destination mirrorDestination = getMirrorDestination(destination);
                if (mirrorDestination != null) {
                    return new DestinationFilter(destination) {
                        public void send(ProducerBrokerExchange context, Message message) throws Exception {
                            message.setDestination(mirrorDestination.getActiveMQDestination());
                            mirrorDestination.send(context, message);

                            if (isCopyMessage()) {
                                message = message.copy();
                            }
                            message.setDestination(destination.getActiveMQDestination());
                            message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
                            super.send(context, message);
                        }
                    };
                }
            } catch (Exception e) {
                LOG.error("Failed to lookup the mirror destination for: {}", destination, e);
            }
        }
    }
    return destination;
}

/*
* @returns true if the destination passed as parameter will be mirrored. If the value for the attribute "mirroring"
* is an empty string "" then all the queues will be mirrored by default.
**/
private boolean isPrefixMirrored(Destination destination) {
    if (mirroring.equals("")) {
        return true;
    }
    List<String> mirroredQueuesPrefixes = Arrays.asList(mirroring.split(","));
    final String destinationPhysicalName = destination.getActiveMQDestination().getPhysicalName();
    return mirroredQueuesPrefixes.stream().map(String::trim).anyMatch(destinationPhysicalName::contains);
}

Я создал.jar только с этим настраиваемым классом и зависимостями (для этого использовал gradle) и добавил в папку lib в установке брокера ActimeMQ. Затем я смог использовать этот тег как bean-компонент в XML-файле конфигурации ActiveMQ:

<destinationInterceptors>
    <bean xmlns="http://www.springframework.org/schema/beans" class="package.CustomMirroredQueue" id="CustomMirroredQueue">
        <property name="copyMessage" value="true"/>
        <property name="postfix" value=""/>
        <property name="prefix" value="mirror."/>
        <property name="mirroring" value="PREFIX_1, QUEUE2, QUEUE3"/>
    </bean>
</destinationInterceptors>

Класс должен содержать путь к этому классу из папки библиотеки ActiveMQ. Свойства copyMessage, postfix и prefix взяты из реализации MirroredQueue по умолчанию. И свойство зеркалирования будет списком со всеми конкретными очередями / префиксами для зеркалирования (и только ими).

Я использую версию 5.15.11 и вижу, что подстановочный знак работает для очереди Composite. Вот моя составная конфигурация очереди.

 <compositeQueue name="email.>">
   <forwardTo>
     <queue physicalName="MY_MAIN_QUEUE" />
   </forwardTo>
 </compositeQueue>
Другие вопросы по тегам