Spring Cloud Stream Elmhrust.RELEASE не может изменить Serde

Я не могу изменить Serde канала (или привязку), используя синтаксис, указанный в документации ( https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/).

Предполагая, что мой канал pcin, Я понимаю, что я должен указать valueSerde и keySerde, используя следующие свойства spring.cloud.stream.kafka.streams.bindings.pcin.producer.valueSerde а также spring.cloud.stream.kafka.streams.bindings.pcin.producer.keySerde,

Однако я получаю исключение:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Я пытаюсь адаптировать пример из Spring Tips Джоша Лонга: https://github.com/spring-tips/spring-cloud-stream-kafka-streams

Я просто поменял класс PageViewEventProcessor следующее:

@Component
        public static class PageViewEventProcessor {

                @StreamListener
                @SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
                public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
                        return events
                            .filter((key, value) -> value.getDuration() > 10)
                            .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
                            .groupByKey()
                            .aggregate(()-> 0L, 
                                    (cle, val, valAgregee) -> valAgregee + val, 
                                    Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))

                            .toStream();
                }
        }

Вместо подсчета количества событий (посещений страницы) я вычисляю сумму продолжительности каждого посещения.

Вот выдержка из application.properties (из примера Spring tips):

# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.bindings.pcin.content-type=application/json
spring.cloud.stream.bindings.pcin.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde

Есть ли другие необходимые изменения?

1 ответ

Решение

Является pcin связаны с потребителем (вход)? Если это так, вы должны использовать свойства как spring.cloud.stream.kafka.streams.bindings.pcin.consumer.valueSerde and spring.cloud.stream.kafka.streams.bindings.pcin.consumer.keySerde

Ваш тип входящего значения PageViewEvent, Тем не менее, вы устанавливаете значение Serde быть LongSerde,

Вы можете полностью удалить это свойство: spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true и пусть фреймворк сделает преобразование JSON за вас. Таким образом, входящий тип автоматически преобразуется в PageViewEvent без вас явно укажите значение Serde.

Если вы должны предоставить значение Serde (в этом случае свойство native-decoding должно быть установлено на true), тогда вы должны предоставить правильный JsonSerde в качестве значения Serde,

Обновление:

Со следующими изменениями я могу запустить приложение без каких-либо ошибок.

Я изменил твой код следующим образом.

@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {

                    return events
                            .filter((key, value) -> value.getDuration() > 10)
                            .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
                            .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
                            .aggregate(()-> 0L,
                                    (cle, val, valAgregee) -> valAgregee + val,
                                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(AnalyticsBinding.PAGE_COUNT_MV)
                                    .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
                            )
                            .toStream();
                }

Внутренние сердца на groupByKey а также aggregate вызовы необходимы, поскольку они отличаются от комбинации Serde ключ / значение по умолчанию.

Я также изменил ваш конфиг и очистил его:

#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.mms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
#
# page views out
spring.cloud.stream.bindings.pvout.destination=pvs
#
# page views in
spring.cloud.stream.bindings.pvin.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
Другие вопросы по тегам