Hadoop: Когда метод настройки вызывается в редукторе?

Насколько я понимаю, задача сокращения состоит из трех этапов.

Shuffle, Sort и фактическое сокращение вызова.

Поэтому обычно в выводе задания hadoop мы видим что-то вроде: карта 0% уменьшает 0%, карта 20% уменьшает 0% .,, карта на 90% уменьшает на 10% .,,

Поэтому я предполагаю, что задачи сокращения запускаются до того, как все карты завершены, и это поведение контролируется конфигурацией медленного запуска.

Сейчас я еще не понимаю, когда на самом деле вызывается метод настройки редуктора.

В моем случае у меня есть несколько файлов для анализа в методе установки. Файл имеет размер около 60 МБ и извлекается из распределенного кэша. Во время анализа файла существует другой набор данных из конфигурации, который может обновить только что проанализированную запись. После анализа и возможного обновления, файл сохраняется в HashMap для быстрого поиска. Поэтому я хотел бы, чтобы этот метод был вызван как можно скорее, возможно, пока картографы все еще делают свое дело.

Можно ли сделать это? Или это то, что уже происходит?

Спасибо

2 ответа

Решение

Setup вызывается непосредственно перед тем, как сможет прочитать первую пару ключ / значение из потока.

Это эффективно после того, как все мапперы запустились и все объединение для данного раздела редуктора закончено.

Как объясняется в документах Hadoop, setup() метод вызывается один раз в начале задачи. Он должен использоваться для создания экземпляров ресурсов / переменных или чтения настраиваемых параметров, которые, в свою очередь, могут использоваться в reduce() метод. Думайте об этом как конструктор.

Вот пример редуктора:

class ExampleReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

    private int runId;
    private ObjectMapper objectMapper;

    @Override
    protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        this.runId = Integer.valueOf(conf.get("stackru_run_id"));
        this.objectMapper = new ObjectMapper();
    }


    @Override
    protected void reduce(ImmutableBytesWritable keyFromMap, Iterable<ImmutableBytesWritable> valuesFromMap, Context context) throws IOException, InterruptedException {
        // your code
        var = objectMapper.writeValueAsString();
        // your code
        context.write(new ImmutableBytesWritable(somekey.getBytes()), put);
    }
}
Другие вопросы по тегам