Юнит-тестирование кафки потребителя актером с акка-стрим-кафкой
Я пытаюсь провести модульное тестирование актера, который использует сообщения от kafka, используя абстракцию akka streams kafka.
У моего актера есть два конструктора: основной, который используется в тестировании, и дополнительный, который используется в производстве.
class LastDeviceMetricActor(val source: Source[ConsumerRecord[String, Array[Byte]], Control],
implicit val materializer: ActorMaterializer) extends Actor {
def this(consumerSettings: ConsumerSettings[String, Array[Byte]], materializer: ActorMaterializer) {
this(Consumer.plainSource(consumerSettings, Subscriptions.topics(Set("deviceMetrics"))), materializer)
}
В производстве я использую это, передавая "настоящие" ConsumerSettings.
val consumerSettings: ConsumerSettings[String, Array[Byte]] = ???
val latestDeviceMetricActor = actorSystem.actorOf(Props(new LastDeviceMetricActor(consumerSettings, actorMaterializer)))
В тестировании я хочу использовать это так:
val testSource: Source[ConsumerRecord[String, Array[Byte]], NotUsed] =
Source.single(new ConsumerRecord("deviceMetrics", 1, 1, "key", metrics.toByteArray))
val actor = system.actorOf(Props(new LastDeviceMetricActor(testSource, ActorMaterializer())))
Проблема в том, что материализованная стоимость, созданная из Source.single
является NotUsed
вместо кафки Control
тип.
У меня вопрос, как бы я проверил своего актера, не создавая брокера Кафки? Я хотел бы "просто" создать источник, который будет выглядеть как источник потоков kafka в моем тесте, и иметь возможность вставлять в него сообщения.
Нужно ли мне создавать свой собственный источник для этого или есть более простой способ?