Как вручную подтвердить сообщения RabbitMQ в Spring Cloud Stream?

Для потоковых служб я хочу, чтобы сообщение оставалось в очереди, когда базовая служба вызывается в @StreamListener выходит из строя. Для этого я понимаю, что единственный способ сделать это - настроить spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL,

После внесения изменений в конфигурацию я попытался добавить @Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag в качестве аргументов метода для моего существующего @StreamListener реализация, как описано в https://docs.spring.io/spring-integration/reference/html/amqp.html. С этим кодом я столкнулся со следующим исключением:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:941)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:851)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'amqp_channel' for method parameter type [interface com.rabbitmq.client.Channel]
    at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100)
    at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)

Затем я обнаружил следующее: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/, который показывает пример того, как выполнять подтверждение сообщений с использованием Kafka, но я В настоящее время используется привязка RabbitMQ. Мы планируем в конечном итоге перейти на Kafka, но сейчас, как мне настроить и кодировать решение для ручного подтверждения сообщений для успешно обработанных сообщений и ручного отклонения сообщений, таким образом оставляя сообщение в очереди, когда встречаются исключения. Я сейчас на Spring Cloud Edgware.RELEASE и весенний облачный поток Ditmars.RELEASE,

ОБНОВИТЬ

Теперь у меня есть следующая конфигурация:

spring:
  cloud:
    stream:
      bindings:
        do-something-async-reply:
          group: xyz-service-do-something-async-reply
      rabbit:
        bindings:
          do-something-async-reply:
            consumer:
              autoBindDlq: true
              dlqDeadLetterExchange:
              dlqTtl: 10000
              requeueRejected: true

И я получаю следующую ошибку при запуске службы:

2018-01-12 14:46:34.346 ERROR [xyz-service,,,] 2488 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'do-something-async-reply.xyz-service-do-something-async-reply' in vhost '/': received the value 'DLX' of type 'longstr' but current is none, class-id=50, method-id=10)

Какая конфигурация неправильная / мне не хватает?

1 ответ

Решение

Неверное имя свойства; вам не хватает .rabbit, Это

spring.cloud.stream.Кролик<channel>.consumer.acknowledge режим = РУЧНОЙ

так как это свойство кролика - смотрите документацию.

РЕДАКТИРОВАТЬ

Пример:

@SpringBootApplication
@EnableBinding(Sink.class)
public class So481977082Application {

    public static void main(String[] args) {
        SpringApplication.run(So481977082Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(AmqpHeaders.CHANNEL) Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println(in);
        Thread.sleep(60_000);
        channel.basicAck(tag, false);
        System.out.println("Ackd");
    }

}

Имейте в виду, что потребность в РУКОВОДСТВЕ Acks часто - запах; как правило, лучше разрешить контейнеру обрабатывать ошибки; увидеть requeueRejected по той же ссылке доко. Безусловное требование может вызвать бесконечный цикл.

EDIT2

У меня отлично работает...

@SpringBootApplication
@EnableBinding(Processor.class)
public class So48197708Application {

    public static void main(String[] args) {
        SpringApplication.run(So48197708Application.class, args);
    }

    @Bean
    ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(new GenericMessage<>("foo"));
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Header(name = "x-death", required = false) List<?> death) {
        System.out.println(death);
        throw new RuntimeException("x");
    }

}

с

spring:
  cloud:
    stream:
      bindings:
        input:
          group: foo
          content-type: application/json
          destination: foo
          consumer:
            max-attempts: 1
        output:
          content-type: application/json
          destination: foo
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              dlqDeadLetterExchange:
              dlqTtl: 10000

Результат:

null
...
Caused by: java.lang.RuntimeException: x
...
[{reason=expired, count=1, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
    {reason=rejected, count=1, exchange=foo, time=Fri Jan 12 17:20:18 EST 2018, routing-keys=[foo], queue=foo.foo}]
...

...
[{reason=expired, count=3, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
    {reason=rejected, count=3, exchange=foo, routing-keys=[foo], time=Fri Jan 12 17:20:18 EST 2018, queue=foo.foo}]
Другие вопросы по тегам