Kafka Listener: невозможно предотвратить повторную попытку для определенного исключения с помощью SimpleRetryPolicy

У меня есть слушатель Kafka, который может генерировать исключение JsonProcessingException и другое настраиваемое исключение (допустим, исключение X) . Я хочу, чтобы прослушиватель kafka повторял попытку только при возникновении исключения JsonProcessingException, а не при возникновении исключения X.

Для этого я передал retryTemplate в ConcurrentKafkaListenerContainerFactory. Я использовал SimpleRetryTemplate, чтобы упомянуть об исключениях, которые нужно повторить, но это не работает.

Слушатель также пытается обработать исключение X.

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContainerFactory(){
        LOGGER.debug("Creating ConcurrentKafkaInsertionListner");
        ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consumerFactory());
        listenerFactory.setRetryTemplate(kafkaListenerRetryTemplate());
        listenerFactory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer((KafkaOperations<String, String>)kafkaTemplate())));
        return listenerFactory; 
    }

private RetryTemplate kafkaListenerRetryTemplate() {

        LOGGER.debug("Creating KafkaRetry Template");
        RetryTemplate retryTemplate = new RetryTemplate();

      /* here retry policy is used to set the number of attempts to retry and what exceptions you wanted to try and what you don't want to retry.*/
        retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

        return retryTemplate;
    }

 private SimpleRetryPolicy getSimpleRetryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        LOGGER.debug("Creating Kafka listener retry policy");
        //exceptionMap.put(Exception.class, false);
        //exceptionMap.put(Throwable.class, false);
        exceptionMap.put(JsonProcessingException.class, true);
        exceptionMap.put(ExceptionX.class, false);
        return new SimpleRetryPolicy(3,exceptionMap,true);
    }

Я не уверен, что мне не хватает. Я также попытался установить traverCauses на false и установить Exception.class и Throwable.class в exceptionMap на false.

Я использую весеннюю загрузку версии 2.3.4

1 ответ

Больше нет необходимости использовать RetryTemplate; теперь вы можете добавить BackOff и addNotRetryableException() к SeekToCurrentErrorHandler - фактически наличие обоих означает, что у вас есть вложенные попытки.

Чтобы отключить повторные попытки в STCEH, используйте FixedBackOffс 0 попытками. По умолчанию BackOff это 9 попыток (10 попыток) без возврата.

Другие вопросы по тегам