Весенний облачный поток kafka 2.0 - StreamListener с условием

Я пытаюсь создать потребителя, используя аннотацию StreamListener и условие attirbute. Однако я получаю следующее исключение:

org.springframework.core.convert.ConversionFailedException: не удалось преобразовать тип [java.lang.String] в тип [java.lang.Integer] для значения 'test'; Вложенное исключение - java.lang.NumberFormatException: для входной строки: "test"

TestListener:

@StreamListener(target=ITestSink.CHANNEL_NAME,condition="payload['test'] == 'test'")
public void test(@Payload TestObj message) {
    log.info("message is {}",message.getName());
}

TestObj:

@Data
@ToString(callSuper=true)
public class TestObj {

    @JsonProperty("test")
    private String test;

    @JsonProperty("name")
    private String name;

}

кто-нибудь может помочь с этим вопросом?

2 ответа

Полезная нагрузка сообщения еще не преобразована из проводного формата (byte[]) в нужный тип. Другими словами, он еще не прошел процесс преобразования типов, описанный в согласовании типов содержимого.

Поэтому, если вы не используете выражение SPeL, которое оценивает необработанные данные (например, значение первого байта в массиве байтов), используйте выражения на основе заголовка сообщения (например, условие = "заголовки ['тип']=='собака "").

Пример:

 @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

Из того, что вы показываете, это должно работать. Я предлагаю вам удалить условие, а затем установить точку останова для отладки. Тогда вы должны быть в состоянии узнать, какой именно тип.

Другие вопросы по тегам