BackOffPolicy и SimpleRetryPolicy не действуют при вводе в RetryTemplate

Я использую Spring AMQP для отправки сообщений и возможности повторных попыток для "пользовательского" исключения. Допустим, у меня есть Receiver, который выдает пользовательское исключение "EventException", и для этого я хочу, чтобы было n попыток (в нашем примере 5). Между повторными попытками я также хочу, чтобы задержка составляла 5 секунд. Вот мой исходный код:

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    final static String queueName = "testing-queue";

    @Autowired
    AnnotationConfigApplicationContext context;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    Queue queue() {
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "dead-letter-exchange");
        Queue queue = new Queue(queueName, true, false, false, arguments);
        return queue;
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("testing-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    Queue deadLetterQueue() {
        return new Queue("dead-letter-queue", true);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange("dead-letter-exchange");
    }

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    }

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = 
                new CachingConnectionFactory("localhost");

        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        return connectionFactory;
    }

    @Bean
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter,
            RetryOperationsInterceptor interceptor) {

        Advice[] adviceChain = { interceptor };

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setAdviceChain(adviceChain);
        container.setMessageListener(listenerAdapter);

        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        MessageListenerAdapter adapter = 
                new MessageListenerAdapter(receiver, "receiveMessage");

        return adapter;
    }

    @Bean
    RetryOperations retryTemplate() {
         Map<Class<? extends Throwable>, Boolean> retryableExceptions = 
                 new HashMap<Class<? extends Throwable>, Boolean>();
        retryableExceptions.put(EventException.class, false);

        FixedBackOffPolicy backoffPolicy = new FixedBackOffPolicy();
        backoffPolicy.setBackOffPeriod(5000);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(backoffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5, retryableExceptions));

        return retryTemplate;
    }

    @Bean
    RetryOperationsInterceptor interceptor(RetryOperations retryTemplate) {
        RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
        interceptor.setRecoverer(new CustomMessageRecover());
        interceptor.setRetryOperations(retryTemplate);

        return interceptor;
//      return RetryInterceptorBuilder
//              .stateless()
//              //.retryOperations(retryTemplate)
//              .maxAttempts(5)
//              .recoverer(new CustomMessageRecover()).build();
    }

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message...");
        rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
        context.close();
    }

    public class Receiver {

        public void receiveMessage(String message) throws Exception {
            System.out.println("!!!!!!!!Message has been recieved!!!!!!");
            throw new EventException("TESTING");
        }
    }

    public class CustomMessageRecover implements MethodInvocationRecoverer<Void> {

        @Override
        public Void recover(Object[] args, Throwable cause) {
            System.out.println("IN THE RECOVER ZONE!!!");
            throw new AmqpRejectAndDontRequeueException(cause);
        }
    }

    class EventException extends Exception {
        private static final long serialVersionUID = 1L;

        public EventException() {}

        public EventException(String message) {
            super(message);
        }
    }
}

Теперь в коде, как вы можете видеть, я использую RetryOperations Interceptor для перехвата и проверки, чтобы увидеть, какой тип исключения он генерирует и основывается на этом, принять решение сделать повторную попытку или нет, вместе с задержкой между повторы

Для этого я устанавливаю backoffPolicy и retryPolicy bean- компонента RetryTemplate и добавляю его в RetryOperationsInterceptor.

Я был бы признателен, если бы кто-нибудь мог мне помочь и сказать, почему повторные попытки и задержка между повторными попытками не работают Мои сообщения направляются непосредственно на обмен мертвыми письмами без повторов и задержек.

БЛАГОДАРЮ ВАС!

1 ответ

Решение

Ваша проблема здесь:

retryableExceptions.put(EventException.class, false);

Пожалуйста, найдите SimpleRetryPolicy код:

public boolean canRetry(RetryContext context) {
    Throwable t = context.getLastThrowable();
    return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts;
}

и далее:

private boolean retryForException(Throwable ex) {
    return retryableClassifier.classify(ex);
}

Поскольку вы указываете false для тебя EventExceptionне будет retryable, Отсюда любые попытки и откаты.

Другие вопросы по тегам