Hadoop порядок операций

В соответствии с приложенным изображением, найденным в уроке по Yahoo, у вас есть следующий порядок действий: карта> объединение> раздел, за которым следует уменьшение

Вот мой пример ключа, испускаемого операцией карты

LongValueSum:geo_US|1311722400|E        1

Предполагая, что есть 100 ключей одного типа, это должно быть объединено как

geo_US|1311722400|E     100

Затем я хотел бы разделить ключи по значению до первого канала (|) http://hadoop.apache.org/common/docs/r0.20.2/streaming.html#A+Useful+Partitioner+Class+%28secondary + сортировать%2C+ кнопки +-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+ опция%29

geo_US

так вот моя потоковая команда

hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-0.20.203.0.jar \
-D mapred.reduce.tasks=8 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D stream.map.output.field.separator=\| \
-file mapper.py \
-mapper mapper.py \
-file reducer.py \
-reducer reducer.py \
-combiner org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input input_file \
-output output_path

Это ошибка, которую я получаю

java.lang.NumberFormatException: For input string: "1311722400|E    1"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Long.parseLong(Long.java:419)
at java.lang.Long.parseLong(Long.java:468)
at org.apache.hadoop.mapred.lib.aggregate.LongValueSum.addNextValue(LongValueSum.java:48)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:59)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:35)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1349)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1435)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)

Я похоже, что разделитель работает до объединителя. Какие-нибудь мысли?

2 ответа

Я проверил "Руководство по Hadoop", глава 6 "Перемешать и сортировать". Вывод карты сначала буферизируется в памяти. Когда память превысит свой порог, выходные данные карты будут записаны на диск. Перед записью на диск данные будут разделены. Внутри каждого раздела данные будут отсортированы по ключу. После этого, если есть функция объединения, объедините результаты сортировки.

На диске может быть много файлов разливов, если имеется как минимум 3 файла разливов, объединитель будет запущен снова, прежде чем выходные данные будут записаны на диск.

Наконец, все файлы разлива будут объединены в один файл, чтобы уменьшить количество операций ввода-вывода.

Короче говоря, для картографа: map -> partition -> sort ---> combiner

и для редуктора: скопировать преобразователь формы -> объединить (объединитель вызывается, если существует) -> уменьшить

Нет никакой гарантии, что Combiner будет фактически запущен для версий hadoop> 0.16. В hadoop 17 объединитель не запускается, если один <K,V> занимает весь буфер сортировки. в версиях> 0.18 объединитель может запускаться несколько раз как на карте, так и на этапах сокращения.

По сути, ваши алгоритмы не должны зависеть от того, вызывается ли функция Combine, поскольку она предназначена только для оптимизации. Для получения дополнительной информации ознакомьтесь с книгой Haddop, полное руководство.. нашел фрагмент, который рассказывает о функциях объединения в книгах Google здесь

Другие вопросы по тегам