Spring Kafka Consumer Retry с длительным интервалом отсрочки, дающим "org.apache.kafka.clients.consumer.CommitFailedException"

Я новичок в Spring-Kafka и пытаюсь реализовать повторную попытку в случае сбоя или любого исключения во время обработки сообщений kafka с помощью Spring Kafka RetryTemplate.

Я использовал следующий код:

// Это KafkaListenerContainerFactory:

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(retryContext -> {
        ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
        logger.info("Recovery is called for message {} ", consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

// Повторная попытка шаблона

public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    // Todo: take from config
    fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    // Todo: take from config
    simpleRetryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
}
    
//

Это потребительская фабрика

public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

Когда возникает какое-либо исключение, оно повторяется, как ожидалось, в соответствии с политикой повторных попыток. Как только максимальное количество повторных попыток исчерпания, вызывается методом обратного вызова восстановления. Но вскоре после этого он выдает "java.lang.IllegalStateException: этот обработчик ошибок не может обработать org.apache.kafka.clients.consumer.CommitFailedException; информация о записи недоступна" с некоторыми деталями, например:Неудачный запрос OffsetCommit, поскольку потребитель является не входит в активную группу.

Похоже, что он не может зафиксировать смещение, поскольку потребитель теперь отключен от группы, потому что он простаивал в течение длительного времени (backoffperiod*(maxretry-1)) до следующего опроса.

Нужно ли мне добавлять max.poll.interval.ms с большим значением?

Есть ли какой-либо другой способ добиться этого, чтобы эта ошибка сбоя фиксации не возникла, даже если у потребителя так много времени на обработку и запланировано повторение с длинным интервалом.

Пожалуйста помоги мне с этим.

1 ответ

Решение

Суммарная задержка возврата должна быть меньше, чем max.poll.interval.ms чтобы избежать перебалансировки.

Сейчас предпочтительнее использовать SeekToCurrentErrorHandler вместо RetryTemplate потому что тогда только каждая задержка (а не совокупность) должна быть меньше, чем max.poll.interval.ms

Документация здесь.

Другие вопросы по тегам