Является ли groupByKey когда-либо предпочтительным по сравнению с ReduByKey

Я всегда использую reduceByKey когда мне нужно сгруппировать данные в RDD, потому что он выполняет уменьшение на стороне карты перед перетасовкой данных, что часто означает, что меньше данных перетасовывается, и, таким образом, я получаю более высокую производительность. Даже когда функция уменьшения на стороне карты собирает все значения и фактически не уменьшает объем данных, я все равно использую reduceByKeyпотому что я предполагаю, что производительность reduceByKey никогда не будет хуже чем groupByKey, Однако мне интересно, верно ли это предположение или действительно есть ситуации, когда groupByKey должно быть предпочтительным

3 ответа

Решение

Я полагаю, что есть другие аспекты проблемы, которые игнорируются при Mike Park и eliasah:

  • удобочитаемость кода
  • ремонтопригодность кода
  • размер кодовой базы

Если операция не уменьшает объем данных, она должна быть так или иначе семантически эквивалентна GroupByKey, Предположим, у нас естьRDD[(Int,String)]:

import scala.util.Random
Random.setSeed(1)

def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")

val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))

и мы хотим объединить все строки для данного ключа. С groupByKey это довольно просто:

rdd.groupByKey.mapValues(_.mkString(""))

Наивное решение с reduceByKey выглядит так:

rdd.reduceByKey(_ + _)

Оно короткое и, возможно, простое для понимания, но страдает от двух проблем:

  • крайне неэффективно, так как создает новый String возражать каждый раз *
  • предполагает, что выполняемая вами операция обходится дешевле, чем на самом деле, особенно если вы анализируете только DAG или строку отладки

Для решения первой проблемы нам нужна изменяемая структура данных:

import scala.collection.mutable.StringBuilder

rdd.combineByKey[StringBuilder](
    (s: String) => new StringBuilder(s),
    (sb: StringBuilder, s: String) => sb ++= s,
    (sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)

Это все еще предполагает что-то еще, что действительно происходит и является довольно многословным, особенно если повторяется несколько раз в вашем сценарии. Конечно, вы можете извлечь анонимные функции

val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) => 
  sb1.append(sb2)

rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)

но в конце концов это все еще означает дополнительные усилия для понимания этого кода, повышенную сложность и отсутствие реальной добавленной стоимости. Одна вещь, которая меня особенно беспокоит, это явное включение изменяемых структур данных. Даже если Spark обрабатывает почти все сложности, это означает, что у нас больше нет элегантного, ссылочно-прозрачного кода.

Я хочу сказать, что если вы действительно сократите объем данных, reduceByKey, В противном случае вы затрудняете написание своего кода, его сложнее анализировать и ничего не получить взамен.

Примечание:

Этот ответ сфокусирован на Scala RDD API. Текущая реализация Python сильно отличается от своего аналога JVM и включает в себя оптимизации, которые обеспечивают значительное преимущество перед наивным reduceByKey реализация в случае groupByподобные операции.

За Dataset API см. DataFrame / Dataset groupBy поведение / оптимизация.


* См. Производительность Spark для Scala против Python для убедительного примера

reduceByKey а также groupByKey оба используют combineByKey с различной семантикой объединения / слияния.

Они видят ключевое отличие в том, что groupByKey проходит флаг (mapSideCombine=false) в случайном порядке. Судя по проблеме SPARK-772, это является подсказкой движку перемешивания, чтобы не запускать объединитель карт, когда размер данных не изменится.

Так что я бы сказал, что если вы пытаетесь использовать reduceByKey копировать groupByKeyВы можете увидеть небольшое снижение производительности.

Я не буду изобретать колесо, согласно документации кода, groupByKey Операция группирует значения для каждого ключа в СДР в одну последовательность, которая также позволяет управлять разделением результирующей пары ключ-значение СДП путем передачи Partitioner,

Эта операция может быть очень дорогой. Если вы группируете для выполнения агрегации (например, суммы или среднего) по каждому ключу, используя aggregateByKey или же reduceByKey обеспечит гораздо лучшую производительность.

Примечание: как в настоящее время реализовано, groupByKey должен иметь возможность хранить все пары ключ-значение для любого ключа в памяти. Если ключ имеет слишком много значений, это может привести к OOME.

На самом деле, я предпочитаю combineByKey операции, но иногда трудно понять концепцию объединителя и слияния, если вы не очень хорошо знакомы с парадигмой сокращения карты. Для этого вы можете прочитать Библию Yahoo Map-Reduction здесь, которая хорошо объясняет эту тему.

Для получения дополнительной информации я советую вам прочитать код PairRDDFunctions.

Другие вопросы по тегам