Как протестировать Kafka Consumer с Embedded-Kafka-lib, именно с 'withRunningKafka'?

Я должен проверить мой код на предмет потребления всех сообщений от kafka-сервера через встроенный метод withRunningKafka, как показано здесь: https://github.com/manub/scalatest-embedded-kafka

  1. Я пытался отправить сообщение в тему через созданного встроенного производителя.
  2. И я пытался использовать созданные сообщения (созданные встроенным производителем) через мой код в проекте.

"тестирование с изготовителем на заказ и потребителем" должно {

"work" in {

    withRunningKafka {

      1. val producer: KafkaProducer[String, String] =
               aKafkaProducer[String](valueSerializer, config)

         val topic = "topic-to-test"

         producer.send(new ProducerRecord[String, String](topic, "some message 1"))
         producer.send(new ProducerRecord[String, String](topic, "some message 2"))
         producer.close()

      2. val ok: Future[Done] = Consumer
        .committableSource(
            consumerSettings,
            Subscriptions.topics(topic))
        .map(msg => println(msg.record.value()))
        .runWith(Sink.ignore)

       ok should be (Done)
    }
}}

Проблема здесь: "ОК" не дает результат как "Готово". Вообще, правильна ли моя логика для проверки потребителя?

2 ответа

Добро пожаловать в stackru!

Причина ok никогда не завершается с результатом, потому что источник ждет возможных дальнейших сообщений. добавлять .take(2) перед картой, и источник остановится после того, как два элемента ok Будущее будет завершено.

Я думаю, что вы сталкиваетесь с двумя проблемами одновременно:

  1. Потребитель Kafka бесконечно ждет элементов (как говорит @dvim), поэтому вам нужно.take(), чтобы он действительно завершился

  2. Группа потребителей kafka по умолчанию будет начинаться в конце текущей темы, а не в начале, и поэтому не будет потреблять сообщения, опубликованные до ее вращения. Вам нужно в настройках, чтобы это началось в начале темы, а не в конце.

Другие вопросы по тегам