rdd.sortByKey дает неверный результат
Я скопировал sortByKey
тело и переименовано в sortByKey2
, но они дают разные результаты. Почему первый результат здесь неверен? Это было запущено в затмении. Я возобновил затмение и все еще получил неправильный результат.
package test.spark
import org.apache.spark.sql.SparkSession
object RddTests {
var spark = SparkSession.builder().appName("rdd-test").master("local[*]")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
def main(args: Array[String]) {
//mapValues
//combineWithKey
//foldByKey
sortByKey
sortByKey2
}
def sortByKey() {
val people = List(("Mobin", 2), ("Mobin", 1), ("Lucy", 2), ("Amy", 1), ("Lucy", 3), ("Lucy", 1))
val rdd = sc.parallelize(people)
val sortByKeyRDD = rdd.sortByKey()
println;println("sortByKeyRDD")
sortByKeyRDD.foreach(println)
}
def sortByKey2() {
val people = List(("Mobin", 2), ("Mobin", 1), ("Lucy", 2), ("Amy", 1), ("Lucy", 3), ("Lucy", 1))
val rdd = sc.parallelize(people)
val sortByKeyRDD = rdd.sortByKey()
println;println("sortByKeyRDD2")
sortByKeyRDD.foreach(println)
}
}
Выход:
[Stage 0:> (0 + 0) / 4]
sortByKeyRDD
(Mobin,2)
(Mobin,1)
(Amy,1)
(Lucy,2)
(Lucy,3)
(Lucy,1)
sortByKeyRDD2
(Amy,1)
(Mobin,2)
(Mobin,1)
(Lucy,2)
(Lucy,3)
(Lucy,1)
1 ответ
foreach
не гарантирует, что элементы будут обработаны в каком-либо конкретном порядке. Если вы делаете sortByKeyRDD.collect.foreach(println)
вы увидите результаты по порядку, хотя это предполагает, что ваши данные поместятся в памяти драйвера.
Как отмечено в sortByKey
документация:
Вызов сбора или сохранения в полученном СДР вернет или выведет упорядоченный список записей
[РЕДАКТИРОВАТЬ] Использование toLocalIterator
вместо collect
ограничивает требования к памяти драйвера самым большим отдельным разделом. Спасибо user8371915 за то, что указал на это в комментарии.
Это вызвано количеством используемых разделов.
Как вы используете master("local[*]")
Искра разделит данные для вас. Количество используемых разделов можно проверить с помощью rdd.getNumPartitions()
,
Если данные сортируются при сортировке, то сортировка будет выполняться по элементам в каждом разделе отдельно. Из документации:
Сортируйте СДР по ключу, чтобы каждый раздел содержал отсортированный диапазон элементов.
При печати результаты будут объединены вместе, и в некоторых случаях это приведет к тому, что данные будут не в порядке, как в первом тесте.
Чтобы избежать этой проблемы, искра может быть вынуждена использовать только один раздел для сортировки. И то и другое sortByKey(numPartitions = 1)
или же rdd.coalesce(1).sortByKey()
может быть использован.
Здесь важно понять, как в Spark работают такие методы, как foreach() или sortByKey().
Когда вы пытаетесь отсортировать данные и хотите распечатать результат с помощью foreach (System.out:: println), драйвер распространяет этот метод на каждый раздел (т.е. узел в случае кластера ИЛИ несколько потоков в случае одной машины). Таким образом, каждый раздел выполняет foreach локально. Это означает, что вы не увидите результат, который хотите видеть.
Возможное решение, которое предлагают люди, но не правильное решение в Bigdata,
sortByKeyRDD.coalesce(1).foreach(System.out::println);
или же
sortByKeyRDD.collect().forEach(System.out::println);
Вышеупомянутое решение предназначено только для понимания, я не рекомендую его использовать. Если у вас большие данные, это может вызвать исключение из-за нехватки памяти при попытке собрать все данные в драйвере для печати вывода.