Spark - не останавливается Spark Stream, который использует тему Kafka
Я пытаюсь написать тест для примера потокового воспроизведения, который использует данные из kafka. Я использую EmbeddedKafka для этого.
implicit val config = EmbeddedKafkaConfig(kafkaPort = 12345)
EmbeddedKafka.start()
EmbeddedKafka.createCustomTopic(topic)
println(s"Kafka Running ${EmbeddedKafka.isRunning}")
val spark = SparkSession.builder.appName("StructuredStreaming").master("local[2]").getOrCreate
import spark.implicits._
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:12345")
.option("subscribe", topic)
.load()
// pushing data to kafka
vfes.foreach(e => {
val json = ...
EmbeddedKafka.publishStringMessageToKafka(topic, json)
})
val query = df.selectExpr("CAST(value AS STRING)")
.as[String]
.writeStream.format("console")
query.start().awaitTermination()
spark.stop()
EmbeddedKafka.stop()
Когда я запускаю это, он продолжает работать и не останавливается и ничего не печатает на консоль. Я не могу понять, почему это так. Я также попытался прекратить кафку, позвонив EmbeddedKafka.stop()
перед звонком stop
в потоке.
0 ответов
Попробуйте установить тайм-аут с помощью
query.start().awaitTermination( 3000)
где 3000 в миллисекундах