В чем разница между union и zipPartitions для СДР Apache Spark?

Я пытаюсь объединиться с RDD, которые уже распределены по нашему кластеру с хэш-разделением на ключе. Мне не нужно сохранять порядок или даже разбиение, я просто хочу, чтобы объединение было максимально быстрым. В этом примере я действительно хочу, чтобы все записи, не только отдельные, но сохраняли кратность.

Вот что я наивно использовал бы:

val newRDD = tempRDD1.union(tempRDD2)

Вот что кто-то рекомендовал мне как более быстрый, поскольку он использует преимущества того, как RDD уже разделены и распределены:

val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning=true)((iter, iter2) => iter++iter2)

Что из этого быстрее? И являются ли результаты полностью последовательными, для каждого члена?

Я спрашиваю об этом, потому что до сих пор я думал, что эти методы эквивалентны, но когда я проверял масштаб моих данных и количество разделов, исполнителей, память и т. Д., Я получаю странные результаты для метода zipPartitions, который не после этого корректно работаю с reduByKey.

Возможно, мои различия связаны с самими моими RDD, которые имеют форму ((String, String), (String, Long, Long, Long, Long)), поэтому, возможно, iter++iter2 делает что-то иное, чем объединение этих значений?

Является ли zipPartitions неявным образом чем-то дополнительным, например, сортировкой сравнения или повторным хэшированием, или вообще выполняет слияние иначе, чем union?

Вызовет ли union-vs-zipPartitions разные результаты, если СДР содержат нечеткие строки, или несколько копий ключей, или имеют пустые разделы, или коллизии хеш-ключей, или любые другие подобные проблемы?

Да, я мог бы сам проводить тесты (на самом деле, я делал это в течение последних 2 дней!), Поэтому, пожалуйста, не размещайте ничего глупого, спрашивая меня, пробовал ли я то-то и то-то... задавая этот вопрос, чтобы лучше понять, что происходит на скрытом уровне кода. Был ли "union" записан как подстрока "zipPartitions"?

Позднее редактирование: добавление в некоторых примерах результатов toDebugString, как рекомендовано @Holden

val tempIntermediateRDD6 = tempIntermediateRDD1.
  zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter++iter2).
  zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter++iter2).
  partitionBy(partitioner).
  setName("tempIntermediateRDD6").
  persist(StorageLevel.MEMORY_AND_DISK_SER)

tempIntermediateRDD6.checkpoint

println(tempIntermediateRDD6.toDebugString)

// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory Serialized 1x Replicated]
//   |    ZippedPartitionsRDD2[168] at zipPartitions at mycode.scala:3202 [Disk Memory Serialized 1x Replicated]
//   |    tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
//   |        CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//   |    CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
//   |    tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
//   |    CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
//   |    tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
//   |        CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//   |    CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]

против:

val tempIntermediateRDD6 = tempIntermediateRDD1.
  union(tempIntermediateRDD2).
  union(tempIntermediateRDD5).
  partitionBy(partitioner).
  setName("tempIntermediateRDD6").
  persist(StorageLevel.MEMORY_AND_DISK_SER)

tempIntermediateRDD6.checkpoint

println(tempIntermediateRDD6.toDebugString)

// (2568) tempIntermediateRDD6 ShuffledRDD[170] at partitionBy at mycode.scala:3208 [Disk Memory Serialized 1x Replicated]
//   +-(5136) UnionRDD[169] at union at mycode.scala:3207 [Disk Memory Serialized 1x Replicated]
//       |    PartitionerAwareUnionRDD[168] at union at mycode.scala:3206 [Disk Memory Serialized 1x Replicated]
//       |    tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
//       |        CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//       |    CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
//       |    tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
//       |    CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
//       |    tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
//       |        CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//       |    CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]

1 ответ

Союз возвращает специализированный UnionRDD, мы можем увидеть, как это было написано, посмотрев на UnionRDD.scala в проекте Spark. Глядя на это, мы можем видеть, что Union фактически реализован с использованием этого блока кода:

  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
    var pos = 0
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
      pos += 1
    }
    array
  }

Если вам интересно, как выглядит базовое вычисление на СДР, я бы порекомендовал использовать toDebugString функция на результирующем RDD. Затем вы можете увидеть, как выглядит DAG зависимостей.

Другие вопросы по тегам