Юнит-тестирование кафки потребителя актером с акка-стрим-кафкой

Я пытаюсь провести модульное тестирование актера, который использует сообщения от kafka, используя абстракцию akka streams kafka.

У моего актера есть два конструктора: основной, который используется в тестировании, и дополнительный, который используется в производстве.

class LastDeviceMetricActor(val source: Source[ConsumerRecord[String, Array[Byte]], Control],
                            implicit val materializer: ActorMaterializer) extends Actor {

  def this(consumerSettings: ConsumerSettings[String, Array[Byte]], materializer: ActorMaterializer) {
    this(Consumer.plainSource(consumerSettings, Subscriptions.topics(Set("deviceMetrics"))), materializer)
  }

В производстве я использую это, передавая "настоящие" ConsumerSettings.

val consumerSettings: ConsumerSettings[String, Array[Byte]] = ???
val latestDeviceMetricActor = actorSystem.actorOf(Props(new LastDeviceMetricActor(consumerSettings, actorMaterializer)))

В тестировании я хочу использовать это так:

 val testSource: Source[ConsumerRecord[String, Array[Byte]], NotUsed] =
        Source.single(new ConsumerRecord("deviceMetrics", 1, 1, "key", metrics.toByteArray))

 val actor = system.actorOf(Props(new LastDeviceMetricActor(testSource, ActorMaterializer())))

Проблема в том, что материализованная стоимость, созданная из Source.single является NotUsed вместо кафки Control тип.

У меня вопрос, как бы я проверил своего актера, не создавая брокера Кафки? Я хотел бы "просто" создать источник, который будет выглядеть как источник потоков kafka в моем тесте, и иметь возможность вставлять в него сообщения.

Нужно ли мне создавать свой собственный источник для этого или есть более простой способ?

0 ответов

Другие вопросы по тегам