Весенний облачный поток kafka 2.0 - StreamListener с условием
Я пытаюсь создать потребителя, используя аннотацию StreamListener и условие attirbute. Однако я получаю следующее исключение:
org.springframework.core.convert.ConversionFailedException: не удалось преобразовать тип [java.lang.String] в тип [java.lang.Integer] для значения 'test'; Вложенное исключение - java.lang.NumberFormatException: для входной строки: "test"
TestListener:
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="payload['test'] == 'test'")
public void test(@Payload TestObj message) {
log.info("message is {}",message.getName());
}
TestObj:
@Data
@ToString(callSuper=true)
public class TestObj {
@JsonProperty("test")
private String test;
@JsonProperty("name")
private String name;
}
кто-нибудь может помочь с этим вопросом?
2 ответа
Полезная нагрузка сообщения еще не преобразована из проводного формата (byte[]) в нужный тип. Другими словами, он еще не прошел процесс преобразования типов, описанный в согласовании типов содержимого.
Поэтому, если вы не используете выражение SPeL, которое оценивает необработанные данные (например, значение первого байта в массиве байтов), используйте выражения на основе заголовка сообщения (например, условие = "заголовки ['тип']=='собака "").
Пример:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
// handle the message
}
Из того, что вы показываете, это должно работать. Я предлагаю вам удалить условие, а затем установить точку останова для отладки. Тогда вы должны быть в состоянии узнать, какой именно тип.