Сцепление рабочих мест mapreduce

Я наткнулся на "цепочку рабочих мест Mapreduce". Будучи новичком в mapreduce, при каких обстоятельствах мы должны связывать (я предполагаю, что создание цепочек означает выполнение заданий mapreduce один за другим последовательно)?

И есть ли примеры, которые могут помочь?

2 ответа

Классическим примером работы, которая должна быть объединена в цепочку, является количество слов, которое выводит слова, отсортированные по их частоте.

Тебе понадобится:

Работа 1:

  • источник входного сопоставления (выбрасывает слово как ключ, одно как значение)
  • агрегирующий редуктор (агрегирует количество слов)

Работа 2:

  • маппер обмена ключами / значениями (делает частоту в качестве ключа, слово в качестве значения)
  • неявный редуктор идентичности (получает слова, отсортированные по частоте, не должен быть реализован)

Вот пример картографов / редукторов выше:

  public class HadoopWordCount {


  public static class TokenizerMapper extends Mapper<Object, Text, Text, LongWritable> {

    private final static Text word = new Text();
    private final static LongWritable one = new LongWritable(1);

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class KeyValueSwappingMapper extends Mapper<Text, LongWritable, LongWritable, Text> {

    public void map(Text key, LongWritable value, Context context) throws IOException, InterruptedException {
      context.write(value, key);
    }
  }

  public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable result = new LongWritable();

    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
        InterruptedException {
      long sum = 0;
      for (LongWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

Вот пример программы драйвера.

Ожидается два аргумента:

  1. входной текстовый файл для подсчета слов.
  2. выходной каталог (не должен существовать заранее) - ищите выходные данные в файле {этот каталог}/out2/part-r-0000

    public static void main(String[] args) throws Exception {
    
    Configuration conf = new Configuration();
    Path out = new Path(args[1]);
    
    Job job1 = Job.getInstance(conf, "word count");
    job1.setJarByClass(HadoopWordCount.class);
    job1.setMapperClass(TokenizerMapper.class);
    job1.setCombinerClass(SumReducer.class);
    job1.setReducerClass(SumReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(LongWritable.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path(out, "out1"));
    if (!job1.waitForCompletion(true)) {
      System.exit(1);
    }
    Job job2 = Job.getInstance(conf, "sort by frequency");
    job2.setJarByClass(HadoopWordCount.class);
    job2.setMapperClass(KeyValueSwappingMapper.class);
    job2.setNumReduceTasks(1);
    job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
    job2.setOutputKeyClass(LongWritable.class);
    job2.setOutputValueClass(Text.class);
    job2.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.addInputPath(job2, new Path(out, "out1"));
    FileOutputFormat.setOutputPath(job2, new Path(out, "out2"));
    if (!job2.waitForCompletion(true)) {
      System.exit(1);
    }
    
    }
    

Проще говоря, вам нужно объединить несколько заданий по сокращению карты, если ваша задача не может поместиться только в одно задание по сокращению карты.

Хороший пример - найти 10 лучших купленных предметов, этого можно достичь с помощью 2 рабочих мест:

  1. Карта сокращает работу, чтобы узнать, сколько раз покупается каждый предмет.

  2. Второе задание - сортировка предметов по количеству их покупок и получение 10 лучших предметов.

Чтобы получить полное представление, цепочка заданий генерирует промежуточные файлы, которые записываются и читаются с диска, поэтому это снизит производительность. Старайтесь по возможности избегать цепочки работ.

А вот как сковать работу.

Другие вопросы по тегам