Отладка функции mapreduce() в R
Сегодня я начал работать над пакетами rhdfs и rmr2.
Функция mapreduce() на одномерном векторе работала хорошо, как и ожидалось.кусок кода на одномерном векторе
a1 <- to.dfs(1:20)
a2 <- mapreduce(input=a1, map=function(k,v) keyval(v, v^2))
a3 <- as.data.frame(from.dfs(a2())
Возвращает следующий датафрейм
Key Val
1 1 1
2 10 100
3 11 121
4 12 144
5 13 169
6 14 196
7 15 225
8 16 256
9 17 289
10 18 324
11 19 361
12 2 4
13 20 400
14 3 9
15 4 16
16 5 25
17 6 36
18 7 49
19 8 64
20 9 81
До сих пор это было хорошо.
Но, работая над функцией mapreduce для набора данных mtcars, я получил следующее сообщение об ошибке. Не удалось отладить его дальше. Пожалуйста, дайте некоторую подсказку, чтобы двигаться вперед.
Мой кусок кода:
rs1 <- mapreduce(input=mtcars,
map=function(k, v) {
if (mtcars$hp > 150) keyval("Bigger", 1) },
reduce=function(k, v) keyval(k, sum(v))
)
Сообщение об ошибке с приведенным выше фрагментом кода.
13/09/21 07:24:49 ERROR streaming.StreamJob: Missing required option: input
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/hadoop-streaming.jar [options]
Options:
-input <path> DFS input file(s) for the Map step
-output <path> DFS output directory for the Reduce step
-mapper <cmd|JavaClassName> The streaming command to run
-combiner <cmd|JavaClassName> The streaming command to run
-reducer <cmd|JavaClassName> The streaming command to run
-file <file> File/dir to be shipped in the Job jar file
-inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
-outputformat TextOutputFormat(default)|JavaClassName Optional.
-partitioner JavaClassName Optional.
-numReduceTasks <num> Optional.
-inputreader <spec> Optional.
-cmdenv <n>=<v> Optional. Pass env.var to streaming commands
-mapdebug <path> Optional. To run this script when a map task fails
-reducedebug <path> Optional. To run this script when a reduce task fails
-io <identifier> Optional.
-verbose
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|jobtracker:port> specify a job tracker
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
For more details about these options:
Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info
Streaming Command Failed!
Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, :
hadoop streaming failed with error code 1
Быстрые и подробные ответы высоко ценятся...
2 ответа
Данные, которые вы передаете для Keyval, считают его вектором, а не единым целым. Попробуйте интерпретировать снизу код.
Пробовать локально
Загрузка данных
data(mtcars)
просмотреть несколько строк данных
head(mtcars) hpTest=mtcars$hp # taking required data print(hpTest)
Итоговая сумма
sum(hpTest[which(hpTest>150)]) # 2804
Работает на Hadoop-MapReduce
экспорт переменных env
# requied Sys.setenv(HADOOP_HOME="/home/trendwise/apache/hadoop-1.0.4"); Sys.setenv(HADOOP_CMD="/home/trendwise/apache/hadoop-1.0.4/bin/hadoop"); #optional Sys.setenv(HADOOP_BIN="/home/trendwise/apache/hadoop-1.0.4/bin"); Sys.setenv(HADOOP_CONF_DIR="/home/trendwise/apache/hadoop-1.0.4/conf"); Sys.setenv(HADOOP_STREAMING='/home/trendwise/apache/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar') Sys.setenv(LD_LIBRARY_PATH="/lib:/lib/x86_64-linux-gnu:/lib64:/usr/lib:/usr/lib64:/usr/local/lib:/usr/local/lib64:/usr/lib/jvm/jdk1.7.0_10/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64/server");
Загрузка библиотеки
library(rmr2) library(rhdfs)
инициализация
hdfs.init()
положить вход в HDFS
hpInput = to.dfs(mtcars$hp)
работает MapReduce
mapReduceResult <- mapreduce(input=hpInput, map=function(k, v) { keyval( rep(1,length(which(inputData > 150))) ,v[which(v>150)] )} , reduce=function(k2, v2){ keyval(k2, sum(v2))}
просмотр вывода МР
from.dfs(mapReduceResult)
выход
$key [1] 1 $val [1] 2804
Вы можете использовать встроенные функции отладки в новейшем RStudio. Просто переписать свой код в местной манере