Исключительная структурированная потоковая передача ForeachWriter не может получить sparkContext
Я читаю данные JSON из очереди Kafka с использованием структурированной потоковой передачи Spark, но мне нужно записать данные JSON в Elasticsearch.
Тем не менее, я не могу получить sparkContext
внутри ForeachWriter
конвертировать JSON в RDD. Это бросает NPE.
Как я могу получить SparkContext
в Writer конвертировать JSON в RDD?
2 ответа
Ты не можешь. Методы в ForeachWriter
беги в исполнителях. Вы можете либо написать приемник Elasticsearch самостоятельно, либо вызвать необработанные API-интерфейсы Elasticsearch для записи данных.
Я решил проблему, получив экземпляр SparkContext внутри ForeachWriter
val writer = new ForeachWriter[CustomerData] {
override def open(partitionId: Long, version: Long) = true
override def process(value: CustomerData) = {
val spark = SparkSession
.builder()
.getOrCreate() //this works
...
}
override def close(errorOrNull: Throwable) = {}
}
PS: это, вероятно, создание новой SparkSession