Потоковая передача Hadoop с помощью Java Mapper/Reducer
Я пытаюсь запустить потоковое задание hadoop с помощью java Mapper/Reducer над некоторыми дампами из Википедии (в сжатом виде bz2). Я пытаюсь использовать WikiHadoop, интерфейс, недавно выпущенный Викимедиа.
WikiReader_Mapper.java
package courseproj.example; // Mapper: emits (token, 1) for every article occurrence. public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> { // Reuse objects to save overhead of object creation. private final static Text KEY = new Text(); private final static IntWritable VALUE = new IntWritable(1); @Override public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter) throws IOException { KEY.set("article count"); collector.collect(KEY, VALUE); } }
WikiReader_Reducer.java
package courseproj.example; //Reducer: sums up all the counts. public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { private final static IntWritable SUM = new IntWritable(); public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } SUM.set(sum); collector.collect(key, SUM); } }
Я запускаю команду
hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \ -libjars lib2/wikihadoop-0.2.jar \ -D mapreduce.input.fileinputformat.split.minsize=300000000 \ -D mapreduce.task.timeout=6000000 \ -D org.wikimedia.wikihadoop.previousRevision=false \ -input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \ -output out \ -inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \ -mapper WikiReader_Mapper \ -reducer WikiReader_Reducer
и сообщения об ошибках, которые я получаю,
Error: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103)
Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)
Я больше знаком с новым API hadoop против старого. Так как мой код маппера и редуктора находится в двух разных файлах, где я определяю параметры конфигурации JobConf для задания одновременно, следуя структуре команд потоковой передачи hadoop (явно устанавливая класс маппера и редуктора). Есть ли способ, которым я могу обернуть код преобразователя и преобразователя в один класс (который расширяет Configured и реализует Tool, что сделано в новом API) и передать имя класса в командную строку потоковой передачи hadoop по сравнению с установкой сопоставить и уменьшить классы отдельно?
1 ответ
Потоковая передача использует старый API (org.apache.hadoop.mapred
) - все же ваши классы мапперов и редукторов расширяют новые классы API (org.apache.hadoop.mapreduce
).
Попробуйте изменить свой маппер для реализации org.apache.hadoop.mapred.Mapper
и редуктор для реализации org.apache.hadoop.mapred.Reducer
, например:
package courseproj.example;
// Mapper: emits ("article", 1) for every article occurrence.
public class WikiReader_Mapper implements Mapper<Text, Text, Text, IntWritable> {
// Reuse objects to save overhead of object creation.
private final static Text KEY = new Text();
private final static IntWritable VALUE = new IntWritable(1);
@Override
public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
throws IOException, InterruptedException {
KEY.set("article count");
collector.collect(KEY, VALUE);
}
}