Spark тасует большое количество данных

Я написал искровую работу. Который выглядит ниже:

public class TestClass {

public static void main(String[] args){
String masterIp = args[0];
String appName = args[1];
String inputFile = args[2];
String output = args[3];
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<String> rdd = sparkContext.textFile(inputFile);
Integer[] keyColumns = new Integer[] {0,1,2};
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns);

Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1);
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2;
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2;

JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
      private static final long serialVersionUID = -6293440291696487370L;
      @Override
      public Tuple2<String, Integer> call(String t) throws Exception {
        String[] record = t.split(",");
        Integer[] keyColumns = broadcastJob.value();
        StringBuilder key = new StringBuilder();
        for (int index = 0; index < keyColumns.length; index++) {
          key.append(record[keyColumns[index]]);
        }
        key.append("|id=1");
        Integer value = new Integer(record[4]);
        return new Tuple2<String, Integer>(key.toString(),value);
      }}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2);
      pairRDD.saveAsTextFile(output);
   }
}

Программа рассчитывает сумму значений для каждого ключа. Насколько я понимаю, локальный объединитель должен работать на каждом узле и складывать значения для одних и тех же ключей, а затем происходит перемешивание с небольшим количеством данных. Но на SparkUI он показывает огромное количество случайного чтения и случайного чтения (почти 58 ГБ). Я делаю что-то не так? Как узнать, работает ли местный комбинер?

Детали кластера:-
20 узлов кластера
Каждый узел имеет жесткий диск 80 ГБ, 8 ГБ ОЗУ, 4 ядра
Hadoop-2.7.2
Spark-2.0.2 (дистрибутив prebuild-with-Hadoop-2.7.x)

Детали входного файла:-
входной файл хранится в формате hdfs
размер входного файла: 400 ГБ
количество записей: 16 129999990
столбцы записи: String(2 символа),int,int,String(2 символа),int,int,String(2 символа),String(2 символа),String(2 символа)

Примечание. Максимальное количество различных ключей составляет 1081600.
В журналах искр я вижу задачу, выполняющуюся с уровнем локальности NODE_LOCAL.

введите описание изображения здесь

1 ответ

Давайте разберем эту проблему и посмотрим, что получится. Для упрощения вычислений предположим, что:

  • Общее количество записей 1.6e8
  • Количество уникальных ключей 1е6
  • Размер сплита составляет 128 МБ (это соответствует количеству задач в вашем пользовательском интерфейсе).

С этими значениями данные будут разбиты на ~3200 разделов (3125 в вашем случае). Это дает вам около 51200 записей за разделение. Кроме того, если распределение количества значений на ключ одинаково, то в среднем должно быть ~160 записей на ключ.

Если данные распределены случайным образом (например, они не отсортированы по ключу), можно ожидать, что в среднем количество записей на ключ на раздел будет близко к единице *. Это в основном наихудший сценарий, когда объединение на стороне карты вообще не уменьшает объем данных.

Кроме того, вы должны помнить, что размер плоского файла обычно будет значительно меньше размера сериализованных объектов.

С реальными данными вы обычно можете ожидать некоторый тип порядка, возникающий в процессе сбора данных, поэтому все должно быть лучше, чем мы рассчитывали выше, но суть в том, что, если данные еще не сгруппированы по разделам, объединение на стороне карты может не дать никаких улучшений. совсем.

Вероятно, вы могли бы уменьшить объем перемешанных данных, используя немного больший разделение (256 МБ дало бы вам чуть более 100 КБ на раздел), но это обходится дороже, чем более длительные паузы GC и, возможно, другие проблемы с GC.


* Вы можете смоделировать это, взяв образцы с заменой:

import pandas as pd
import numpy as np

(pd
    .DataFrame({"x": np.random.choice(np.arange(3200), size=160, replace=True)})
    .groupby("x")
    .x.count()
    .mean())

или просто подумайте о проблеме случайного назначения 160 шаров для 3200 ведер.

Другие вопросы по тегам