Почему SequenceFile усекается?

Я учусь Hadoop и эта проблема на некоторое время сбила меня с толку. В основном я пишу SequenceFile на диск, а затем прочитать его обратно. Тем не менее, каждый раз, когда я получаю EOFException при чтении. Более глубокий взгляд показывает, что при записи файла последовательности он усекается преждевременно, и это всегда происходит после записи индекса 962, а размер файла всегда равен 45056 байтов.

Я использую Java 8 и Hadoop 2.5.1 на MacBook Pro. На самом деле, я попробовал тот же код на другом компьютере Linux под Java 7, но происходит то же самое.

Я могу исключить, писатель / читатель не закрыт должным образом. Я попытался использовать старый стиль try/catch с явным writer.close(), как показано в коде, а также использовал новый подход try-with-resource. Оба не работают.

Любая помощь будет высоко оценена.

Ниже приведен код, который я использую:

public class SequenceFileDemo {

private static final String[] DATA = { "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen" };

public static void main(String[] args) throws Exception {
    String uri = "file:///Users/andy/Downloads/puzzling.seq";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    Path path = new Path(uri);      
    IntWritable key = new IntWritable();
    Text value = new Text();

    //API change
    try {
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
            stream(fs.create(path)),
            keyClass(IntWritable.class),
            valueClass(Text.class));

        for ( int i = 0; i < 1024; i++ ) {
            key.set( i);
            value.clear();
            value.set(DATA[i % DATA.length]);

            writer.append(key, value);
            if ( (i-1) %100 == 0 ) writer.hflush();
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        }

        writer.close();

    } catch (Exception e ) {
        e.printStackTrace();
    }


    try {
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, 
                SequenceFile.Reader.file(path));
        Class<?> keyClass = reader.getKeyClass();
        Class<?> valueClass = reader.getValueClass();

        boolean isWritableSerilization = false;
        try {
            keyClass.asSubclass(WritableComparable.class);
            isWritableSerilization = true;
        } catch (ClassCastException e) {

        }

        if ( isWritableSerilization ) {
            WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
            Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
            while(reader.next(rKey, rValue)) {
                System.out.printf("[%s] %d %s=%s\n",reader.syncSeen(), reader.getPosition(), rKey, rValue);
            }
        } else {
            //make sure io.seraizliatons has the serialization in use when write the sequence file
        }

        reader.close();
    } catch(IOException e) {
        e.printStackTrace();
    }
}

}

3 ответа

Решение

Я действительно обнаружил ошибку, потому что вы никогда не закрываете созданный поток в Writer.stream(fs.create(path)),

По некоторым причинам закрытие не распространяется вниз на поток, который вы только что создали. Полагаю, это ошибка, но мне пока лень ее искать в Jira.

Один из способов решить ваши проблемы - просто использовать Writer.file(path) вместо.

Очевидно, что вы также можете просто явно закрыть поток создания. Найдите мой исправленный пример ниже:

    Path path = new Path("file:///tmp/puzzling.seq");

    try (FSDataOutputStream stream = fs.create(path)) {
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.stream(stream),
                Writer.keyClass(IntWritable.class), Writer.valueClass(NullWritable.class))) {

            for (int i = 0; i < 1024; i++) {
                writer.append(new IntWritable(i), NullWritable.get());
            }
        }
    }

    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path))) {
        Class<?> keyClass = reader.getKeyClass();
        Class<?> valueClass = reader.getValueClass();

        WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
        Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
        while (reader.next(rKey, rValue)) {
            System.out.printf("%s = %s\n", rKey, rValue);
        }

    }

Я думаю, что вам не хватает writer.close() после цикла записи. Это должно гарантировать окончательный сброс, прежде чем вы начнете читать.

Спасибо Томасу.

Это сводится к тому, что созданный писатель "владеет" потоком не. При создании программы записи, если мы передаем опцию Writer.file(путь), программа записи "владеет" базовым потоком, созданным внутри, и закроет его при вызове close(). Тем не менее, если мы передадим Writer.stream (aStream), писатель предположит, что кто-то еще является ответом для этого потока, и не закроет его, когда вызывается close(). Короче говоря, это не ошибка, просто я недостаточно хорошо ее понимаю.,

Другие вопросы по тегам