Hadoop несколько выходов с умозрительным исполнением
У меня есть задача, которая записывает вывод avro в несколько каталогов, организованных несколькими полями входных записей.
Например: Обработка записей стран по годам и напишите в структуре каталогов страны / года например: Выходы / США /2015/outputs_usa_2015.avro выходы / ик /2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear());
Какой обработчик вывода будет использовать приведенный ниже код для записи вывода. Разве это небезопасно для использования с умозрительным исполнением? При спекулятивном исполнении это вызывает (может вызвать) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException
В этом посте Hadoop Reducer: как я могу выводить в несколько каталогов, используя умозрительное выполнение? Рекомендуется использовать пользовательский выходной коммиттер
Приведенный ниже код из hadoop AvroMultipleOutputs не содержит никаких проблем со спекулятивным выполнением.
private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
String baseFileName) throws IOException, InterruptedException {
writer =
((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
taskContext.getConfiguration())).getRecordWriter(taskContext);
...
}
Метод записи также не документирует никаких проблем, если путь baseoutput находится вне каталога задания
public void write(String namedOutput, Object key, Object value, String baseOutputPath)
Есть ли реальная проблема с AvroMultipleOutputs (другими выходами) со спекулятивным выполнением при записи вне каталога задания? Если, то как мне переопределить AvroMultipleOutputs, чтобы иметь свой собственный выходной коммиттер. Я не вижу никакого выходного формата внутри AvroMultipleOutputs, чей выходной коммиттер он использует
2 ответа
AvroMultipleOutputs
будет использовать OutputFormat
которые вы зарегистрировали в конфигурациях заданий при добавлении именованных выходных данных, например, используя addNamedOutput
API от AvroMultipleOutputs
(например AvroKeyValueOutputFormat
).
С AvroMultipleOutputs
, вы не сможете использовать функцию выполнения спекулятивных задач. Даже переопределение этого либо не поможет, либо не будет простым.
Вместо этого вы должны написать свой собственный OutputFormat
(наиболее вероятно расширение одного из доступных форматов вывода Avro, например AvroKeyValueOutputFormat
) и переопределить / реализовать его getRecordWriter
API, где он будет возвращать один RecordWriter
например, сказать MainRecordWriter
(только для справки).
это MainRecordWriter
будет поддерживать карту RecordWriter
(например AvroKeyValueRecordWriter
) экземпляры. Каждый из них RecordWriter
экземпляры будут принадлежать одному из выходных файлов. В write
API MainRecordWriter
, вы бы получили фактический RecordWriter
экземпляр с карты (на основе записи, которую вы собираетесь записать), и запишите запись с использованием этого средства записи. Так MainRecordWriter
будет просто работать в качестве оболочки над несколькими RecordWriter
экземпляров.
Для некоторых подобных реализаций вы можете изучить код класса MultiStorage из piggybank
библиотека.
Когда вы добавляете именованный вывод в AvroMultipleOutputs
будет звонить либо AvroKeyOutputFormat.getRecordWriter()
или же AvroKeyValueOutputFormat.getRecordWriter()
, который называют AvroOutputFormatBase.getAvroFileOutputStream()
чье содержание
protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
return path.getFileSystem(context.getConfiguration()).create(path);
}
А также AvroOutputFormatBase
продолжается FileOutputFormat
(getOutputCommitter()
в приведенном выше методе на самом деле вызов FileOutputFormat.getOutputCommitter()
, Следовательно, AvroMultipleOutputs
должны иметь те же ограничения, что и MultipleOutputs
,