Как работает HashPartitioner?

Я прочитал на документацию HashPartitioner, К сожалению, ничего не было объяснено, кроме вызовов API. Я предполагаю, что HashPartitioner разбивает распределенный набор на основе хэша ключей. Например, если мои данные похожи

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

Таким образом, разделитель поместил бы это в различные разделы с одинаковыми ключами, попадающими в один и тот же раздел. Однако я не понимаю значение аргумента конструктора

new HashPartitoner(numPartitions) //What does numPartitions do?

Для вышеуказанного набора данных, как будут отличаться результаты, если я сделал

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

Так как же HashPartitioner работать на самом деле?

3 ответа

Решение

Что ж, давайте сделаем ваш набор данных немного интереснее:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

У нас есть шесть элементов:

rdd.count
Long = 6

нет разделителя:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

и восемь разделов:

rdd.partitions.length
Int = 8

Теперь давайте определим маленький помощник для подсчета количества элементов на раздел:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Поскольку у нас нет разделителя, наш набор данных равномерно распределен между разделами ( Схема разделения по умолчанию в Spark):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

Начальное-распределение

Теперь давайте переделим наш набор данных:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Поскольку параметр переданHashPartitionerопределяет количество разделов, которые мы ожидаем один раздел:

rddOneP.partitions.length
Int = 1

Поскольку у нас есть только один раздел, он содержит все элементы:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

хэш-разметки-1

Обратите внимание, что порядок значений после тасования является недетерминированным.

Точно так же, если мы используемHashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

мы получим 2 раздела:

rddTwoP.partitions.length
Int = 2

посколькуrddразделен по ключевым данным, больше не будет распределяться равномерно:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Потому что есть три ключа и только два разных значенияhashCodeмодификацияnumPartitionsздесь нет ничего неожиданного:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Просто чтобы подтвердить вышеизложенное:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

хэш-разметки-2

Наконец сHashPartitioner(7)мы получаем семь разделов, три непустых с 2 элементами в каждом:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

хэш-разметки-7

Резюме и примечания

  • HashPartitionerпринимает один аргумент, который определяет количество разделов
  • значения присваиваются разделам с помощью hashключей. hash функция может отличаться в зависимости от языка (Scala RDD может использовать hashCode,DataSetsиспользуйте MurmurHash 3, PySpark,portable_hash).

    В простом случае, как этот, где ключ представляет собой небольшое целое число, вы можете предположить, что hash это личность (i = hash(i)).

    Scala API использует nonNegativeModопределить раздел на основе вычисленного хэша,

  • если распределение ключей не является равномерным, вы можете оказаться в ситуации, когда часть вашего кластера простаивает

  • ключи должны быть хэшируемыми Вы можете проверить мой ответ для списка в качестве ключа для PySpark reduByKey, чтобы прочитать о конкретных проблемах PySpark. Другая возможная проблема выделена документацией HashPartitioner:

    Java-массивы имеют hashCodes, которые основаны на идентификаторах массивов, а не на их содержимом, поэтому попытка разбить RDD [Array []] или RDD [(Array [], _)] с помощью HashPartitioner приведет к неожиданному или неправильному результату.

  • В Python 3 вы должны убедиться, что хеширование согласованно. См. Что означает Исключение: случайность хэша строки должна быть отключена с помощью значения PYTHONHASHSEED в pyspark?

  • Хэш-разделитель не является ни инъективным, ни сюръективным. Несколько ключей могут быть назначены одному разделу, а некоторые разделы могут оставаться пустыми.

  • Обратите внимание, что в настоящее время основанные на хэше методы не работают в Scala в сочетании с заданными в REPL классами случаев ( равенство классов дел в Apache Spark).

  • HashPartitioner (или любой другой Partitioner) перемешивает данные. Если разделение не используется повторно между несколькими операциями, это не уменьшает объем данных, которые необходимо перетасовать.

СДР распространяется, это означает, что он разделен на некоторое количество частей. Каждый из этих разделов потенциально находится на другой машине. Хеш-разделитель с arument numPartitions выбирает на каком разделе разместить пару (key, value) следующим образом:

  1. Создает точно numPartitions перегородки.
  2. места (key, value) в разделе с номером Hash(key) % numPartitions

HashPartitioner.getPartition Метод принимает ключ в качестве аргумента и возвращает индекс раздела, которому принадлежит ключ. Секционер должен знать, каковы действительные индексы, поэтому он возвращает числа в нужном диапазоне. Количество разделов указывается через numPartitions аргумент конструктора.

Реализация возвращает примерно key.hashCode() % numPartitions, Смотрите Partitioner.scala для более подробной информации.

Другие вопросы по тегам