Скорость работы Spark в зависимости от размера сервера SQL

Моя настройка:

  • У меня есть сервер MS SQL под управлением Azure (V12)
  • У меня есть кластер Spark, работающий в Azure HDInsights
  • Spark.Version: 1.6.2
  • Scala.Version: 2.10.5

Я извлекаю свои данные (около 5 миллионов строк) с сервера SQL через драйвер jdbc, а затем обновляю имена столбцов. Затем я выполняю свою логику.

Вот как я читаю с сервера:

println("SQL Load Start: " + TimeStamp.getCurrentTime.toDateString)
val options: Map[String, String] = Map("url" -> connectionString,
    "dbtable" -> ("(SELECT * FROM Data)" + "Data"))
val data = sQLContext.read.format("jdbc").options(options).load()
println("DF Count: " + data.count().toString)
println("Partition Count: " + data.rdd.partitions.length.toString)
println("SQL Load End: " + TimeStamp.getCurrentTime.toDateString)

Когда я масштабирую SQL-сервер, мои данные быстро читаются, и моя логика запускается примерно за 10 секунд. Когда SQL-сервер небольшой, мои данные считываются более медленными темпами (как и ожидалось), но моя логика также занимает гораздо больше времени (НЕОБХОДИМО), занимая около 3 - 4 минут.

Размер кластера не меняется. Количество разделов в DataFrame не изменяется. Я только изменяю размер сервера SQL.

У кого-нибудь есть идеи о том, почему логическая часть моей работы Spark будет колебаться в зависимости от размера моего SQL-сервера?

1 ответ

Решение

Я понял это. Один из моих товарищей по команде напомнил мне о том, что я узнал на тренировках. Если вы не кэшируете данные после извлечения их из MS SQL, они снова будут извлекать данные, когда вы работаете с ними.

Обновленное может выглядеть следующим образом:

println("SQL Load Start: " + TimeStamp.getCurrentTime.toDateString)
val options: Map[String, String] = Map("url" -> connectionString,
    "dbtable" -> ("(SELECT * FROM Data)" + "Data"))
val data = sQLContext.read.format("jdbc").options(options).load()

data.cache()   // NEW CODE

println("DF Count: " + data.count().toString)
println("Partition Count: " + data.rdd.partitions.length.toString)
println("SQL Load End: " + TimeStamp.getCurrentTime.toDateString)
Другие вопросы по тегам