Как протестировать Kafka Consumer с Embedded-Kafka-lib, именно с 'withRunningKafka'?
Я должен проверить мой код на предмет потребления всех сообщений от kafka-сервера через встроенный метод withRunningKafka, как показано здесь: https://github.com/manub/scalatest-embedded-kafka
- Я пытался отправить сообщение в тему через созданного встроенного производителя.
- И я пытался использовать созданные сообщения (созданные встроенным производителем) через мой код в проекте.
"тестирование с изготовителем на заказ и потребителем" должно {
"work" in {
withRunningKafka {
1. val producer: KafkaProducer[String, String] =
aKafkaProducer[String](valueSerializer, config)
val topic = "topic-to-test"
producer.send(new ProducerRecord[String, String](topic, "some message 1"))
producer.send(new ProducerRecord[String, String](topic, "some message 2"))
producer.close()
2. val ok: Future[Done] = Consumer
.committableSource(
consumerSettings,
Subscriptions.topics(topic))
.map(msg => println(msg.record.value()))
.runWith(Sink.ignore)
ok should be (Done)
}
}}
Проблема здесь: "ОК" не дает результат как "Готово". Вообще, правильна ли моя логика для проверки потребителя?
2 ответа
Добро пожаловать в stackru!
Причина ok
никогда не завершается с результатом, потому что источник ждет возможных дальнейших сообщений. добавлять .take(2)
перед картой, и источник остановится после того, как два элемента ok
Будущее будет завершено.
Я думаю, что вы сталкиваетесь с двумя проблемами одновременно:
Потребитель Kafka бесконечно ждет элементов (как говорит @dvim), поэтому вам нужно.take(), чтобы он действительно завершился
Группа потребителей kafka по умолчанию будет начинаться в конце текущей темы, а не в начале, и поэтому не будет потреблять сообщения, опубликованные до ее вращения. Вам нужно в настройках, чтобы это началось в начале темы, а не в конце.