В чем разница между union и zipPartitions для СДР Apache Spark?
Я пытаюсь объединиться с RDD, которые уже распределены по нашему кластеру с хэш-разделением на ключе. Мне не нужно сохранять порядок или даже разбиение, я просто хочу, чтобы объединение было максимально быстрым. В этом примере я действительно хочу, чтобы все записи, не только отдельные, но сохраняли кратность.
Вот что я наивно использовал бы:
val newRDD = tempRDD1.union(tempRDD2)
Вот что кто-то рекомендовал мне как более быстрый, поскольку он использует преимущества того, как RDD уже разделены и распределены:
val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning=true)((iter, iter2) => iter++iter2)
Что из этого быстрее? И являются ли результаты полностью последовательными, для каждого члена?
Я спрашиваю об этом, потому что до сих пор я думал, что эти методы эквивалентны, но когда я проверял масштаб моих данных и количество разделов, исполнителей, память и т. Д., Я получаю странные результаты для метода zipPartitions, который не после этого корректно работаю с reduByKey.
Возможно, мои различия связаны с самими моими RDD, которые имеют форму ((String, String), (String, Long, Long, Long, Long)), поэтому, возможно, iter++iter2 делает что-то иное, чем объединение этих значений?
Является ли zipPartitions неявным образом чем-то дополнительным, например, сортировкой сравнения или повторным хэшированием, или вообще выполняет слияние иначе, чем union?
Вызовет ли union-vs-zipPartitions разные результаты, если СДР содержат нечеткие строки, или несколько копий ключей, или имеют пустые разделы, или коллизии хеш-ключей, или любые другие подобные проблемы?
Да, я мог бы сам проводить тесты (на самом деле, я делал это в течение последних 2 дней!), Поэтому, пожалуйста, не размещайте ничего глупого, спрашивая меня, пробовал ли я то-то и то-то... задавая этот вопрос, чтобы лучше понять, что происходит на скрытом уровне кода. Был ли "union" записан как подстрока "zipPartitions"?
Позднее редактирование: добавление в некоторых примерах результатов toDebugString, как рекомендовано @Holden
val tempIntermediateRDD6 = tempIntermediateRDD1.
zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter++iter2).
zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter++iter2).
partitionBy(partitioner).
setName("tempIntermediateRDD6").
persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory Serialized 1x Replicated]
// | ZippedPartitionsRDD2[168] at zipPartitions at mycode.scala:3202 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
против:
val tempIntermediateRDD6 = tempIntermediateRDD1.
union(tempIntermediateRDD2).
union(tempIntermediateRDD5).
partitionBy(partitioner).
setName("tempIntermediateRDD6").
persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ShuffledRDD[170] at partitionBy at mycode.scala:3208 [Disk Memory Serialized 1x Replicated]
// +-(5136) UnionRDD[169] at union at mycode.scala:3207 [Disk Memory Serialized 1x Replicated]
// | PartitionerAwareUnionRDD[168] at union at mycode.scala:3206 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
1 ответ
Союз возвращает специализированный UnionRDD
, мы можем увидеть, как это было написано, посмотрев на UnionRDD.scala
в проекте Spark. Глядя на это, мы можем видеть, что Union
фактически реализован с использованием этого блока кода:
override def getPartitions: Array[Partition] = {
val array = new Array[Partition](rdds.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
pos += 1
}
array
}
Если вам интересно, как выглядит базовое вычисление на СДР, я бы порекомендовал использовать toDebugString
функция на результирующем RDD. Затем вы можете увидеть, как выглядит DAG зависимостей.