Тестирование @KafkaListener с использованием Spring Embedded Kafka
Я пытаюсь написать модульный тест для слушателя Kafka, который я разрабатываю с помощью Spring Boot 2.x. Будучи модульным тестом, я не хочу запускать полноценный сервер Kafka с экземпляром Zookeeper. Итак, я решил использовать Spring Embedded Kafka.
Определение моего слушателя очень простое.
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
Также тест, который проверяет latch
Счетчик, равный нулю после получения сообщения, очень прост.
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
К сожалению, тест не пройден, и я не могу понять, почему. Можно ли использовать экземпляр KafkaEmbedded
протестировать метод, помеченный аннотацией @KafkaListener
?
Весь код доступен в моем репозитории GitHub kafka-listener.
Спасибо всем.
2 ответа
Возможно, вы отправляете сообщение до того, как получателю будет назначена тема / раздел. Установить свойство...
spring:
kafka:
consumer:
auto-offset-reset: earliest
... по умолчанию latest
,
Это похоже на использование --from-beginning
с консолью потребителя.
РЕДАКТИРОВАТЬ
Ой; вы не используете свойства загрузки.
добавлять
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
Кстати, вы, вероятно, также должны сделать get(10L, TimeUnit.SECONDS)
на результат template.send()
(а Future<>
) утверждать, что отправка прошла успешно.
EDIT3
Чтобы переопределить сброс смещения только для теста, вы можете сделать то же самое, что вы сделали для адресов брокера:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
а также
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
Однако имейте в виду, что это свойство применяется только при первом использовании группы. Чтобы всегда запускаться в конце при каждом запуске приложения, вы должны стремиться к концу во время запуска.
Также я бы порекомендовал настройку enable.auto.commit
в false
так что контейнер заботится о фиксации смещений, а не просто полагается на то, что клиент-клиент делает это по расписанию.
Может быть, кто-то найдет это полезным. У меня была похожая проблема. Локально запускались тесты (некоторые проверки выполнялись внутри
Решение, как уже упоминалось в ответе с наибольшим количеством голосов, заключалось в установке