Разбор Hadoop + Джексона: ObjectMapper читает Object, а затем разбивает

Я реализую JSON RecordReader в Hadoop с Джексоном. К настоящему времени я тестирую локально с помощью JUnit + MRUnit. Файлы JSON содержат по одному объекту, который после некоторых заголовков имеет поле, значением которого является массив записей, каждую из которых я хочу понимать как Запись (поэтому мне нужно пропустить эти заголовки).

Я могу сделать это, продвигая FSDataInputStream до точки чтения. В моем локальном тестировании я делаю следующее:

fs = FileSystem.get(new Configuration());
in = fs.open(new Path(filename));
long offset = getOffset(in, "HEADER_START_HERE");       
in.seek(offset);

где getOffset - это функция, которая указывает InputStream, где начинается значение поля - что работает нормально, если мы посмотрим на in.getPos() значение.

Я читаю первую запись:

ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readValue (in, JsonNode.class);

Первая запись возвращается нормально. я могу использовать mapper.writeValueAsString(actualObj) и это прочитало это хорошо, и это было действительно.

Хорошо, пока здесь.

Поэтому я пытаюсь перебрать объекты, выполнив:

ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = null;
do {
    actualObj = mapper.readValue (in, JsonNode.class);
    if( actualObj != null) {
        LOG.info("ELEMENT:\n" + mapper.writeValueAsString(actualObj) );
    }
} while (actualObj != null) ;

И он читает первый, но потом ломается:

java.lang.NullPointerException: null
    at org.apache.hadoop.fs.BufferedFSInputStream.getPos(BufferedFSInputStream.java:54)
    at org.apache.hadoop.fs.FSDataInputStream.getPos(FSDataInputStream.java:57)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:243)
    at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:273)
    at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:225)
    at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:193)
    at java.io.DataInputStream.read(DataInputStream.java:132)
    at org.codehaus.jackson.impl.ByteSourceBootstrapper.ensureLoaded(ByteSourceBootstrapper.java:340)
    at org.codehaus.jackson.impl.ByteSourceBootstrapper.detectEncoding(ByteSourceBootstrapper.java:116)
    at org.codehaus.jackson.impl.ByteSourceBootstrapper.constructParser(ByteSourceBootstrapper.java:197)
    at org.codehaus.jackson.JsonFactory._createJsonParser(JsonFactory.java:503)
    at org.codehaus.jackson.JsonFactory.createJsonParser(JsonFactory.java:365)
    at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1158)

Почему происходит это исключение?

Это имеет отношение к чтению на местном уровне?

Нужен ли какой-то сброс или что-то при повторном использовании ObjectMapper или его основной поток?

1 ответ

Решение

Мне удалось обойти это. Если это поможет:

Прежде всего, я использую последнюю версию Jackson 1.x. Кажется, однажды JsonParser создается с InputStream, он берет это под контроль. Итак, при использовании readValue()как только он прочитан (внутренне он вызывает _readMapAndClose() который автоматически закрывает поток. Существует настройка, которую вы можете установить, чтобы сообщить JsonParser не закрывать основной поток. Вы можете передать его своему JsonFactory как это, прежде чем создать свой JsonParser:

JsonFactory f = new MappingJsonFactory();
f.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);

Остерегайтесь, вы несете ответственность за закрытие потока (FSDataInputStream в моем случае). Итак, ответы:

  • Почему происходит это исключение?

Потому что анализатор управляет потоком и закрывает его после readValue().

  • Это имеет отношение к чтению на местном уровне?

нет

  • Нужен ли какой-то сброс или что-то еще при повторном использовании ObjectMapper или его основного потока?

Нет. Что нужно знать при использовании Streaming API, смешанного с ObjectMapper-подобными методами, так это то, что иногда mapper/parser может взять под свой контроль основной поток. Обратитесь к Javadoc JsonParser и проверьте документацию по каждому из методов чтения, чтобы удовлетворить ваши потребности.

Другие вопросы по тегам