Не понимая NPR MapReduce
Вот ошибка, которую я получаю:
14/02/28 02:52:43 INFO mapred.JobClient: Task Id : attempt_201402271927_0020_m_000001_2, Status : FAILED
java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:843)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:376)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:85)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:584)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:656)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Я закомментировал свой код, чтобы он по существу принимал типичные LongWritable и Text, а затем просто выводил константу IntWritable 1 и пустой класс погоды (пользовательский класс):
Вот мой класс картографа:
public class Map extends Mapper<LongWritable, Text, IntWritable, Weather> {
private IntWritable id = new IntWritable(1);
private Weather we = new Weather();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//String s;
//String line = value.toString();
//int start[] = {0,18,31,42,53,64,74,84,88,103};
//int end[] = {6,22,33,44,55,66,76,86,93,108};
//if(line.length() > 108) {
// create the object to hold our data
// getStuff()
// parse the string
// push the object onto our data structure
context.write(id, we);
//}
}
Вот мой редуктор:
public class Reduce extends Reducer<IntWritable, Weather, IntWritable, Text> {
private Text text = new Text("one");
private IntWritable one = new IntWritable(1);
public void reduce(IntWritable key, Iterable<Weather> weather, Context context)
throws IOException, InterruptedException {
//for(Weather w : weather) {
// text.set(w.toString());
context.write(one, text);
}
}
Вот мой главный:
public class Skyline {
public static void main(String[] args) throws IOException{
//String s = args[0].length() > 0 ? args[0] : "skyline.in";
Path input, output;
Configuration conf = new Configuration();
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
try {
input = new Path(args[0]);
} catch(ArrayIndexOutOfBoundsException e) {
input = new Path("hdfs://localhost/user/cloudera/in/skyline.in");
}
try {
output = new Path(args[1]);
//FileSystem.getLocal(conf).delete(output, true);
} catch(ArrayIndexOutOfBoundsException e) {
output = new Path("hdfs://localhost/user/cloudera/out/");
//FileSystem.getLocal(conf).delete(output, true);
}
Job job = new Job(conf, "skyline");
job.setJarByClass(Skyline.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Weather.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
try {
job.waitForCompletion(true);
} catch(InterruptedException e) {
System.out.println("Interrupted Exception");
} catch(ClassNotFoundException e) {
System.out.println("ClassNotFoundException");
}
}
}
Вот образец моего класса погоды:
public class Weather {
private in stationId;
public Weather(){}
public int getStation(){return this.stationID;}
public void setStation(int r){this.stationID = r}
//...24 additional things of ints, doubles and strings
}
Я в своем уме. На данный момент у меня есть оболочка программы, которая ничего не делает и все еще получает ошибку. Я прочитал о Java Generics, чтобы убедиться, что я использую их правильно (я думаю, что я), я очень зеленый к парадигме MapReduce, но эта программа просто оболочка, измененная из учебника MapReduce ( https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html).
1 ответ
Проблема в том, что класс, который вы используете для map()
выход / reduce()
вход, Weather
не реализует Writable
, Это предотвратит дефолт SerializationFactory
от возможности обрабатывать ваши ценности.
Основная концептуальная проблема заключается в том, что Hadoop не знает, как сериализовать ваш тип данных на диск и прочитать его обратно. Это обязательный шаг, потому что данные должны быть сохранены, прежде чем они могут быть перемещены из задачи карты в редуктор (как правило, они могут работать на отдельных узлах).
Итак, что вы хотите сделать, это реализовать Writable
и добавить подпрограммы сериализации в ваш пользовательский тип данных.