Hadoop: Когда метод настройки вызывается в редукторе?
Насколько я понимаю, задача сокращения состоит из трех этапов.
Shuffle, Sort и фактическое сокращение вызова.
Поэтому обычно в выводе задания hadoop мы видим что-то вроде: карта 0% уменьшает 0%, карта 20% уменьшает 0% .,, карта на 90% уменьшает на 10% .,,
Поэтому я предполагаю, что задачи сокращения запускаются до того, как все карты завершены, и это поведение контролируется конфигурацией медленного запуска.
Сейчас я еще не понимаю, когда на самом деле вызывается метод настройки редуктора.
В моем случае у меня есть несколько файлов для анализа в методе установки. Файл имеет размер около 60 МБ и извлекается из распределенного кэша. Во время анализа файла существует другой набор данных из конфигурации, который может обновить только что проанализированную запись. После анализа и возможного обновления, файл сохраняется в HashMap для быстрого поиска. Поэтому я хотел бы, чтобы этот метод был вызван как можно скорее, возможно, пока картографы все еще делают свое дело.
Можно ли сделать это? Или это то, что уже происходит?
Спасибо
2 ответа
Setup
вызывается непосредственно перед тем, как сможет прочитать первую пару ключ / значение из потока.
Это эффективно после того, как все мапперы запустились и все объединение для данного раздела редуктора закончено.
Как объясняется в документах Hadoop, setup()
метод вызывается один раз в начале задачи. Он должен использоваться для создания экземпляров ресурсов / переменных или чтения настраиваемых параметров, которые, в свою очередь, могут использоваться в reduce()
метод. Думайте об этом как конструктор.
Вот пример редуктора:
class ExampleReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
private int runId;
private ObjectMapper objectMapper;
@Override
protected void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
this.runId = Integer.valueOf(conf.get("stackru_run_id"));
this.objectMapper = new ObjectMapper();
}
@Override
protected void reduce(ImmutableBytesWritable keyFromMap, Iterable<ImmutableBytesWritable> valuesFromMap, Context context) throws IOException, InterruptedException {
// your code
var = objectMapper.writeValueAsString();
// your code
context.write(new ImmutableBytesWritable(somekey.getBytes()), put);
}
}