Множественное writeStream с искровой потоковой передачей
Я работаю с потоковой передачей искры и сталкиваюсь с некоторыми проблемами, пытаясь реализовать несколько потоков писем. Ниже мой код
DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)
где writeStreamer определяется следующим образом:
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {
val query = input
.writeStream
.format("orc")
.option("checkpointLocation", checkPointFolder)
.option("path", output)
.outputMode(OutputMode.Append)
.start()
query.awaitTermination()
}
Проблема, с которой я сталкиваюсь, заключается в том, что только первая таблица написана с помощью spark writeStream, для всех остальных таблиц ничего не происходит. У вас есть идеи об этом, пожалуйста?
2 ответа
query.awaitTermination()
должно быть сделано после создания последнего потока.
writeStreamer
функция может быть изменена, чтобы вернуть StreamingQuery
а не awaitTermination в тот момент (так как он блокирует):
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
input
.writeStream
.format("orc")
.option("checkpointLocation", checkPointFolder)
.option("path", output)
.outputMode(OutputMode.Append)
.start()
}
тогда у вас будет:
val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)
query3.awaitTermination()
Если вы хотите, чтобы писатели работали параллельно, вы можете использовать
sparkSession.streams.awaitAnyTermination()
и удалить query.awaitTermination()
из метода writeStreamer
По умолчанию количество одновременных заданий равно 1, что означает, что за один раз будет активным только одно задание.
Вы пытались увеличить количество возможных параллельных заданий в спарке конф?
sparkConf.set("spark.streaming.concurrentJobs","3")
не официальный источник: http://why-not-learn-something.blogspot.com/2016/06/spark-streaming-performance-tuning-on.html