Попытка подключения потребителя весной amqp кролик макс
Я пытаюсь установить максимальное количество попыток от моего приложения к брокеру кроликов. У меня есть перехватчик повторов,
@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(CommonConstants.MAX_AMQP_RETRIES)
.backOffOptions(500, 2.0, 3000)
.build();
}
и это используется внутри контейнера слушателя,
container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
Однако после нескольких попыток потребитель снова и снова пытается соединиться бесконечно,
2017-02-21 15:03:12.229 WARN 9292 --- [nsumerThread_92] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2017-02-21 15:03:12.229 INFO 9292 --- [nsumerThread_92] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2017-02-21 15:03:13.245 WARN 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2017-02-21 15:03:13.245 INFO 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2017-02-21 15:03:13.261 ERROR 9292 --- [nsumerThread_83] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
Я хочу, чтобы приложение не работало и выдает ошибку из-за отсутствия подключения к брокеру после ограничения MAX_RETRY #.
Спасибо за помощь
EDIT
Как подсказывает @artem-bilan, я использовал Component
public class BrokerFailureEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent>
В этом классе onApplicationEvent
Я посчитал количество сбоев и затем предпринял соответствующие действия.
В случае со стороны производителя, это немного другой сценарий. Как объяснил @artem-bilan, приложение должно позаботиться о любых проблемах. Я исследовал с помощью netflix-hystrix
и добавил fallback
метод для метода производства и будет идти по этому маршруту. Большое спасибо еще раз.
1 ответ
Ну, вы немного не поняли container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
, Это для бизнес-ошибок при обработке сообщений:
Обработка бизнес-исключений, в отличие от ошибок протокола и разорванных соединений, может потребовать больше обдумывания и некоторой пользовательской конфигурации, особенно если используются транзакции и / или контейнерные подтверждения. До версии 2.8.x RabbitMQ не имел определения поведения недоставленных букв, поэтому по умолчанию сообщение, которое отклонено или откатано из-за бизнес-исключения, может быть доставлено до бесконечности. Чтобы установить в клиенте ограничение на количество повторных доставок, одним из вариантов является StatefulRetryOperationsInterceptor в цепочке рекомендаций слушателя. Перехватчик может иметь функцию обратного вызова для восстановления, которая реализует настраиваемое действие недоставленной буквы: все, что подходит для вашей конкретной среды.
В противоречие с:
На самом деле, он бесконечно зацикливается на попытках перезапустить потребителя, и только если он ведет себя очень плохо, он действительно сдастся. Одним из побочных эффектов является то, что если при запуске контейнера посредник не работает, он просто будет пытаться установить соединение.
Что вам нужно ListenerContainerConsumerFailedEvent
, который испускается как:
private void logConsumerException(Throwable t) {
if (logger.isDebugEnabled()
|| !(t instanceof AmqpConnectException || t instanceof ConsumerCancelledException)) {
logger.warn(
"Consumer raised exception, processing can restart if the connection factory supports it",
t);
}
else {
logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t);
}
Таким образом, вы можете прослушивать эти события и останавливать приложение, когда достигается какое-то условие.