Hadoop несколько выходов с умозрительным исполнением

У меня есть задача, которая записывает вывод avro в несколько каталогов, организованных несколькими полями входных записей.

Например: 
Обработка записей стран по годам 
и напишите в структуре каталогов страны / года 
например:
Выходы / США /2015/outputs_usa_2015.avro 
выходы / ик /2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context);
....
....
     multipleOutputs.write("output", avroKey, NullWritable.get(), 
            OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear());

Какой обработчик вывода будет использовать приведенный ниже код для записи вывода. Разве это небезопасно для использования с умозрительным исполнением? При спекулятивном исполнении это вызывает (может вызвать) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException

В этом посте Hadoop Reducer: как я могу выводить в несколько каталогов, используя умозрительное выполнение? Рекомендуется использовать пользовательский выходной коммиттер

Приведенный ниже код из hadoop AvroMultipleOutputs не содержит никаких проблем со спекулятивным выполнением.

 private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
          String baseFileName) throws IOException, InterruptedException {

    writer =
                ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
                    taskContext.getConfiguration())).getRecordWriter(taskContext);
...
}

Метод записи также не документирует никаких проблем, если путь baseoutput находится вне каталога задания

public void write(String namedOutput, Object key, Object value, String baseOutputPath)

Есть ли реальная проблема с AvroMultipleOutputs (другими выходами) со спекулятивным выполнением при записи вне каталога задания? Если, то как мне переопределить AvroMultipleOutputs, чтобы иметь свой собственный выходной коммиттер. Я не вижу никакого выходного формата внутри AvroMultipleOutputs, чей выходной коммиттер он использует

2 ответа

AvroMultipleOutputs будет использовать OutputFormat которые вы зарегистрировали в конфигурациях заданий при добавлении именованных выходных данных, например, используя addNamedOutput API от AvroMultipleOutputs (например AvroKeyValueOutputFormat).

С AvroMultipleOutputs, вы не сможете использовать функцию выполнения спекулятивных задач. Даже переопределение этого либо не поможет, либо не будет простым.

Вместо этого вы должны написать свой собственный OutputFormat (наиболее вероятно расширение одного из доступных форматов вывода Avro, например AvroKeyValueOutputFormat) и переопределить / реализовать его getRecordWriter API, где он будет возвращать один RecordWriter например, сказать MainRecordWriter (только для справки).

это MainRecordWriterбудет поддерживать карту RecordWriter (например AvroKeyValueRecordWriter) экземпляры. Каждый из них RecordWriter экземпляры будут принадлежать одному из выходных файлов. В write API MainRecordWriter, вы бы получили фактический RecordWriter экземпляр с карты (на основе записи, которую вы собираетесь записать), и запишите запись с использованием этого средства записи. Так MainRecordWriter будет просто работать в качестве оболочки над несколькими RecordWriter экземпляров.

Для некоторых подобных реализаций вы можете изучить код класса MultiStorage из piggybank библиотека.

Когда вы добавляете именованный вывод в AvroMultipleOutputsбудет звонить либо AvroKeyOutputFormat.getRecordWriter() или же AvroKeyValueOutputFormat.getRecordWriter(), который называют AvroOutputFormatBase.getAvroFileOutputStream()чье содержание

protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
  Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
    getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
  return path.getFileSystem(context.getConfiguration()).create(path);
}

А также AvroOutputFormatBase продолжается FileOutputFormat (getOutputCommitter() в приведенном выше методе на самом деле вызов FileOutputFormat.getOutputCommitter(), Следовательно, AvroMultipleOutputs должны иметь те же ограничения, что и MultipleOutputs,

Другие вопросы по тегам