Как установить весенний amqp, издатель подтверждает и возвращает?

Я новичок в весне AMQP. Когда конфиг весны amqp издатель подтверждает и возвращает, встретились проблемы.

Конфигурация amqp:

SimpleMessageListenerContainer container(CachingConnectionFactory connectionFactory, @Qualifier("topicListenerAdapter")MessageListenerAdapter listenerAdapter) {

    connectionFactory.setChannelCacheSize(5);
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setPublisherReturns(true);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("request.queue","reply.queue");
    container.setMessageConverter(json2MessageConverter());
    container.setReceiveTimeout(3000);
    container.setMessageListener(listenerAdapter);

    return container;
}

Отправить сообщение:

rabbitTemplate.convertAndSend("spring-boots5", message);
        rabbitTemplate.setConfirmCallback(new ConfirmCallback(){

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // TODO Auto-generated method stub

                System.out.println("confirm correlationData is : "+correlationData+"ack is : "+
                ack);
            }

        });
        rabbitTemplate.setMandatory(true);

При запуске этого приложения получено сообщение amqp:

Body:'This is my first message'MessageProperties [headers={bar=baz}, timestamp=null, messageId=123456, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=request.queue, deliveryTag=1, messageCount=0]) >

встретил ошибку:

22 20:48:08.661[0;39m [31mERROR[0;39m [35m37792[0;39m [2m---[0;39m [2m[ 127.0.0.1:5672][0;39m [36mo.s.a.r.s.PublisherCallbackChannelImpl  [0;39m [2m:[0;39m No listener for seq:1

И нет ожидаемой строки на консоли:

"Подтвердите, что корреляционные данные: "+ Корреляционные данные +"ack is: "+ ack

И не знаю, как настроить ответ сообщений (я использую конфигурацию Java)

1 ответ

Решение

Вам нужно установить ConfirmCallback (а также ReturnCallback) перед отправкой сообщения.

Вам также необходимо предоставить некоторые данные корреляции при отправке, чтобы вы могли определить, для какого исходящего сообщения требуется подтверждение.

Смотрите этот тестовый пример...

@Test
public void testPublisherConfirmWithSendAndReceive() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<CorrelationData> confirmCD = new AtomicReference<CorrelationData>();
    templateWithConfirmsEnabled.setConfirmCallback(new ConfirmCallback() {

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            confirmCD.set(correlationData);
            latch.countDown();
        }
    });
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactoryWithConfirmsEnabled);
    container.setQueueNames(ROUTE);
    container.setMessageListener(new MessageListenerAdapter(new Object() {

        @SuppressWarnings("unused")
        public String handleMessage(String in) {
            return in.toUpperCase();
        }
    }));
    container.start();
    CorrelationData correlationData = new CorrelationData("abc");
    String result = (String) this.templateWithConfirmsEnabled.convertSendAndReceive(ROUTE, (Object) "message", correlationData);
    container.stop();
    assertEquals("MESSAGE", result);
    assertTrue(latch.await(10, TimeUnit.SECONDS));
    assertEquals(correlationData, confirmCD.get());
}
Другие вопросы по тегам