Разбор Hadoop + Джексона: ObjectMapper читает Object, а затем разбивает
Я реализую JSON RecordReader в Hadoop с Джексоном. К настоящему времени я тестирую локально с помощью JUnit + MRUnit. Файлы JSON содержат по одному объекту, который после некоторых заголовков имеет поле, значением которого является массив записей, каждую из которых я хочу понимать как Запись (поэтому мне нужно пропустить эти заголовки).
Я могу сделать это, продвигая FSDataInputStream до точки чтения. В моем локальном тестировании я делаю следующее:
fs = FileSystem.get(new Configuration());
in = fs.open(new Path(filename));
long offset = getOffset(in, "HEADER_START_HERE");
in.seek(offset);
где getOffset - это функция, которая указывает InputStream, где начинается значение поля - что работает нормально, если мы посмотрим на in.getPos()
значение.
Я читаю первую запись:
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readValue (in, JsonNode.class);
Первая запись возвращается нормально. я могу использовать mapper.writeValueAsString(actualObj)
и это прочитало это хорошо, и это было действительно.
Хорошо, пока здесь.
Поэтому я пытаюсь перебрать объекты, выполнив:
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = null;
do {
actualObj = mapper.readValue (in, JsonNode.class);
if( actualObj != null) {
LOG.info("ELEMENT:\n" + mapper.writeValueAsString(actualObj) );
}
} while (actualObj != null) ;
И он читает первый, но потом ломается:
java.lang.NullPointerException: null
at org.apache.hadoop.fs.BufferedFSInputStream.getPos(BufferedFSInputStream.java:54)
at org.apache.hadoop.fs.FSDataInputStream.getPos(FSDataInputStream.java:57)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:243)
at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:273)
at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:225)
at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:193)
at java.io.DataInputStream.read(DataInputStream.java:132)
at org.codehaus.jackson.impl.ByteSourceBootstrapper.ensureLoaded(ByteSourceBootstrapper.java:340)
at org.codehaus.jackson.impl.ByteSourceBootstrapper.detectEncoding(ByteSourceBootstrapper.java:116)
at org.codehaus.jackson.impl.ByteSourceBootstrapper.constructParser(ByteSourceBootstrapper.java:197)
at org.codehaus.jackson.JsonFactory._createJsonParser(JsonFactory.java:503)
at org.codehaus.jackson.JsonFactory.createJsonParser(JsonFactory.java:365)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1158)
Почему происходит это исключение?
Это имеет отношение к чтению на местном уровне?
Нужен ли какой-то сброс или что-то при повторном использовании ObjectMapper
или его основной поток?
1 ответ
Мне удалось обойти это. Если это поможет:
Прежде всего, я использую последнюю версию Jackson 1.x. Кажется, однажды JsonParser
создается с InputStream
, он берет это под контроль. Итак, при использовании readValue()
как только он прочитан (внутренне он вызывает _readMapAndClose()
который автоматически закрывает поток. Существует настройка, которую вы можете установить, чтобы сообщить JsonParser
не закрывать основной поток. Вы можете передать его своему JsonFactory
как это, прежде чем создать свой JsonParser
:
JsonFactory f = new MappingJsonFactory();
f.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
Остерегайтесь, вы несете ответственность за закрытие потока (FSDataInputStream в моем случае). Итак, ответы:
- Почему происходит это исключение?
Потому что анализатор управляет потоком и закрывает его после readValue().
- Это имеет отношение к чтению на местном уровне?
нет
- Нужен ли какой-то сброс или что-то еще при повторном использовании ObjectMapper или его основного потока?
Нет. Что нужно знать при использовании Streaming API, смешанного с ObjectMapper-подобными методами, так это то, что иногда mapper/parser может взять под свой контроль основной поток. Обратитесь к Javadoc JsonParser и проверьте документацию по каждому из методов чтения, чтобы удовлетворить ваши потребности.