Получить данные из темы после нажатия на @EmbeddedKafka в весенней загрузке Junit
Я пишу тестовые примеры Junit (используя @EmbeddedKafka) для своего приложения Spring Boot, которое широко использует Spring-kafka для связи с другими сервисами и для других операций.
Один типичный случай - удаление данных из kafka (что мы делаем с отправкой пустого сообщения в kafka).
В настоящее время в методе delete() мы делаем это, сначала проверяя, существует ли какое-либо сообщение в kafka, которое запрашивается для удаления. Затем мы нажимаем ноль для этого ключа сообщения в Кафке
Последовали шаги в написании Junit для вышеуказанной логики метода.
@Test
public void test(){
//Push a message to Kafka (id=1234)
//call test method service.delete(1234);
//internally service.delete(1234) checks/validate whether message exists in kafka and then push null to delete topic.
//check delete topic for delete message received.
// Assertions
}
Проблема здесь в том, что Кафка всегда выдает сообщение не найденное исключение. внутри метода service.delete().
при проверке логов в консоли. я выяснил, что мой производитель-config использует другой порт для kafka, а потребительский config использует другой порт.
Я не уверен, упустил ли я какие-то мелкие детали или в чем причина такого поведения. Любая помощь будет оценена.
1 ответ
У меня есть простое приложение Spring Boot, которое вы можете рассмотреть:
@SpringBootApplication
public class SpringBootEmbeddedKafkaApplication {
public static final String MY_TOPIC = "myTopic";
public BlockingQueue<String> kafkaMessages = new LinkedBlockingQueue<>();
public static void main(String[] args) {
SpringApplication.run(SpringBootEmbeddedKafkaApplication.class, args);
}
@KafkaListener(topics = MY_TOPIC)
public void listener(String payload) {
this.kafkaMessages.add(payload);
}
}
application.properties
:
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
И проверить:
@RunWith(SpringRunner.class)
@SpringBootTest(properties =
"spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka(topics = SpringBootEmbeddedKafkaApplication.MY_TOPIC)
public class SpringBootEmbeddedKafkaApplicationTests {
@Autowired
private KafkaTemplate<Object, String> kafkaTemplate;
@Autowired
private SpringBootEmbeddedKafkaApplication kafkaApplication;
@Test
public void testListenerWithEmbeddedKafka() throws InterruptedException {
String testMessage = "foo";
this.kafkaTemplate.send(SpringBootEmbeddedKafkaApplication.MY_TOPIC, testMessage);
assertThat(this.kafkaApplication.kafkaMessages.poll(10, TimeUnit.SECONDS)).isEqualTo(testMessage);
}
}
Обратите внимание на spring.kafka.consumer.auto-offset-reset=earliest
позволить потребителю читать с начала раздела.
Также еще один важный вариант для применения в тесте:
@SpringBootTest(properties =
"spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka
заселяет spring.embedded.kafka.brokers
Системное свойство и сделайте автоконфигурацию Spring Boot, чтобы узнать о том, что нам нужно скопировать его значение в spring.kafka.bootstrapServers
свойство конфигурации.
Или другой вариант в соответствии с нашими документами:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}