Десериализация в Spring Cloud

У меня есть производитель со следующей конфигурацией для типа контента

spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          content-type: application/json

На стороне потребителя я использую пружинную интеграцию (KinesisMessageDrivenChannelAdapter) для маршрутизации события на разные каналы. Когда я получаю Сообщение о классе слушателя, как показано ниже:

@ServiceActivator(inputChannel = "channelA")
    void handleMessage(Message<?> msg) {
        objectMapper.readValue(msg.getPayload(), MyEvent.class);
    }

сортировка в MyEvent не удается. В ошибке стека я вижу, что тип содержимого является частью полезной нагрузки, а полезная нагрузка все еще не десериализована из json в POJO.

Мне интересно, как я могу десериализовать сообщение перед выполнением любого другого преобразования. Я не могу найти какой-либо метод, которым я могу установить MessageConverter для адаптера.

Я ценю вашу помощь.

Спасибо

1 ответ

Решение

Похоже, ваш производитель Spring Cloud Stream, но потребитель просто KinesisMessageDrivenChannelAdapter, Непонятно, почему бы не использовать потребителя Spring Cloud Stream, но в любом случае...

Ваша проблема в том, что производитель SCSt сериализует заголовки сообщений вместе с полезной нагрузкой в ​​теле записи Kinesis. Просто потому, что AWS Kinesis не поддерживает заголовки как таковые.

Если вы действительно не интересуетесь заголовками на стороне потребителя, вы можете отключить встраивание заголовков на стороне производителя:

spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
        producer:
        headerMode: none

В противном случае у вас нет выбора на равнине KinesisMessageDrivenChannelAdapter сторона, если вы не используете EmbeddedHeaderUtils вручную.

Другие вопросы по тегам