BackOffPolicy и SimpleRetryPolicy не действуют при вводе в RetryTemplate
Я использую Spring AMQP для отправки сообщений и возможности повторных попыток для "пользовательского" исключения. Допустим, у меня есть Receiver, который выдает пользовательское исключение "EventException", и для этого я хочу, чтобы было n попыток (в нашем примере 5). Между повторными попытками я также хочу, чтобы задержка составляла 5 секунд. Вот мой исходный код:
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
final static String queueName = "testing-queue";
@Autowired
AnnotationConfigApplicationContext context;
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead-letter-exchange");
Queue queue = new Queue(queueName, true, false, false, arguments);
return queue;
}
@Bean
TopicExchange exchange() {
return new TopicExchange("testing-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
Queue deadLetterQueue() {
return new Queue("dead-letter-queue", true);
}
@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange("dead-letter-exchange");
}
@Bean
Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
}
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
SimpleMessageListenerContainer container(
ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
RetryOperationsInterceptor interceptor) {
Advice[] adviceChain = { interceptor };
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setAdviceChain(adviceChain);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
MessageListenerAdapter adapter =
new MessageListenerAdapter(receiver, "receiveMessage");
return adapter;
}
@Bean
RetryOperations retryTemplate() {
Map<Class<? extends Throwable>, Boolean> retryableExceptions =
new HashMap<Class<? extends Throwable>, Boolean>();
retryableExceptions.put(EventException.class, false);
FixedBackOffPolicy backoffPolicy = new FixedBackOffPolicy();
backoffPolicy.setBackOffPeriod(5000);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backoffPolicy);
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5, retryableExceptions));
return retryTemplate;
}
@Bean
RetryOperationsInterceptor interceptor(RetryOperations retryTemplate) {
RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
interceptor.setRecoverer(new CustomMessageRecover());
interceptor.setRetryOperations(retryTemplate);
return interceptor;
// return RetryInterceptorBuilder
// .stateless()
// //.retryOperations(retryTemplate)
// .maxAttempts(5)
// .recoverer(new CustomMessageRecover()).build();
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
context.close();
}
public class Receiver {
public void receiveMessage(String message) throws Exception {
System.out.println("!!!!!!!!Message has been recieved!!!!!!");
throw new EventException("TESTING");
}
}
public class CustomMessageRecover implements MethodInvocationRecoverer<Void> {
@Override
public Void recover(Object[] args, Throwable cause) {
System.out.println("IN THE RECOVER ZONE!!!");
throw new AmqpRejectAndDontRequeueException(cause);
}
}
class EventException extends Exception {
private static final long serialVersionUID = 1L;
public EventException() {}
public EventException(String message) {
super(message);
}
}
}
Теперь в коде, как вы можете видеть, я использую RetryOperations Interceptor для перехвата и проверки, чтобы увидеть, какой тип исключения он генерирует и основывается на этом, принять решение сделать повторную попытку или нет, вместе с задержкой между повторы
Для этого я устанавливаю backoffPolicy и retryPolicy bean- компонента RetryTemplate и добавляю его в RetryOperationsInterceptor.
Я был бы признателен, если бы кто-нибудь мог мне помочь и сказать, почему повторные попытки и задержка между повторными попытками не работают Мои сообщения направляются непосредственно на обмен мертвыми письмами без повторов и задержек.
БЛАГОДАРЮ ВАС!
1 ответ
Ваша проблема здесь:
retryableExceptions.put(EventException.class, false);
Пожалуйста, найдите SimpleRetryPolicy
код:
public boolean canRetry(RetryContext context) {
Throwable t = context.getLastThrowable();
return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts;
}
и далее:
private boolean retryForException(Throwable ex) {
return retryableClassifier.classify(ex);
}
Поскольку вы указываете false
для тебя EventException
не будет retryable
, Отсюда любые попытки и откаты.