ActiveMQ - несколько очередей, получающих одно и то же сообщение из виртуальной темы, создают запись мертвого письма только для одной очереди

Я использую виртуальные места назначения для реализации модели публикации и подписки в ActiveMQ 5.15.13.

У меня виртуальная тема VirtualTopicи к нему привязаны две очереди. У каждой очереди своя собственная политика повторной доставки. СкажемQueue 1 повторит сообщение 2 раза в случае возникновения исключения при обработке сообщения и Queue 2повторит сообщение 3 раза. Сообщение повторной попытки будет отправлено в очередь недоставленных сообщений. Я также используюIndividual Dead letter Queue strategy так что каждая очередь имеет собственную очередь недоставленных сообщений.

Я заметил, что когда сообщение отправляется на VirtualTopic, сообщение с одинаковым идентификатором доставляется в обе очереди. Я столкнулся с проблемой, когда потребители обеих очередей не могут успешно обработать сообщение. Сообщение предназначено дляQueue 1помещается в очередь недоставленных сообщений после двух повторных попыток. Но нет очереди мертвых писем дляQueue 2, хотя сообщение в очереди 2 повторяется 3 раза.

Это ожидаемое поведение?

Код:

public class ActiveMQRedelivery {

private final ActiveMQConnectionFactory factory;

public ActiveMQRedelivery(String brokerUrl) {
    factory = new ActiveMQConnectionFactory(brokerUrl);
    factory.setUserName("admin");
    factory.setPassword("password");
    factory.setAlwaysSyncSend(false);
}

public void publish(String topicAddress, String message) {
    final String topicName = "VirtualTopic." + topicAddress;
    try {
        final Connection producerConnection = factory.createConnection();
        producerConnection.start();
        final Session producerSession = producerConnection.createSession(false, AUTO_ACKNOWLEDGE);
        final MessageProducer producer = producerSession.createProducer(null);
        final TextMessage textMessage = producerSession.createTextMessage(message);
        final Topic topic = producerSession.createTopic(topicName);
        producer.send(topic, textMessage, PERSISTENT, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE);
    } catch (JMSException e) {
        throw new RuntimeException("Message could not be published", e);
    }

}

public void initializeConsumer(String queueName, String topicAddress, int numOfRetry) throws JMSException {
    factory.getRedeliveryPolicyMap().put(new ActiveMQQueue("*." + queueName + ".>"),
            getRedeliveryPolicy(numOfRetry));
    Connection connection = factory.createConnection();
    connection.start();
    final Session consumerSession = connection.createSession(false, CLIENT_ACKNOWLEDGE);
    final Queue queue = consumerSession.createQueue("Consumer." + queueName +
            ".VirtualTopic." + topicAddress);
    final MessageConsumer consumer = consumerSession.createConsumer(queue);
    consumer.setMessageListener(message -> {
        try {
                System.out.println("in listener --- " + ((ActiveMQDestination)message.getJMSDestination()).getPhysicalName());
                consumerSession.recover();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
}

private RedeliveryPolicy getRedeliveryPolicy(int numOfRetry) {
    final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setMaximumRedeliveries(numOfRetry);
    redeliveryPolicy.setMaximumRedeliveryDelay(-1);
    redeliveryPolicy.setRedeliveryDelay(0);
    return redeliveryPolicy;
}

}

Контрольная работа:

открытый класс ActiveMQRedeliveryTest {

private static final String brokerUrl = "tcp://0.0.0.0:61616";
private ActiveMQRedelivery activeMQRedelivery;

@Before
public void setUp() throws Exception {
    activeMQRedelivery = new ActiveMQRedelivery(brokerUrl);
}

@Test
public void testMessageRedeliveries() throws Exception {
    String topicAddress = "testTopic";
    activeMQRedelivery.initializeConsumer("queue1", topicAddress, 2);
    activeMQRedelivery.initializeConsumer("queue2", topicAddress, 3);
    activeMQRedelivery.publish(topicAddress, "TestMessage");
    Thread.sleep(3000);
}

@After
public void tearDown() throws Exception {
}

}

1 ответ

Я недавно столкнулся с этой проблемой. Чтобы исправить это, необходимо добавить 2 атрибута в IndividualDeadLetterStrategy, как показано ниже.

      <deadLetterStrategy>
        <individualDeadLetterStrategy destinationPerDurableSubscriber="true" enableAudit="false" queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>

Объяснение атрибутов:

destinationPerDurableSubscriber — чтобы включить отдельный пункт назначения для каждого постоянного подписчика.

enableAudit — в стратегии недоставленных сообщений аудит сообщений включен по умолчанию. Это предотвращает добавление повторяющихся сообщений в настроенный DLQ. Когда этот атрибут включен, одно и то же сообщение, которое не было доставлено нескольким подписчикам темы, будет размещено только в одном из DLQ подписчика, когда для атрибута destinationPerDurableSubscriber установлено значение true, т. е., скажем, два потребителя не могут подтвердить одно и то же сообщение для тема, это сообщение будет помещено в DLQ только для одного потребителя, а не для другого.

Другие вопросы по тегам