Искра QueueStream никогда не исчерпывается
Озадаченный частью кода, которую я позаимствовал из Интернета для исследовательских целей. Это код:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
val spark = ...
val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val rddQueue = new mutable.Queue[RDD[Char]]()
val QS = ssc.queueStream(rddQueue)
QS.foreachRDD(q=> {
print("Hello") // Queue never exhausted
if(!q.isEmpty) {
... do something
... do something
}
}
)
//ssc.checkpoint("/chkpoint/dir") if unchecked causes Serialization error
ssc.start()
for (c <- 'a' to 'c') {
rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()
Я просматривал его, чтобы проверить и заметил, что "привет" печатается вечно:
HelloHelloHelloHelloHelloHelloHelloHelloHelloHello and so on
Я думал бы, что queueStream исчерпает после 3 итераций.
Итак, что я пропустил?
1 ответ
Решение
Понял. На самом деле он исчерпан, но цикл продолжается, и поэтому утверждение
if(!q.isEmpty)
есть.
Хорошо, подумал бы, что это просто остановит, или, скорее, не выполнит, но не так. Я вспомнил. Пустой СДР будет получен, если ничего не передано, в зависимости от времени интервала пакета. Оставив для других, как было upvote.
Однако, несмотря на то, что он устаревший, это плохой пример, так как добавление контрольной точки вызывает ошибку сериализации. Оставляя это на благо других.
ssc.checkpoint("/chkpoint/dir")