Задание Accumulo MapReduce завершается с ошибкой java.io.EOFException с использованием AccumuloRowInputFormat

Все мои картографы терпят неудачу за исключением ниже. Я показал только последний провал для краткости.

Почему это происходит и как мне это исправить?

16/09/21 17:01:57 INFO mapred.JobClient: Task Id : attempt_201609151451_0044_m_000002_2, Status : FAILED
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.accumulo.core.client.mapreduce.RangeInputSplit.readFields(RangeInputSplit.java:154)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:356)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:640)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.ap
16/09/21 17:02:00 INFO mapred.JobClient: Job complete: job_201609151451_0044
16/09/21 17:02:00 INFO mapred.JobClient: Counters: 8
16/09/21 17:02:00 INFO mapred.JobClient:   Job Counters
16/09/21 17:02:00 INFO mapred.JobClient:     Failed map tasks=1
16/09/21 17:02:00 INFO mapred.JobClient:     Launched map tasks=48
16/09/21 17:02:00 INFO mapred.JobClient:     Data-local map tasks=13
16/09/21 17:02:00 INFO mapred.JobClient:     Rack-local map tasks=35
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=343982
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

Я использую таблицу Accumulo в качестве входных данных. Моя установка выглядит следующим образом:

@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String idMapFileContent = readResourceFile(TYPE_ID_MAP_FILENAME);
    conf.set(TYPE_ID_MAP_KEY, idMapFileContent);

    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(this.getClass());
    job.setMapperClass(DanglingLinksFinderMapper.class);
    job.setReducerClass(DanglingLinksFinderReducer.class);
    this.setupRowInputFormat(job);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    Path out = new Path(args[0]);
    LOGGER.info("Writing to output directory: " + out.toUri());
    FileOutputFormat.setOutputPath(job, out);

    int exitCode = job.waitForCompletion(true) ? 0 : 1;
}

private Job setupRowInputFormat(Job job)
        throws IOException, AccumuloSecurityException
{
    job.setInputFormatClass(AccumuloRowInputFormat.class);
    Configuration conf = job.getConfiguration();

    AccumuloConnectInfo connectInfo = new AccumuloConnectInfo(conf);
    LOGGER.info(connectInfo.toString());

    AccumuloRowInputFormat.setZooKeeperInstance(job, connectInfo.getInstanceNames(), connectInfo.getZookeeperInstanceNames());
    AccumuloRowInputFormat.setConnectorInfo(job, connectInfo.getUserName(), connectInfo.getPassword());
    AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations());
    AccumuloRowInputFormat.setInputTableName(job, TABLE_NAME);
    return job;
}

Я использую Hadoop 2.6.0, Accumulo 1.5.0 и Java 1.7.

У меня это работало на днях и ничего не изменило. Так что я думаю, может быть, это как-то связано с конфигурацией или состоянием данных на сервере, на котором я его запускаю? Работа отлично работает на тестовой таблице, запущенной в контейнере Docker на моем локальном компьютере, но не выполняется на моем удаленном тестовом сервере.

Я могу войти в accumulo shell и отсканировать таблицу, с которой я работаю. Там все выглядит хорошо. Я также попытался запустить сжатие на тестовом сервере, который работал нормально, но не решил проблему.

1 ответ

Решение

Я предполагаю, что у вас есть несоответствие версий Jar-файлов Accumulo, которые вы используете для запуска задания MapReduce, и тех, которые вы включаете для использования самого задания (Mappers/Reducers) через DistributedCache или параметр CLI libjars.

Поскольку вы не указали диапазоны, AccumuloInputFormat автоматически извлечет все границы планшета для вашей таблицы и создаст такое же количество объектов RangeInputSplit, как у вас есть Tablets в таблице. Это разделение создается в локальной JVM (JVM создается при отправке задания). Эти объекты RangeInputSplit сериализуются и передаются в YARN.

Вы указали ошибку, когда Mapper берет один из этих сериализованных объектов RangeInputSplit и пытается десериализовать его. В некоторой степени это не удается, потому что сериализованных данных недостаточно для десериализации того, что ожидает прочитать версия Accumulo, работающая в Mapper.

Вполне возможно, что это просто ошибка сериализации в вашей версии Accumulo (пожалуйста, поделитесь этим), но я не припоминаю, чтобы слышал о такой ошибке. Я предполагаю, что есть разница в версии Accumulo для локального пути к классам и пути к классам Mapper.

Другие вопросы по тегам