при использовании @StreamListener настройка KafkaListenerContainerFactory отражается в сгенерированном KafkaMessageListenerContainer?

Я использую spring-cloud-stream со связывателем kafka для получения сообщения от kafka . Приложение в основном использует сообщения кафки и обновляет базу данных.

Существуют сценарии, когда БД не работает (что может длиться несколько часов) или некоторые другие временные технические проблемы. Поскольку в этих сценариях нет смысла повторять попытку сообщения в течение ограниченного периода времени, а затем перемещать его в DLQ, я пытаюсь добиться бесконечного количества повторных попыток, когда мы получаем определенный тип исключений (например, DBHostNotAvaialableException)

Чтобы достичь этого, я попробовал 2 подхода (сталкиваясь с проблемами в обоих подходах) -

  1. В этом подходе попытался установить обработчик ошибок в свойствах контейнера при настройке bean-компонента ConcurrentKafkaListenerContainerFactory, но обработчик ошибок вообще не запускается. Во время отладки потока, который я понял в созданном KafkaMessageListenerContainer, поле errorHandler имеет значение null, поэтому они используют LoggingErrorHandler по умолчанию. Ниже приведены мои конфигурации bean-компонентов фабрики контейнеров - метод @StreamListener для этого подхода такой же, как и второй подход, за исключением поиска по потребителю.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> 
     kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(kafkaConsumerFactory);
        factory.getContainerProperties().setAckOnError(false);
        ContainerProperties containerProperties = factory.getContainerProperties();
         // even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }
    

Мне что-то не хватает при настройке фабричного bean-компонента, или этот bean-компонент актуален только для @KafkaListener, а не для @StreamListener??

  1. Вторая альтернатива заключалась в попытке добиться этого с помощью ручного подтверждения и поиска. Внутри метода @StreamListener, получающего подтверждение и потребителя из заголовков, в случае получения повторного исключения я делаю определенное количество повторных попыток с использованием retrytemplate, и когда они исчерпаны, я запускаю consumer.seek(). Пример кода ниже -

    @StreamListener(MySink.INPUT)
    public void processInput(Message<String> msg) {
    
    MessageHeaders msgHeaders = msg.getHeaders();
    Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class);
    Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
    String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
    Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class);
    
    
    try {
      retryTemplate.execute(
                context -> {
                 // this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException
                    consumeMessage(msg.getPayload());
                    return null;
                }
        );
    }
    catch (DBHostNotAvaialableException ex) {
      // once retries as per retrytemplate are  exhausted do a seek
    
        consumer.seek(new TopicPartition(topicName, partition), offset);
    
    }
    catch (Exception ex) {
      // if some other exception just log and put in dlq based on enableDlq property
        logger.warn("some other business exception hence putting in dlq ");
        throw ex;
    }
    
    if (ack != null) {
        ack.acknowledge();
    }
    

    }

Проблема с этим подходом - поскольку я использую consumer.seek(), в то время как могут быть ожидающие записи из последнего опроса, они могут быть обработаны и зафиксированы, если БД появится в этот период (следовательно, не в порядке). Есть ли способ очистить эти записи во время поиска?

PS - в настоящее время мы находимся в версии 2.0.3.RELEASE весенней загрузки и Finchley.RELEASE или облачных зависимостей Spring (следовательно, мы не можем использовать такие функции, как отрицательное подтверждение, и обновление невозможно в данный момент).

1 ответ

Spring Cloud Stream не использует фабрику контейнеров. Я уже объяснил вам это в этом ответе.

Версия 2.1 представила ListenerContainerCustomizer и если вы добавите bean-компонент этого типа, он будет вызываться после создания контейнера.

Срок службы Spring Boot 2.0 закончился более года назад, и он больше не поддерживается.

Ответ, который я вам передал, показывает, как можно использовать отражение для добавления обработчика ошибок.

Поиск в слушателе будет работать, только если у вас есть max.poll.records=1.

Другие вопросы по тегам