Исключительная структурированная потоковая передача ForeachWriter не может получить sparkContext

Я читаю данные JSON из очереди Kafka с использованием структурированной потоковой передачи Spark, но мне нужно записать данные JSON в Elasticsearch.

Тем не менее, я не могу получить sparkContext внутри ForeachWriter конвертировать JSON в RDD. Это бросает NPE.

Как я могу получить SparkContext в Writer конвертировать JSON в RDD?

2 ответа

Ты не можешь. Методы в ForeachWriter беги в исполнителях. Вы можете либо написать приемник Elasticsearch самостоятельно, либо вызвать необработанные API-интерфейсы Elasticsearch для записи данных.

Я решил проблему, получив экземпляр SparkContext внутри ForeachWriter

val writer = new ForeachWriter[CustomerData] {

  override def open(partitionId: Long, version: Long) = true
  override def process(value: CustomerData) = {
        val spark = SparkSession
        .builder()
       .getOrCreate()    //this works
       ...
  }
  override def close(errorOrNull: Throwable) = {}
}

PS: это, вероятно, создание новой SparkSession

Другие вопросы по тегам