Как использовать режим вывода обновлений в формате FileFormat?
Я пытаюсь использовать искровую структурированную потоковую передачу в режиме вывода обновлений и записи в файл. Я нашел этот пример StructuredSessionization, и он отлично работает, пока настроен формат консоли. Но если я изменю режим вывода на:
val query = sessionUpdates
.writeStream
.outputMode("update")
.format("json")
.option("path", "/work/output/data")
.option("checkpointLocation", "/work/output/checkpoint")
.start()
Я получаю следующую ошибку:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Data source json does not support Update output mode;
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:279)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
at palyground.StructuredStreamingMergeSpans$.main(StructuredStreamingMergeSpans.scala:84)
at palyground.StructuredStreamingMergeSpans.main(StructuredStreamingMergeSpans.scala)
Могу ли я использовать режим обновления и использовать FileFormat для записи таблицы результатов в приемник файлов? В исходном коде я нашел образец соответствия, который обеспечивает режим добавления.
2 ответа
Вы не можете записывать в файл в режиме обновления с использованием искровой структурированной потоковой передачи. Вам нужно написать ForeachWriter
для этого. Я написал просто для каждого писателя здесь. Вы можете изменить его в соответствии с вашими требованиями.
val writerForText = new ForeachWriter[Row] {
var fileWriter: FileWriter = _
override def process(value: Row): Unit = {
fileWriter.append(value.toSeq.mkString(","))
}
override def close(errorOrNull: Throwable): Unit = {
fileWriter.close()
}
override def open(partitionId: Long, version: Long): Boolean = {
FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
true
}
}
val query = sessionUpdates
.writeStream
.outputMode("update")
.foreach(writerForText)
.start()
Append
Режим вывода требуется для любого из FileFormat
раковины, в т.ч. json
, который Spark Structured Streaming проверяет перед началом потокового запроса.
if (outputMode != OutputMode.Append) {
throw new AnalysisException(
s"Data source $className does not support $outputMode output mode")
}
В Spark 2.4 вы могли бы использовать DataStreamWriter.foreach
оператор или новый DataStreamWriter.foreachBatch
оператор, который просто принимает функцию, которая принимает набор данных пакета и идентификатор пакета.
foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]