Прекращение потоковой передачи искры после чтения первой партии данных
Я использую потоковую передачу искры, чтобы потреблять сообщения Кафки. Я хочу получить некоторые сообщения в качестве образца от Кафки, а не читать все сообщения. Поэтому я хочу прочитать пакет сообщений, вернуть их вызывающей стороне и остановить потоковое воспроизведение. В настоящее время я передаю время batchInterval в методе awaitTermination метода контекста потоковой искры. Сейчас я не знаю, как вернуть обработанные данные вызывающей стороне из потоковой передачи. Вот мой код, который я использую в настоящее время
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
if (params.contains("zookeeperQourum"))
zkQuorum = params.get("zookeeperQourum").get
if (params.contains("userGroup"))
group = params.get("userGroup").get
if (params.contains("topics"))
topics = params.get("topics").get
if (params.contains("numberOfThreads"))
numThreads = params.get("numberOfThreads").get
if (params.contains("sink"))
sink = params.get("sink").get
if (params.contains("batchInterval"))
interval = params.get("batchInterval").get.toInt
val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
val ssc = new StreamingContext(sparkConf, Seconds(interval))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
var consumerConfig = scala.collection.immutable.Map.empty[String, String]
consumerConfig += ("auto.offset.reset" -> "smallest")
consumerConfig += ("zookeeper.connect" -> zkQuorum)
consumerConfig += ("group.id" -> group)
var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
streams.foreach(rdd => rdd.foreachPartition(itr => {
while (itr.hasNext && size >= 0) {
var msg=itr.next
println(msg)
sample.append(msg)
sample.append("\n")
size -= 1
}
}))
ssc.start()
ssc.awaitTermination(5000)
ssc.stop(true)
}
Поэтому вместо сохранения сообщений в String Builder под названием "sample" я хочу вернуться к вызывающей стороне.
2 ответа
Мы можем получить образцы сообщений, используя следующий фрагмент кода
var sampleMessages=streams.repartition(1).mapPartitions(x=>x.take(10))
и если мы хотим остановить после первого пакета, мы должны реализовать наш собственный интерфейс StreamingListener и прекратить потоковую передачу в методе onBatchCompleted.
Вы можете реализовать StreamingListener, а затем внутри него, onBatchCompleted, вы можете вызвать ssc.stop()
private class MyJobListener(ssc: StreamingContext) extends StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
ssc.stop(true)
}
}
Вот как вы присоединяете свой SparkStreaming к JobListener:
val listen = new MyJobListener(ssc)
ssc.addStreamingListener(listen)
ssc.start()
ssc.awaitTermination()