Как отфильтровать поток данных с помощью операции преобразования и внешнего RDD?
Я использовал transform
метод в аналогичном сценарии использования, как описано в разделе " Операция преобразования " в " Преобразованиях на DStreams":
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
Мой код выглядит следующим образом:
sc = SparkContext("local[4]", "myapp")
ssc = StreamingContext(sc, 5)
ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/')
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
filter_rdd = sc.parallelize([(u'A', 1), (u'B', 1)], 2)
filtered_count = counts.transform(
lambda rdd: rdd.join(filter_rdd).filter(lambda k, (v1, v2): v1 and not v2)
)
filtered_count.pprint()
ssc.start()
ssc.awaitTermination()
Но я получаю следующую ошибку
Похоже, что вы пытаетесь передать СДР или сослаться на СДР из действия или преобразования. Преобразования и действия СДР могут вызываться только драйвером, но не внутри других преобразований; например, rdd1.map(lambda x: rdd2.values.count() * x) недопустим, поскольку преобразование значений и действие count не могут быть выполнены внутри преобразования rdd1.map. Для получения дополнительной информации см. SPARK-5063.
Как мне использовать мой внешний RDD для фильтрации элементов из dstream?
1 ответ
Разница между примером документа Spark и вашим кодом заключается в использовании ssc.checkpoint().
Хотя конкретный пример кода, который вы предоставили, будет работать без контрольной точки, я думаю, вам это действительно нужно. Но концепция введения внешнего RDD в область контрольного пункта DStream потенциально недействительна: при восстановлении с контрольной точки внешний RDD мог измениться.
Я пытался проверить внешний RDD, но мне тоже не повезло.