Потоковая передача Hadoop - удаление конечной вкладки с выхода редуктора
У меня есть потоковое задание Hadoop, вывод которого не содержит пары ключ / значение. Вы можете думать об этом как о парах только для значений или пар только для ключей.
Мой потоковый редуктор (скрипт php) выводит записи, разделенные переводами строки. Потоковая передача Hadoop рассматривает это как ключ без значения и вставляет вкладку перед новой строкой. Эта дополнительная вкладка нежелательна.
Как мне это убрать?
Я использую hadoop 1.0.3 с AWS EMR. Я скачал источник hadoop 1.0.3 и нашел этот код в hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java:
reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
Поэтому я попытался передать -D stream.reduce.output.field.separator=
как аргумент к работе без удачи. Я тоже пробовал -D mapred.textoutputformat.separator=
а также -D mapreduce.output.textoutputformat.separator=
без удачи
Я искал в Google, конечно, и ничего, что я нашел, не работает. В одном из результатов поиска даже указывалось, что не было аргумента, который можно было бы передать для достижения желаемого результата (хотя версия hadoop в этом случае была действительно очень старой).
Вот мой код (с добавлением разрывов строк для удобства чтения):
hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
-D mapred.output.compress=true -D stream.reduce.output.field.separator=
-input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
-mapper 'php my_mapper.php' -reducer 'php my_reducer.php'
3 ответа
Глядя на источник org.apache.hadoop.mapreduce.lib.output.TextOutputFormat, я вижу 2 вещи:
write(key,value)
Метод записывает разделитель, если ключ или значение не равно NULL- Разделитель всегда устанавливается, используя значение по умолчанию (
\t
), когдаmapred.textoutputformat.separator
возвращает ноль (что я предполагаю, происходит с-D stream.reduce.output.field.separator=
Ваше единственное решение, возможно, написать ваш собственный OutputFormat, который работает вокруг этих двух проблем.
Мое тестирование
В задаче, которую я имел, я хотел переформатировать строку из
id1|val1|val2|val3
id1|val1
в:
id1|val1,val2,val3
id2|val1
У меня был собственный картограф (скрипт Perl) для преобразования строк. И для этой задачи я изначально пытался сделать ввод только для ключа (или только для значения), но получил результаты с помощью вкладки "трейлинг".
Сначала я только что уточнил:
-D stream.map.input.field.separator = '|' -D stream.map.output.field.separator = '|'
Это дало мапперу ключ, значение пары, так как мое отображение все равно нуждалось в ключе. Но этот вывод теперь имеет вкладку после первого поля
Я получил желаемый результат, когда я добавил:
-D mapred.textoutputformat.separator= '|'
Если я не установил это или установил пустым
-D mapred.textoutputformat.separator=
тогда я бы снова получил вкладку после первого поля.
Это имело смысл, как только я посмотрел на источник для TextOutputFormat
Как полезно для других, используя советы выше, я смог сделать реализацию:
CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}
с ровно одной строкой встроенной реализации getRecordWriter, измененной на:
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "");
вместо:
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
после компиляции этого в Jar и включения его в мой потоковый вызов hadoop (с помощью инструкций по потоковому вызову hadoop), вызов выглядел следующим образом:
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat \
-file yourMapper.py -mapper yourMapper.py \
-file yourReducer.py -reducer yourReducer.py \
-input $yourInputFile \
-output $yourOutputDirectoryOnHDFS
Я также включил банку в папку, из которой мне звонили.
Он отлично работал для моих нужд (и не создавал никаких вкладок в конце строки после редуктора).
обновление: на основе комментария, подразумевающего, что это действительно полезно для других, вот полный источник моего файла CustomOutputFormat.java:
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
Progressable progress) throws IOException {
boolean isCompressed = getCompressOutput(job);
//Channging the default from '\t' to blank
String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
GzipCodec.class);
// create the named codec
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
// build the filename including the extension
Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(new DataOutputStream(
codec.createOutputStream(fileOut)), keyValueSeparator);
}
}
}
К вашему сведению: для вашего контекста использования убедитесь, что это не оказывает отрицательного влияния на управляемые взаимодействия потоковой передачи hadoop (с точки зрения разделения ключ-значение) между вашим преобразователем и редуктором. Чтобы уточнить:
Из моего тестирования - если у вас есть "вкладка" в каждой строке ваших данных (с чем-то на каждой стороне), вы можете оставить встроенные значения по умолчанию такими, как они есть: потоковая передача будет интерпретировать первую вещь перед первой вкладкой как ваш "ключ", и все в этом ряду после него в качестве вашего "значения". Таким образом, он не видит "нулевое значение" и не добавляет вкладку, которая появляется после вашего редуктора. (Вы увидите, что ваши окончательные результаты отсортированы по значению "ключа", который потоковая передача интерпретирует в каждой строке как то, что происходит перед каждой вкладкой.)
И наоборот, если у вас нет вкладок в ваших данных, и вы не переопределяете значения по умолчанию, используя указанные выше приемы, то вы увидите вкладки после завершения прогона, для которого указанное переопределение становится исправлением.
У меня тоже была эта проблема. Я использовал работу с питоном, предназначенную только для карт, которая в основном просто излучала строки данных CSV. Изучив вывод, я отметил \t в конце каждой строки.
foo,bar,baz\t
Я обнаружил, что картограф и поток Python имеют дело с парами ключ-значение. Если вы не используете разделитель по умолчанию, вся строка данных CSV считается "ключом", и структура, для которой требуется ключ и значение, накладывает на \t и пустое значение.
Поскольку мои данные по сути были строкой CSV, я установил разделитель для потокового и отображаемого вывода на запятую. Фреймворк считывает все до первой запятой в качестве ключа и все после первой запятой в качестве значения. Затем, когда он записывал результаты в файл, он записывал значение запятой ключа, которое эффективно создавало вывод, который я получал.
foo,bar,baz
В моем случае, я добавил ниже, чтобы предотвратить добавление фреймворка \t в конец моего вывода CSV...
-D mapred.reduce.tasks=0 \
-D stream.map.output.field.separator=, \
-D mapred.textoutputformat.separator=, \