Весенний слушатель теста интеграции Kafka не работает
Я пытаюсь тестирование интеграции с помощью spring-boot
и весной врезал кафку. Я могу создать сообщение для встроенного в Kafka сервера, но вместо тестового слушателя слушатель в классе обслуживания пытается использовать записи и
KafkaProducerConfigTest Класс Config со всеми компонентами
@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {
@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
return new EmbeddedKafkaBroker(1,false,2,"test-events");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}
@Bean("consumerFactory")
public ConsumerFactory<String, Object> createConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Object.class,false));
}
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
factory.getContainerProperties().setAckMode(AckMode.BATCH);
return factory;
}
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public Listener listener() {
return new Listener();
}
public class Listener {
public final CountDownLatch latch = new CountDownLatch(1);
@Getter
public List<Professor> list;
@KafkaListener(topics = "test-events", containerFactory = "kafkaListenerContainerFactory")
public void listen1(List<Professor> foo) {
list=foo;
this.latch.countDown();
}
}
}
KafkaProducerServiceTest Тестовый класс службы, но здесь слушатель не потребляет никаких данных, и я знаю, что этот тест не пройден, потому что это неправильный способ сделать это
@EnableKafka
@SpringBootTest(classes = { KafkaProducerConfigTest.class })
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {
@Autowired
private KafkaConsumerService kafkaConsumerService;
@Autowired
private Listener listener;
@Test
public void testReceive() throws Exception {
kafkaConsumerService.professor(Arrays.asList(new Professor("Ajay", new Department("social", 1234))));
System.out.println("The professor object is sent to kafka -----------------------------------");
listener.latch.await();
System.out.println(listener.getList());
}
}
ошибка
2019-02-19 15:18:32.620 ERROR 22387 --- [ntainer#1-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
ConsumerRecord(topic = test-events, partition = 1, offset = 0, CreateTime = 1550611112583, serialized key size = 9, serialized value size = 64, headers = RecordHeaders(headers = [], isReadOnly = false), key = professor, value = {name=Ajay, department={deptName=social, deptId=1234}})
java.lang.IllegalStateException: Only String or byte[] supported
at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:140) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:134) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.support.converter.BatchMessagingMessageConverter.convert(BatchMessagingMessageConverter.java:217) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.support.converter.BatchMessagingMessageConverter.toMessage(BatchMessagingMessageConverter.java:165) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.toMessagingMessage(BatchMessagingMessageListenerAdapter.java:174) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:129) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:59) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:984) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:917) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:900) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:753) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Обновлено на основе @Gray Russel, сэр, но у меня все еще есть проблема
1 ответ
Использование StringDeserializer
и StringJsonMessageConverter
на контейнерном заводе (или BytesDeserializer
а также BytesJsonMessageConverter
); затем платформа может определить целевой тип из сигнатуры метода и передать его в конвертер.
Десериализатор находится слишком далеко в стеке, чтобы сделать этот вывод.
Возможно, уже поздно отвечать по этому поводу. Как сказал @Gary Russell в своем ответе, просто используйте StringDeserializer для десериализации объекта.
application.yml для настройки Kafka:
server:
port: 8080
servlet:
contextPath: /poi
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: rsvp-consumers
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
spring:
json:
type:
mapping: poi:org.ajeet.learnings.spring.springboot.model.POI
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring:
json:
type:
mapping: poi:org.ajeet.learnings.spring.springboot.model.POI
topics: spatial
Класс события Kafka:
public final class POI {
private final double longitude;
private final double latitude;
private final POI_Type type;
private final String description;
@JsonCreator
public POI(@JsonProperty("longitude") double longitude,
@JsonProperty("latitude") double latitude,
@JsonProperty("type") POI_Type type,
@JsonProperty("description") String description) {
this.longitude = longitude;
this.latitude = latitude;
this.type = type;
this.description = description;
}
public double getLongitude() {
return longitude;
}
public double getLatitude() {
return latitude;
}
public String getDescription() {
return description;
}}
В вашем регистре класса приложенияStringJsonMessageConverter (вы также можете создать этот bean-компонент где-нибудь еще), он преобразует строку в требуемый объект, который является POI в этом:
@SpringBootApplication
@ComponentScan("org.ajeet.learnings.spring")
public class SpringBootKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApplication.class, args);
}
/**
* We are using a StringDeserializer in kafka consumer properties
* And this converter is converting the message from string to required type.
*
* @return RecordMessageConverter
*/
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}}
Это потребитель:
@Service
public final class POIEventConsumer {
private final Logger LOG = LoggerFactory.getLogger(POIEventConsumer.class);
/**
* Read topic names from application.yaml
*
* @param pointOfInterest
* @throws IOException
*/
@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}")
public void consume(POI pointOfInterest) throws IOException {
LOG.info(pointOfInterest.toString());
} }
Это производитель:
@Service
public class POIEventProducer {
private static final Logger LOG = LoggerFactory.getLogger(POIEventProducer.class);
@Value("${spring.kafka.topics}")
private String topic;
@Autowired
private KafkaTemplate<String, POI> kafkaTemplate;
public void sendMessage(POI pointOfInterest) {
this.kafkaTemplate.send(topic, pointOfInterest);
}}
Вы можете использовать контроллер для проверки:
@RestController
@RequestMapping(value = "/kafka")
public final class POIEventController {
private final Logger LOG = LoggerFactory.getLogger(POIEventController.class);
@Autowired
private POIEventProducer poiEventProducer;
/**
* Post request url: http://localhost:8080/poi/kafka/publish
* Example of request body: {"longitude":77.100281, "latitude": 28.556160, "type": "Airport", "description": "Indira Gandhi International Airport New Delhi"}
*
* @param pointOfInterest
*/
@RequestMapping(value = "/publish",
method = RequestMethod.POST,
consumes = MediaType.APPLICATION_JSON_VALUE)
public void sendMessageToKafkaTopic(@RequestBody POI pointOfInterest) {
this.poiEventProducer.sendMessage(pointOfInterest);
}}