тестирование kafka и spark с помощью testcontainers

Я пытаюсь проверить с помощью testcontainers потоковый конвейер в качестве интеграционного теста, но я не знаю, как получить bootstrapServers, по крайней мере, в последней версии testcontainers и создать там конкретную тему. Как я могу использовать containerDef?

      import com.dimafeng.testcontainers.{ContainerDef, KafkaContainer}
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession

class Mykafkatest extends FunSuite with TestContainerForAll {
  //val kafkaContainer: KafkaContainer      = KafkaContainer("confluentinc/cp-kafka:5.4.3")
  override val containerDef: ContainerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Unit testing")
      .getOrCreate()

    // I will inject some messages into kafka topic

    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", container.bootstrapServers)
      .option("subscribe", "topic1")
      .load()


    df.show(false)

    // I will process this df in order to check a specific transformation
  })

}

Моя конфигурация sbt:

      lazy val root = project
  .in(file("./pipeline"))
  .settings(
    organization := "org.example",
    name := "spark-stream",
    version := "0.1",
    scalaVersion := "2.12.10",
    libraryDependencies := Seq(
      "org.apache.spark" %% "spark-sql-kafka-0-10"       % "3.0.3"  % Compile,
      "org.apache.spark" %% "spark-sql"                  % "3.0.3"  % Compile,
      "com.dimafeng"     %% "testcontainers-scala-munit" % "0.39.5" % Test,
      "org.dimafeng"     %% "testcontainers-scala-kafka" % "0.39.5" % Test,
      "org.scalameta"    %% "munit"                      % "0.7.28" % Test
    ),
    testFrameworks += new TestFramework("munit.Framework"),
    Test / fork := true
  )

В документации нет полного примера: https://www.testcontainers.org/modules/kafka/

1 ответ

Единственная проблема здесь в том, что вы явно передаете это KafkaContainer.Def к .

Тип контейнера, предоставляемый withContianers, Containter решено path dependent type в при условии,

      trait TestContainerForAll extends TestContainersForAll { self: Suite =>

  val containerDef: ContainerDef

  final override type Containers = containerDef.Container

  override def startContainers(): containerDef.Container = {
    containerDef.start()
  }

  // inherited from TestContainersSuite
  def withContainers[A](runTest: Containers => A): A = {
    val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
    runTest(c)
  }

}
      trait ContainerDef {

  type Container <: Startable with Stoppable

  protected def createContainer(): Container

  def start(): Container = {
    val container = createContainer()
    container.start()
    container
  }
}

В тот момент, когда вы явно укажете тип в override val containerDef: ContainerDef = KafkaContainer.Def(), это разрушает весь "обман типов", и поэтому компилятор Scala остается с type Container <: Startable with Stoppable вместо KafkaContainer.

Итак, просто удалите это явное приведение типа к ContainerDef, и это val servers = container.bootstrapServers будет работать как положено.

      class Mykafkatest extends FunSuite with TestContainerForAll {
  override val containerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    //...

    val servers = container.bootstrapServers

    //...
  })
}
Другие вопросы по тегам