Spring Cloud Stream Elmhrust.RELEASE не может изменить Serde
Я не могу изменить Serde канала (или привязку), используя синтаксис, указанный в документации ( https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/).
Предполагая, что мой канал pcin
, Я понимаю, что я должен указать valueSerde и keySerde, используя следующие свойства spring.cloud.stream.kafka.streams.bindings.pcin.producer.valueSerde
а также spring.cloud.stream.kafka.streams.bindings.pcin.producer.keySerde
,
Однако я получаю исключение:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Я пытаюсь адаптировать пример из Spring Tips Джоша Лонга: https://github.com/spring-tips/spring-cloud-stream-kafka-streams
Я просто поменял класс PageViewEventProcessor
следующее:
@Component
public static class PageViewEventProcessor {
@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
return events
.filter((key, value) -> value.getDuration() > 10)
.map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
.groupByKey()
.aggregate(()-> 0L,
(cle, val, valAgregee) -> valAgregee + val,
Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))
.toStream();
}
}
Вместо подсчета количества событий (посещений страницы) я вычисляю сумму продолжительности каждого посещения.
Вот выдержка из application.properties (из примера Spring tips):
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.bindings.pcin.content-type=application/json
spring.cloud.stream.bindings.pcin.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
Есть ли другие необходимые изменения?
1 ответ
Является pcin
связаны с потребителем (вход)? Если это так, вы должны использовать свойства как
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.valueSerde
and
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.keySerde
Ваш тип входящего значения PageViewEvent
, Тем не менее, вы устанавливаете значение Serde
быть LongSerde
,
Вы можете полностью удалить это свойство: spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
и пусть фреймворк сделает преобразование JSON за вас. Таким образом, входящий тип автоматически преобразуется в PageViewEvent
без вас явно укажите значение Serde.
Если вы должны предоставить значение Serde (в этом случае свойство native-decoding должно быть установлено на true
), тогда вы должны предоставить правильный JsonSerde в качестве значения Serde
,
Обновление:
Со следующими изменениями я могу запустить приложение без каких-либо ошибок.
Я изменил твой код следующим образом.
@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
return events
.filter((key, value) -> value.getDuration() > 10)
.map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.aggregate(()-> 0L,
(cle, val, valAgregee) -> valAgregee + val,
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(AnalyticsBinding.PAGE_COUNT_MV)
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
)
.toStream();
}
Внутренние сердца на groupByKey
а также aggregate
вызовы необходимы, поскольку они отличаются от комбинации Serde ключ / значение по умолчанию.
Я также изменил ваш конфиг и очистил его:
#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.mms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
#
# page views out
spring.cloud.stream.bindings.pvout.destination=pvs
#
# page views in
spring.cloud.stream.bindings.pvin.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde