Как работать с очередями недоставленных писем с реактором rabbitmq
В нашем приложении мы переходим от spring-amqp к Reaction-rabbitmq, чтобы хорошо работать с реактивным характером приложения. Мы читали официальное руководство от проекта реактор. Однако я не совсем уверен, как отправлять сообщения в очереди недоставленных сообщений после исчерпания количества повторных попыток.
В предыдущей реализации мы бросали Spring-AMPQ при условии AmqpRejectAndDontRequeueException, что заставляет сообщения автоматически попадать в очередь недоставленных сообщений, без использования выделенного издателя для очереди недоставленных сообщений. Как сделать подобное в response-rabbitmq? Нужно ли мне написать выделенного издателя, который будет вызываться слушателями после того, как количество повторных попыток исчерпано, или есть другие способы справиться с этим. Также существует официальная проектная документация реактора для DLQ и парковочных очередей.
Вот несколько примеров кода для обеих версий:
Версия AMQP:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
var mlc = messageListenerContainerFactory
.createMessageListenerContainer(SAMPLE_QUEUE);
MessageListener messageListener = message -> {
try {
TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
logger.info("Received message for id : {}", myModel.getId());
processMessage(myModel)
.subscriberContext(ctx -> {
Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
.orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
}).block();
MDC.clear();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
}
};
mlc.setupMessageListener(messageListener);
mlc.start();
}
processMessage
выполняет бизнес-логику, и в случае неудачи я хочу переместить ее в DLQ. Прекрасно работает в случае AMQP.
Версия Reactor RabbitMQ:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
receiver.consumeAutoAck(SAMPLE_QUEUE)
.subscribe(delivery -> {
TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
Mono.just(traceableMessage)
.map(this::extractMyModel)
.doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
.flatMap(this::processMessage)
.doFinally(signalType -> MDC.clear())
.retryWhen(Retry
.fixedDelay(1, Duration.ofMillis(10000))
.onRetryExhaustedThrow() //Move to DLQ
.doAfterRetry(retrySignal -> {
if ((retrySignal.totalRetries() + 1) >= 1) {
logger.info("Exhausted retries");
//Move to DLQ
}
}))
.subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
.subscribe();
}
);
}
Одно из этих двух мест с комментариями
//Move to DLQ
, было бы я догадываюсь, куда должно идти сообщение в DLQ. Вот где я решаю, что больше не могу это обрабатывать. Если другой издатель нажимает на DLQ, или какой-либо конкретный параметр может позаботиться об этом автоматически.
Пожалуйста, дайте мне знать.
1 ответ
Я получил ответ на эту проблему. Я использовал асинхронные слушатели и использовал
consumeAutoAck
. Когда я перешел на
consumeManualAck
Я получаю
AcknowledgebleDelivery
где я могу сделать
nack(false)
который должен переместить его в очередь недоставленных писем.