Ошибка Hadoop при копировании входного файла bz2 из s3

У меня есть задание hadoop только для карт, работающее на Amazon EMR, работающее на последней ami-версии: 3.0.4. Время от времени я получаю исключения вроде этого:

Error: com.amazonaws.AmazonClientException: Unable to verify integrity of data download.  Client calculated content length didn't match content length received from Amazon S3.  The
data may be corrupt.
    at com.amazonaws.util.ContentLengthValidationInputStream.validate(ContentLengthValidationInputStream.java:144)
    at com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:81)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.read(EmrFileSystem.java:289)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.readAByte(CBZip2InputStream.java:195)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:866)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:423)
    at org.apache.hadoop.io.compress.BZip2Codec.read(BZip2Codec.java:483)
    at java.io.InputStream.read(InputStream.java:101)

    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:164)
    at org.apache.hadoop.mapred.MapTask.nextKeyValue(MapTask.java:544)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
    at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

Есть ли способ вылечить это? Почему это происходит? Это проблема сети в Амазонке? Это не может быть проблемой с входным файлом, поскольку повторное выполнение одного и того же задания обычно завершается успешно. Есть ли способ поймать это исключение? Почему hadoop не вылечивает это автоматически?

Мой основной класс выглядит так:

public class LogParserMapReduce extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(LogParserMapReduce.class);

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = super.getConf();

    conf.setBoolean("mapred.compress.map.output", true);
    conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    conf.setBoolean("keep.failed.task.files", true);

    /*
     * Instantiate a Job object for your job's configuration.  
     */
    Job job = Job.getInstance(conf);

    /*
     * The expected command-line arguments are the paths containing
     * input and output data. Terminate the job if the number of
     * command-line arguments is not exactly 2.
     */
    if (args.length != 2) {
      System.out.printf("Usage: LogParserMapReduce <input dir> <output dir>\n");
      System.exit(-1);
    }

    /*
     * Specify the jar file that contains your driver, mapper, and reducer.
     * Hadoop will transfer this jar file to nodes in your cluster running
     * mapper and reducer tasks.
     */
    job.setJarByClass(LogParserMapReduce.class);

    /*
     * Specify an easily-decipherable name for the job.
     * This job name will appear in reports and logs.
     */
    job.setJobName("LogParser");

    /*
     * Specify the paths to the input and output data based on the
     * command-line arguments.
     */
    FileInputFormat.addInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    /*
     * Specify the mapper and reducer classes.
     */
    job.setMapperClass(LogParserMapper.class);

    /*
     * For the SysLogEvent count application, the input file and output 
     * files are in text format - the default format.
     * 
     * In text format files, each record is a line delineated by a 
     * by a line terminator.
     * 
     * When you use other input formats, you must call the 
     * SetInputFormatClass method. When you use other 
     * output formats, you must call the setOutputFormatClass method.
     */

    /*
     * For the logs count application, the mapper's output keys and
     * values have the same data types as the reducer's output keys 
     * and values: Text and IntWritable.
     * 
     * When they are not the same data types, you must call the 
     * setMapOutputKeyClass and setMapOutputValueClass 
     * methods.
     */

    /*
     * Specify the job's output key and value classes.
     */
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setNumReduceTasks(0);

    LOG.info("LogParserMapReduce: waitingForCompletion");
    /*
     * Start the MapReduce job and wait for it to finish.
     * If it finishes successfully, return 0. If not, return 1.
     */
    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

}

1 ответ

Решение

Решение было очень простым (после того, как служба поддержки Amazon сказала мне): мне пришлось обновиться до последней версии AMI (в настоящее время это 3.1.0), которая имеет последнюю версию Hadoop (2.4), а также убедиться, что я использовал ту же версию hadoop для компиляция кода Java. С тех пор я не вижу такой проблемы.

Другие вопросы по тегам