при использовании @StreamListener настройка KafkaListenerContainerFactory отражается в сгенерированном KafkaMessageListenerContainer?
Я использую spring-cloud-stream со связывателем kafka для получения сообщения от kafka . Приложение в основном использует сообщения кафки и обновляет базу данных.
Существуют сценарии, когда БД не работает (что может длиться несколько часов) или некоторые другие временные технические проблемы. Поскольку в этих сценариях нет смысла повторять попытку сообщения в течение ограниченного периода времени, а затем перемещать его в DLQ, я пытаюсь добиться бесконечного количества повторных попыток, когда мы получаем определенный тип исключений (например, DBHostNotAvaialableException)
Чтобы достичь этого, я попробовал 2 подхода (сталкиваясь с проблемами в обоих подходах) -
В этом подходе попытался установить обработчик ошибок в свойствах контейнера при настройке bean-компонента ConcurrentKafkaListenerContainerFactory, но обработчик ошибок вообще не запускается. Во время отладки потока, который я понял в созданном KafkaMessageListenerContainer, поле errorHandler имеет значение null, поэтому они используют LoggingErrorHandler по умолчанию. Ниже приведены мои конфигурации bean-компонентов фабрики контейнеров - метод @StreamListener для этого подхода такой же, как и второй подход, за исключением поиска по потребителю.
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(kafkaConsumerFactory); factory.getContainerProperties().setAckOnError(false); ContainerProperties containerProperties = factory.getContainerProperties(); // even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler()); return factory; }
Мне что-то не хватает при настройке фабричного bean-компонента, или этот bean-компонент актуален только для @KafkaListener, а не для @StreamListener??
Вторая альтернатива заключалась в попытке добиться этого с помощью ручного подтверждения и поиска. Внутри метода @StreamListener, получающего подтверждение и потребителя из заголовков, в случае получения повторного исключения я делаю определенное количество повторных попыток с использованием retrytemplate, и когда они исчерпаны, я запускаю
consumer.seek()
. Пример кода ниже -@StreamListener(MySink.INPUT) public void processInput(Message<String> msg) { MessageHeaders msgHeaders = msg.getHeaders(); Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class); Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class); String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class); Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class); try { retryTemplate.execute( context -> { // this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException consumeMessage(msg.getPayload()); return null; } ); } catch (DBHostNotAvaialableException ex) { // once retries as per retrytemplate are exhausted do a seek consumer.seek(new TopicPartition(topicName, partition), offset); } catch (Exception ex) { // if some other exception just log and put in dlq based on enableDlq property logger.warn("some other business exception hence putting in dlq "); throw ex; } if (ack != null) { ack.acknowledge(); }
}
Проблема с этим подходом - поскольку я использую consumer.seek(), в то время как могут быть ожидающие записи из последнего опроса, они могут быть обработаны и зафиксированы, если БД появится в этот период (следовательно, не в порядке). Есть ли способ очистить эти записи во время поиска?
PS - в настоящее время мы находимся в версии 2.0.3.RELEASE весенней загрузки и Finchley.RELEASE или облачных зависимостей Spring (следовательно, мы не можем использовать такие функции, как отрицательное подтверждение, и обновление невозможно в данный момент).
1 ответ
Spring Cloud Stream не использует фабрику контейнеров. Я уже объяснил вам это в этом ответе.
Версия 2.1 представила ListenerContainerCustomizer
и если вы добавите bean-компонент этого типа, он будет вызываться после создания контейнера.
Срок службы Spring Boot 2.0 закончился более года назад, и он больше не поддерживается.
Ответ, который я вам передал, показывает, как можно использовать отражение для добавления обработчика ошибок.
Поиск в слушателе будет работать, только если у вас есть max.poll.records=1
.