rdd.sortByKey дает неверный результат

Я скопировал sortByKeyтело и переименовано в sortByKey2, но они дают разные результаты. Почему первый результат здесь неверен? Это было запущено в затмении. Я возобновил затмение и все еще получил неправильный результат.

package test.spark

import org.apache.spark.sql.SparkSession

object RddTests {
  var spark = SparkSession.builder().appName("rdd-test").master("local[*]")
    .enableHiveSupport()
    .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]) {
    //mapValues
    //combineWithKey
    //foldByKey
    sortByKey
    sortByKey2
  }    

  def sortByKey() {
    val people = List(("Mobin", 2), ("Mobin", 1), ("Lucy", 2), ("Amy", 1), ("Lucy", 3), ("Lucy", 1))
    val rdd = sc.parallelize(people)
    val sortByKeyRDD = rdd.sortByKey()
    println;println("sortByKeyRDD")
    sortByKeyRDD.foreach(println)
  }

  def sortByKey2() {
    val people = List(("Mobin", 2), ("Mobin", 1), ("Lucy", 2), ("Amy", 1), ("Lucy", 3), ("Lucy", 1))
    val rdd = sc.parallelize(people)
    val sortByKeyRDD = rdd.sortByKey()
    println;println("sortByKeyRDD2")
    sortByKeyRDD.foreach(println)
  }
}

Выход:

[Stage 0:>                                                          (0 + 0) / 4]

sortByKeyRDD
(Mobin,2)
(Mobin,1)
(Amy,1)
(Lucy,2)
(Lucy,3)
(Lucy,1)

sortByKeyRDD2
(Amy,1)
(Mobin,2)
(Mobin,1)
(Lucy,2)
(Lucy,3)
(Lucy,1)

1 ответ

foreach не гарантирует, что элементы будут обработаны в каком-либо конкретном порядке. Если вы делаете sortByKeyRDD.collect.foreach(println) вы увидите результаты по порядку, хотя это предполагает, что ваши данные поместятся в памяти драйвера.

Как отмечено в sortByKey документация:

Вызов сбора или сохранения в полученном СДР вернет или выведет упорядоченный список записей

[РЕДАКТИРОВАТЬ] Использование toLocalIterator вместо collect ограничивает требования к памяти драйвера самым большим отдельным разделом. Спасибо user8371915 за то, что указал на это в комментарии.

Это вызвано количеством используемых разделов.

Как вы используете master("local[*]") Искра разделит данные для вас. Количество используемых разделов можно проверить с помощью rdd.getNumPartitions(),

Если данные сортируются при сортировке, то сортировка будет выполняться по элементам в каждом разделе отдельно. Из документации:

Сортируйте СДР по ключу, чтобы каждый раздел содержал отсортированный диапазон элементов.

При печати результаты будут объединены вместе, и в некоторых случаях это приведет к тому, что данные будут не в порядке, как в первом тесте.

Чтобы избежать этой проблемы, искра может быть вынуждена использовать только один раздел для сортировки. И то и другое sortByKey(numPartitions = 1) или же rdd.coalesce(1).sortByKey() может быть использован.

Здесь важно понять, как в Spark работают такие методы, как foreach() или sortByKey().

Когда вы пытаетесь отсортировать данные и хотите распечатать результат с помощью foreach (System.out:: println), драйвер распространяет этот метод на каждый раздел (т.е. узел в случае кластера ИЛИ несколько потоков в случае одной машины). Таким образом, каждый раздел выполняет foreach локально. Это означает, что вы не увидите результат, который хотите видеть.

Возможное решение, которое предлагают люди, но не правильное решение в Bigdata,

       sortByKeyRDD.coalesce(1).foreach(System.out::println);

или же

       sortByKeyRDD.collect().forEach(System.out::println);

Вышеупомянутое решение предназначено только для понимания, я не рекомендую его использовать. Если у вас большие данные, это может вызвать исключение из-за нехватки памяти при попытке собрать все данные в драйвере для печати вывода.

Другие вопросы по тегам