Как справиться с ошибкой сериализации в связывателе потоков Spring Cloud Stream Kafka?

Я пишу приложение потоков Kafka, используя связку потоков Kafka Stream Cloud Spring.

Пока потребитель публикует сообщение в теме вывода, может возникнуть ошибка, например ошибка сериализации или сетевая ошибка.

В этом коде -

@Bean
public Function<KStream<Object, String>, KStream<Object, String>> process() {
    return (input) -> {
        KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
        return kt;
    };
}

Здесь при создании сообщения обратно в тему вывода, если возникает ошибка, как с ней справиться. Есть ли какой-либо механизм в связывателе потоков Kafka, кроме RetryTemplate?

1 ответ

См. Документацию Spring для Apache Kafka.

Когда десериализатору не удается десериализовать сообщение, Spring не может решить эту проблему, потому что она возникает до возврата из poll(). Чтобы решить эту проблему, в версии 2.2 появился ErrorHandlingDeserializer2. Этот десериализатор делегирует полномочия реальному десериализатору (ключ или значение). Если делегату не удается десериализовать содержимое записи, ErrorHandlingDeserializer2 возвращает нулевое значение и исключение DeserializationException в заголовке, который содержит причину и необработанные байты. Когда вы используете MessageListener на уровне записи, если ConsumerRecord содержит заголовок DeserializationException для ключа или значения, вызывается ErrorHandler контейнера с ошибкой ConsumerRecord. Запись слушателю не передается.