Эффективность вычислений InputSplit в NLineInputFormat

Я посмотрел в getSplitsForFile() Fn NLineInputFormat. Я обнаружил, что InputStream создается для входного файла, а затем его итерация и разбиения создаются каждые n строк. Это эффективно? Особенно, когда эта операция чтения происходит на 1 узле перед запуском задачи сопоставления. Что делать, если у меня есть 5 ГБ файла. В основном это означает, что данные файла ищутся дважды, один раз во время создания разделения и один раз во время чтения из задач маппера. Если это узкое место, как работа hadoop отменяет это?

 public static List<FileSplit> getSplitsForFile(FileStatus status,
          Configuration conf, int numLinesPerSplit) throws IOException {
        List<FileSplit> splits = new ArrayList<FileSplit> ();
        Path fileName = status.getPath();
        if (status.isDirectory()) {
          throw new IOException("Not a file: " + fileName);
        }
        FileSystem  fs = fileName.getFileSystem(conf);
        LineReader lr = null;
        try {
          FSDataInputStream in  = fs.open(fileName);
          lr = new LineReader(in, conf);
          Text line = new Text();
          int numLines = 0;
          long begin = 0;
          long length = 0;
          int num = -1;
<!-- my part of concern start -->
          while ((num = lr.readLine(line)) > 0) {
            numLines++;
            length += num;
            if (numLines == numLinesPerSplit) {
              splits.add(createFileSplit(fileName, begin, length));
              begin += length;
              length = 0;
              numLines = 0;
            }
          }
<!-- my part of concern end -->
          if (numLines != 0) {
            splits.add(createFileSplit(fileName, begin, length));
          }
        } finally {
          if (lr != null) {
            lr.close();
          }
        }
        return splits; 
      }

Редактирование, чтобы предоставить мой сценарий использования clément-mathieu

Мои наборы данных - это большие входные файлы по 2 Гб каждый. Каждая строка в файлах представляет запись, которая должна быть вставлена ​​в таблицу базы данных (в моем случае cassandra). Я хочу ограничить массовые транзакции для моей базы данных каждой n-строкой. Мне удалось сделать это с помощью nlineinputformat. Единственное, что меня беспокоит, так это наличие скрытого узкого места в производительности, которое может появиться в производстве.

1 ответ

Решение

В основном это означает, что данные файла ищутся дважды, один раз во время создания разделения и один раз во время чтения из задач маппера.

Да.

Цель этого InputFormat это создать разделение для каждой N-строк. Единственный способ вычислить границы разделения - это прочитать этот файл и найти символы новой строки. Эта операция может быть дорогостоящей, но вы не можете избежать ее, если это то, что вам нужно.

Если это узкое место, как работа hadoop отменяет это?

Не уверен, что понял вопрос.

NLineInputFormat не является InputFormat по умолчанию, и очень немногие варианты использования требуют этого. Если вы прочтете javadoc этого класса, то увидите, что этот класс в основном существует для подачи параметров в смущающие параллельные задания (= "маленькие" входные файлы).

Большинству InputFormat не нужно читать файл для вычисления разбиений. Обычно они используют жесткие правила, такие как разделение должно составлять 128 МБ или одно разделение для каждого блока HDFS, а RecordReaders позаботится о реальном смещении начала / конца разделения.

Если стоимость NLineInputFormat.getSplitsForFile это проблема, я бы действительно пересмотреть, почему мне нужно использовать это InputFormat, То, что вы хотите сделать, это ограничить размер пакета бизнес-процесса в вашем устройстве отображения. С NLineInputFormat маппер создается для каждых N строк, это означает, что маппер никогда не выполнит более одной массовой транзакции. Похоже, вам не нужна эта функция, вам нужно только ограничить размер массовой транзакции, но вам все равно, будет ли картограф последовательно выполнять несколько из них. Таким образом, вы платите за стоимость кода, который вы обнаружили, взамен ничего.

я хотел бы использовать TextInputFormat и создайте партию в маппере. В псевдокоде:

setup() {
  buffer = new Buffer<String>(1_000_000);
}

map(LongWritable key, Text value) {
  buffer.append(value.toString())
  if (buffer.isFull()) {
    new Transaction(buffer).doIt()
    buffer.clear()
  }
}

cleanup() {
  new Transaction(buffer).doIt()
  buffer.clear()
}

По умолчанию маппер создается для каждого блока HDFS. Если вы думаете, что это слишком много или мало, mapred.(max|min).split.size переменные позволяют увеличивать или уменьшать параллелизм.

В принципе пока удобно NLineInputFormat слишком мелкозернистый для того, что вам нужно. Вы можете достичь почти того же, используя TextInputFormat и играть с *.split.size который не включает чтение файлов для создания разделений.

Другие вопросы по тегам