Является ли groupByKey когда-либо предпочтительным по сравнению с ReduByKey
Я всегда использую reduceByKey
когда мне нужно сгруппировать данные в RDD, потому что он выполняет уменьшение на стороне карты перед перетасовкой данных, что часто означает, что меньше данных перетасовывается, и, таким образом, я получаю более высокую производительность. Даже когда функция уменьшения на стороне карты собирает все значения и фактически не уменьшает объем данных, я все равно использую reduceByKey
потому что я предполагаю, что производительность reduceByKey
никогда не будет хуже чем groupByKey
, Однако мне интересно, верно ли это предположение или действительно есть ситуации, когда groupByKey
должно быть предпочтительным
3 ответа
Я полагаю, что есть другие аспекты проблемы, которые игнорируются при Mike Park и eliasah:
- удобочитаемость кода
- ремонтопригодность кода
- размер кодовой базы
Если операция не уменьшает объем данных, она должна быть так или иначе семантически эквивалентна GroupByKey
, Предположим, у нас естьRDD[(Int,String)]
:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
и мы хотим объединить все строки для данного ключа. С groupByKey
это довольно просто:
rdd.groupByKey.mapValues(_.mkString(""))
Наивное решение с reduceByKey
выглядит так:
rdd.reduceByKey(_ + _)
Оно короткое и, возможно, простое для понимания, но страдает от двух проблем:
- крайне неэффективно, так как создает новый
String
возражать каждый раз * - предполагает, что выполняемая вами операция обходится дешевле, чем на самом деле, особенно если вы анализируете только DAG или строку отладки
Для решения первой проблемы нам нужна изменяемая структура данных:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
Это все еще предполагает что-то еще, что действительно происходит и является довольно многословным, особенно если повторяется несколько раз в вашем сценарии. Конечно, вы можете извлечь анонимные функции
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
но в конце концов это все еще означает дополнительные усилия для понимания этого кода, повышенную сложность и отсутствие реальной добавленной стоимости. Одна вещь, которая меня особенно беспокоит, это явное включение изменяемых структур данных. Даже если Spark обрабатывает почти все сложности, это означает, что у нас больше нет элегантного, ссылочно-прозрачного кода.
Я хочу сказать, что если вы действительно сократите объем данных, reduceByKey
, В противном случае вы затрудняете написание своего кода, его сложнее анализировать и ничего не получить взамен.
Примечание:
Этот ответ сфокусирован на Scala RDD
API. Текущая реализация Python сильно отличается от своего аналога JVM и включает в себя оптимизации, которые обеспечивают значительное преимущество перед наивным reduceByKey
реализация в случае groupBy
подобные операции.
За Dataset
API см. DataFrame / Dataset groupBy поведение / оптимизация.
* См. Производительность Spark для Scala против Python для убедительного примера
reduceByKey
а также groupByKey
оба используют combineByKey
с различной семантикой объединения / слияния.
Они видят ключевое отличие в том, что groupByKey
проходит флаг (mapSideCombine=false
) в случайном порядке. Судя по проблеме SPARK-772, это является подсказкой движку перемешивания, чтобы не запускать объединитель карт, когда размер данных не изменится.
Так что я бы сказал, что если вы пытаетесь использовать reduceByKey
копировать groupByKey
Вы можете увидеть небольшое снижение производительности.
Я не буду изобретать колесо, согласно документации кода, groupByKey
Операция группирует значения для каждого ключа в СДР в одну последовательность, которая также позволяет управлять разделением результирующей пары ключ-значение СДП путем передачи Partitioner
,
Эта операция может быть очень дорогой. Если вы группируете для выполнения агрегации (например, суммы или среднего) по каждому ключу, используя aggregateByKey
или же reduceByKey
обеспечит гораздо лучшую производительность.
Примечание: как в настоящее время реализовано, groupByKey
должен иметь возможность хранить все пары ключ-значение для любого ключа в памяти. Если ключ имеет слишком много значений, это может привести к OOME.
На самом деле, я предпочитаю combineByKey
операции, но иногда трудно понять концепцию объединителя и слияния, если вы не очень хорошо знакомы с парадигмой сокращения карты. Для этого вы можете прочитать Библию Yahoo Map-Reduction здесь, которая хорошо объясняет эту тему.
Для получения дополнительной информации я советую вам прочитать код PairRDDFunctions.