Множественное writeStream с искровой потоковой передачей

Я работаю с потоковой передачей искры и сталкиваюсь с некоторыми проблемами, пытаясь реализовать несколько потоков писем. Ниже мой код

DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)

где writeStreamer определяется следующим образом:

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {

  val query = input
                .writeStream
                .format("orc")
                .option("checkpointLocation", checkPointFolder)
                .option("path", output)
                .outputMode(OutputMode.Append)
                .start()

  query.awaitTermination()
}

Проблема, с которой я сталкиваюсь, заключается в том, что только первая таблица написана с помощью spark writeStream, для всех остальных таблиц ничего не происходит. У вас есть идеи об этом, пожалуйста?

2 ответа

Решение

query.awaitTermination() должно быть сделано после создания последнего потока.

writeStreamer функция может быть изменена, чтобы вернуть StreamingQuery а не awaitTermination в тот момент (так как он блокирует):

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format("orc")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}

тогда у вас будет:

val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)

query3.awaitTermination()

Если вы хотите, чтобы писатели работали параллельно, вы можете использовать

sparkSession.streams.awaitAnyTermination()

и удалить query.awaitTermination() из метода writeStreamer

По умолчанию количество одновременных заданий равно 1, что означает, что за один раз будет активным только одно задание.

Вы пытались увеличить количество возможных параллельных заданий в спарке конф?

sparkConf.set("spark.streaming.concurrentJobs","3")

не официальный источник: http://why-not-learn-something.blogspot.com/2016/06/spark-streaming-performance-tuning-on.html