Hive - эффективное объединение двух таблиц

Я объединяю две большие таблицы в Hive (более 1 миллиарда строк, около 100 миллионов строк) примерно так:

create table joinedTable as select t1.id, ... from t1 join t2 ON (t1.id = t2.id);

Я разбил две таблицы одинаково, сгруппировав по идентификатору по 100 сегментов для каждой, но запрос все еще занимает много времени.

Любые предложения о том, как ускорить это?

3 ответа

Решение

Когда вы объединяете данные с помощью ключей объединения, вы можете использовать Bucket Map Join. Для этого количество сегментов в одной таблице должно быть кратным количеству сегментов в другой таблице. Это можно активировать, выполнив set hive.optimize.bucketmapjoin=true; перед запросом. Если таблицы не соответствуют условиям, Hive просто выполнит обычное Inner Join.

Если в обеих таблицах одинаковое количество сегментов и данные отсортированы по ключам сегментов, Hive может выполнить более быстрое объединение сортировки-слияния. Для его активации необходимо выполнить следующие команды:

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;

Вы можете найти некоторые визуализации различных методов соединения в https://cwiki.apache.org/confluence/download/attachments/27362054/Hive+Summit+2011-join.pdf.

На мой взгляд, ответ немного сложнее, чем предлагал @Adrian Lange.

Прежде всего вы должны понять очень важное различие между BucketJoin и Sort-Merge Bucket Join (SMBJ):

Для выполнения объединения "количество сегментов в одной таблице должно быть кратно количеству сегментов в другой таблице", как указано выше, и дополнительно hive.optimize.bucketmapjoin должен быть установлен в true.
Выпустив соединение, куст превратит его в объединение, если вышеуказанное условие выполнено, НО обратите внимание, что куст не будет приводить к объединению! это означает, что создание таблицы с разбивкой не достаточно для того, чтобы таблица фактически была объединена в указанное количество сегментов, поскольку куст не реализует это, если только hive.enforce.bucketing имеет значение true (что означает, что количество сегментов фактически определяется количеством редукторов на заключительном этапе запроса на вставку данных в таблицу).
Что касается производительности, обратите внимание, что при использовании объединенного задания одна задача считывает "меньшую" таблицу в распределенный кеш перед тем, как маперы получат к ней доступ и выполнит объединение. Этот этап, вероятно, будет очень очень длинным и неэффективным, если ваша таблица имеет ~100м рядов!
После соединения соединение будет таким же, как и в обычном соединении в редукторах.

Для выполнения SMBJ обе таблицы должны иметь одинаковое количество сегментов, в одних и тех же столбцах и сортироваться по этим столбцам в дополнение к настройке. hive.optimize.bucketmapjoin.sortedmerge к истине.
Как и в предыдущей оптимизации, Hive не применяет группирование и сортировку, а скорее предполагает, что вы убедились, что таблицы на самом деле объединены в группы и отсортированы (не только по определению, но и путем установки hive.enforce.sorting или вручную сортировать данные во время их вставки) - это очень важно, поскольку в обоих случаях это может привести к неверным результатам.
Что касается производительности, эта оптимизация более эффективна по следующим причинам:

  1. Каждый маппер читает оба сегмента, и нет единой конкуренции за загрузку распределенного кэша
  2. Выполняемое объединение является соединением с сортировкой слиянием, поскольку данные уже отсортированы, что является гораздо более эффективным.

Пожалуйста, обратите внимание на следующие соображения:

  • в обоих случаях set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
    должно быть выполнено
  • в обоих случаях /*+ MAPJOIN(b) */ следует применять в запросе (сразу после select и где b это меньший столик)
  • Сколько ведер?
    Это следует рассматривать с этой точки зрения: рассмотрение должно применяться строго к большему столу, поскольку оно оказывает большее влияние в этом направлении, и в последнем случае конфигурация будет применяться к меньшему столу как необходимость. Я думаю, что, как правило, каждое ведро должно содержать от 1 до 3 блоков, вероятно, где-то около 2 блоков. поэтому, если размер вашего блока составляет 256 МБ, мне кажется разумным иметь ~512 МБ данных в каждом сегменте таблицы большего размера, так что это становится простым делением.

Кроме того, не забывайте, что одни только эти оптимизации не всегда гарантируют более быстрое время запроса.
Допустим, вы решили сделать SMBJ, это добавляет стоимость сортировки 2 таблиц перед выполнением объединения - так что чем больше вы выполняете запрос, тем меньше вы "платите" за этот этап сортировки.

Иногда простое объединение приводит к лучшей производительности, и ни одна из вышеперечисленных оптимизаций не поможет, и вам придется оптимизировать обычный процесс объединения либо на прикладном / логическом уровне, либо путем настройки параметров MapReduce / Hive, таких как использование памяти / параллелизм и т. Д.

Я не думаю, что это обязательный критерий: "количество сегментов в одной таблице должно быть кратным количеству сегментов в другой таблице" для объединения сегментов карты. Мы также можем иметь такое же количество сегментов.

Другие вопросы по тегам