Как работать с очередями недоставленных писем с реактором rabbitmq

В нашем приложении мы переходим от spring-amqp к Reaction-rabbitmq, чтобы хорошо работать с реактивным характером приложения. Мы читали официальное руководство от проекта реактор. Однако я не совсем уверен, как отправлять сообщения в очереди недоставленных сообщений после исчерпания количества повторных попыток.

В предыдущей реализации мы бросали Spring-AMPQ при условии AmqpRejectAndDontRequeueException, что заставляет сообщения автоматически попадать в очередь недоставленных сообщений, без использования выделенного издателя для очереди недоставленных сообщений. Как сделать подобное в response-rabbitmq? Нужно ли мне написать выделенного издателя, который будет вызываться слушателями после того, как количество повторных попыток исчерпано, или есть другие способы справиться с этим. Также существует официальная проектная документация реактора для DLQ и парковочных очередей.

Вот несколько примеров кода для обеих версий:

Версия AMQP:

       @AllArgsConstructor
public class SampleListener {
    private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
    private final MessageListenerContainerFactory messageListenerContainerFactory;
    private final Jackson2JsonMessageConverter converter;

    @PostConstruct
    public void subscribe() {
        var mlc = messageListenerContainerFactory
                .createMessageListenerContainer(SAMPLE_QUEUE);
        MessageListener messageListener = message -> {
            try {
                TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
                ObjectMapper mapper = new ObjectMapper();
                mapper.registerModule(new JavaTimeModule());
                MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
                MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
                logger.info("Received message for id : {}", myModel.getId());
                processMessage(myModel)
                        .subscriberContext(ctx -> {
                            Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
                            return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
                                    .orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
                        }).block();
                MDC.clear();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
            }
        };
        mlc.setupMessageListener(messageListener);
        mlc.start();
    }

processMessageвыполняет бизнес-логику, и в случае неудачи я хочу переместить ее в DLQ. Прекрасно работает в случае AMQP.

Версия Reactor RabbitMQ:

       @AllArgsConstructor
public class SampleListener {
    private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
    private final MessageListenerContainerFactory messageListenerContainerFactory;
    private final Jackson2JsonMessageConverter converter;

    @PostConstruct
    public void subscribe() {
        receiver.consumeAutoAck(SAMPLE_QUEUE)
                .subscribe(delivery -> {
                            TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
                            Mono.just(traceableMessage)
                                    .map(this::extractMyModel)
                                    .doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
                                    .flatMap(this::processMessage)
                                    .doFinally(signalType -> MDC.clear())
                                    .retryWhen(Retry
                                            .fixedDelay(1, Duration.ofMillis(10000))
                                            .onRetryExhaustedThrow() //Move to DLQ
                                            .doAfterRetry(retrySignal -> {
                                                if ((retrySignal.totalRetries() + 1) >= 1) {
                                                    logger.info("Exhausted retries");
                                                    //Move to DLQ
                                                }
                                            }))
                                    .subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
                                    .subscribe();
                        }
                );

    }

Одно из этих двух мест с комментариями //Move to DLQ, было бы я догадываюсь, куда должно идти сообщение в DLQ. Вот где я решаю, что больше не могу это обрабатывать. Если другой издатель нажимает на DLQ, или какой-либо конкретный параметр может позаботиться об этом автоматически.

Пожалуйста, дайте мне знать.

1 ответ

Я получил ответ на эту проблему. Я использовал асинхронные слушатели и использовал consumeAutoAck. Когда я перешел на consumeManualAck Я получаю AcknowledgebleDelivery где я могу сделать nack(false) который должен переместить его в очередь недоставленных писем.

Другие вопросы по тегам