Не понимая NPR MapReduce

Вот ошибка, которую я получаю:

    14/02/28 02:52:43 INFO mapred.JobClient: Task Id : attempt_201402271927_0020_m_000001_2, Status : FAILED
java.lang.NullPointerException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:843)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:376)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:85)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:584)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:656)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)

Я закомментировал свой код, чтобы он по существу принимал типичные LongWritable и Text, а затем просто выводил константу IntWritable 1 и пустой класс погоды (пользовательский класс):

Вот мой класс картографа:

public class Map extends Mapper<LongWritable, Text, IntWritable, Weather> {

private IntWritable id = new IntWritable(1);
private Weather we = new Weather();

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //String s;
    //String line = value.toString();

    //int start[] =   {0,18,31,42,53,64,74,84,88,103};
    //int end[] =     {6,22,33,44,55,66,76,86,93,108};

    //if(line.length() > 108) {
        // create the object to hold our data
        // getStuff()
        // parse the string

        // push the object onto our data structure
        context.write(id, we);
    //}
}

Вот мой редуктор:

public class Reduce extends Reducer<IntWritable, Weather, IntWritable, Text> {
    private Text text = new Text("one");
    private IntWritable one = new IntWritable(1);
    public void reduce(IntWritable key, Iterable<Weather> weather, Context context)
        throws IOException, InterruptedException {
        //for(Weather w : weather) {
        //    text.set(w.toString());
        context.write(one, text);
    }
}

Вот мой главный:

public class Skyline {

    public static void main(String[] args) throws IOException{
        //String s = args[0].length() > 0 ? args[0] : "skyline.in";
        Path input, output;
        Configuration conf = new Configuration();

        conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");
        try {
            input = new Path(args[0]);
        } catch(ArrayIndexOutOfBoundsException e) {
            input = new Path("hdfs://localhost/user/cloudera/in/skyline.in");
        }
        try {
            output = new Path(args[1]);
            //FileSystem.getLocal(conf).delete(output, true);
        } catch(ArrayIndexOutOfBoundsException e) {
            output = new Path("hdfs://localhost/user/cloudera/out/");
            //FileSystem.getLocal(conf).delete(output, true);
        }

        Job job = new Job(conf, "skyline");

        job.setJarByClass(Skyline.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Weather.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        try {
            job.waitForCompletion(true);
        } catch(InterruptedException e) {
            System.out.println("Interrupted Exception");
        } catch(ClassNotFoundException e) {
            System.out.println("ClassNotFoundException");
        }
    }
}

Вот образец моего класса погоды:

public class Weather {

private in stationId;

public Weather(){}

public int getStation(){return this.stationID;}
public void setStation(int r){this.stationID = r}
//...24 additional things of ints, doubles and strings
}

Я в своем уме. На данный момент у меня есть оболочка программы, которая ничего не делает и все еще получает ошибку. Я прочитал о Java Generics, чтобы убедиться, что я использую их правильно (я думаю, что я), я очень зеленый к парадигме MapReduce, но эта программа просто оболочка, измененная из учебника MapReduce ( https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html).

1 ответ

Решение

Проблема в том, что класс, который вы используете для map() выход / reduce() вход, Weather не реализует Writable, Это предотвратит дефолт SerializationFactory от возможности обрабатывать ваши ценности.

Основная концептуальная проблема заключается в том, что Hadoop не знает, как сериализовать ваш тип данных на диск и прочитать его обратно. Это обязательный шаг, потому что данные должны быть сохранены, прежде чем они могут быть перемещены из задачи карты в редуктор (как правило, они могут работать на отдельных узлах).

Итак, что вы хотите сделать, это реализовать Writable и добавить подпрограммы сериализации в ваш пользовательский тип данных.

Другие вопросы по тегам