Потоковая передача Hadoop - удаление конечной вкладки с выхода редуктора

У меня есть потоковое задание Hadoop, вывод которого не содержит пары ключ / значение. Вы можете думать об этом как о парах только для значений или пар только для ключей.

Мой потоковый редуктор (скрипт php) выводит записи, разделенные переводами строки. Потоковая передача Hadoop рассматривает это как ключ без значения и вставляет вкладку перед новой строкой. Эта дополнительная вкладка нежелательна.

Как мне это убрать?

Я использую hadoop 1.0.3 с AWS EMR. Я скачал источник hadoop 1.0.3 и нашел этот код в hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java:

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");

Поэтому я попытался передать -D stream.reduce.output.field.separator= как аргумент к работе без удачи. Я тоже пробовал -D mapred.textoutputformat.separator= а также -D mapreduce.output.textoutputformat.separator= без удачи

Я искал в Google, конечно, и ничего, что я нашел, не работает. В одном из результатов поиска даже указывалось, что не было аргумента, который можно было бы передать для достижения желаемого результата (хотя версия hadoop в этом случае была действительно очень старой).

Вот мой код (с добавлением разрывов строк для удобства чтения):

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'

3 ответа

Решение

Глядя на источник org.apache.hadoop.mapreduce.lib.output.TextOutputFormat, я вижу 2 вещи:

  1. write(key,value) Метод записывает разделитель, если ключ или значение не равно NULL
  2. Разделитель всегда устанавливается, используя значение по умолчанию (\t), когда mapred.textoutputformat.separator возвращает ноль (что я предполагаю, происходит с -D stream.reduce.output.field.separator=

Ваше единственное решение, возможно, написать ваш собственный OutputFormat, который работает вокруг этих двух проблем.

Мое тестирование

В задаче, которую я имел, я хотел переформатировать строку из

id1|val1|val2|val3
id1|val1

в:

id1|val1,val2,val3
id2|val1

У меня был собственный картограф (скрипт Perl) для преобразования строк. И для этой задачи я изначально пытался сделать ввод только для ключа (или только для значения), но получил результаты с помощью вкладки "трейлинг".

Сначала я только что уточнил:

-D stream.map.input.field.separator = '|' -D stream.map.output.field.separator = '|'

Это дало мапперу ключ, значение пары, так как мое отображение все равно нуждалось в ключе. Но этот вывод теперь имеет вкладку после первого поля

Я получил желаемый результат, когда я добавил:

-D mapred.textoutputformat.separator= '|'

Если я не установил это или установил пустым

-D mapred.textoutputformat.separator=

тогда я бы снова получил вкладку после первого поля.

Это имело смысл, как только я посмотрел на источник для TextOutputFormat

Как полезно для других, используя советы выше, я смог сделать реализацию:

CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}

с ровно одной строкой встроенной реализации getRecordWriter, измененной на:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); 

вместо:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); 

после компиляции этого в Jar и включения его в мой потоковый вызов hadoop (с помощью инструкций по потоковому вызову hadoop), вызов выглядел следующим образом:

hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
-file yourMapper.py    -mapper  yourMapper.py     \
-file yourReducer.py   -reducer yourReducer.py    \
-input $yourInputFile    \
-output $yourOutputDirectoryOnHDFS

Я также включил банку в папку, из которой мне звонили.

Он отлично работал для моих нужд (и не создавал никаких вкладок в конце строки после редуктора).


обновление: на основе комментария, подразумевающего, что это действительно полезно для других, вот полный источник моего файла CustomOutputFormat.java:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {

    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
        Progressable progress) throws IOException {
    boolean isCompressed = getCompressOutput(job);

    //Channging the default from '\t' to blank
    String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
    if (!isCompressed) {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
            GzipCodec.class);
        // create the named codec
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
        // build the filename including the extension
        Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(new DataOutputStream(
            codec.createOutputStream(fileOut)), keyValueSeparator);
    }
    }
}

К вашему сведению: для вашего контекста использования убедитесь, что это не оказывает отрицательного влияния на управляемые взаимодействия потоковой передачи hadoop (с точки зрения разделения ключ-значение) между вашим преобразователем и редуктором. Чтобы уточнить:

  • Из моего тестирования - если у вас есть "вкладка" в каждой строке ваших данных (с чем-то на каждой стороне), вы можете оставить встроенные значения по умолчанию такими, как они есть: потоковая передача будет интерпретировать первую вещь перед первой вкладкой как ваш "ключ", и все в этом ряду после него в качестве вашего "значения". Таким образом, он не видит "нулевое значение" и не добавляет вкладку, которая появляется после вашего редуктора. (Вы увидите, что ваши окончательные результаты отсортированы по значению "ключа", который потоковая передача интерпретирует в каждой строке как то, что происходит перед каждой вкладкой.)

  • И наоборот, если у вас нет вкладок в ваших данных, и вы не переопределяете значения по умолчанию, используя указанные выше приемы, то вы увидите вкладки после завершения прогона, для которого указанное переопределение становится исправлением.

У меня тоже была эта проблема. Я использовал работу с питоном, предназначенную только для карт, которая в основном просто излучала строки данных CSV. Изучив вывод, я отметил \t в конце каждой строки.

 foo,bar,baz\t

Я обнаружил, что картограф и поток Python имеют дело с парами ключ-значение. Если вы не используете разделитель по умолчанию, вся строка данных CSV считается "ключом", и структура, для которой требуется ключ и значение, накладывает на \t и пустое значение.

Поскольку мои данные по сути были строкой CSV, я установил разделитель для потокового и отображаемого вывода на запятую. Фреймворк считывает все до первой запятой в качестве ключа и все после первой запятой в качестве значения. Затем, когда он записывал результаты в файл, он записывал значение запятой ключа, которое эффективно создавало вывод, который я получал.

 foo,bar,baz

В моем случае, я добавил ниже, чтобы предотвратить добавление фреймворка \t в конец моего вывода CSV...

-D mapred.reduce.tasks=0 \
-D stream.map.output.field.separator=, \
-D mapred.textoutputformat.separator=, \
Другие вопросы по тегам