Как использовать режим вывода обновлений в формате FileFormat?

Я пытаюсь использовать искровую структурированную потоковую передачу в режиме вывода обновлений и записи в файл. Я нашел этот пример StructuredSessionization, и он отлично работает, пока настроен формат консоли. Но если я изменю режим вывода на:

 val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("json")
  .option("path", "/work/output/data")
  .option("checkpointLocation", "/work/output/checkpoint")
  .start()

Я получаю следующую ошибку:

 Exception in thread "main" org.apache.spark.sql.AnalysisException: Data source json does not support Update output mode;
        at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:279)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
        at palyground.StructuredStreamingMergeSpans$.main(StructuredStreamingMergeSpans.scala:84)
        at palyground.StructuredStreamingMergeSpans.main(StructuredStreamingMergeSpans.scala)

Могу ли я использовать режим обновления и использовать FileFormat для записи таблицы результатов в приемник файлов? В исходном коде я нашел образец соответствия, который обеспечивает режим добавления.

2 ответа

Вы не можете записывать в файл в режиме обновления с использованием искровой структурированной потоковой передачи. Вам нужно написать ForeachWriter для этого. Я написал просто для каждого писателя здесь. Вы можете изменить его в соответствии с вашими требованиями.

val writerForText = new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(","))
    }

    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }

    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
      fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
      true

    }
  }

val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .foreach(writerForText)
  .start()

Append Режим вывода требуется для любого из FileFormat раковины, в т.ч. json, который Spark Structured Streaming проверяет перед началом потокового запроса.

if (outputMode != OutputMode.Append) {
  throw new AnalysisException(
    s"Data source $className does not support $outputMode output mode")
}

В Spark 2.4 вы могли бы использовать DataStreamWriter.foreach оператор или новый DataStreamWriter.foreachBatch оператор, который просто принимает функцию, которая принимает набор данных пакета и идентификатор пакета.

foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
Другие вопросы по тегам