Реализация пользовательских программ чтения Hadoop

Я не могу понять, что происходит в методе nextKeyValue(), объясненном по ссылке ниже:

http://analyticspro.org/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

особенно цикл for в nextKeyValue()

Любая помощь будет заметна

заранее спасибо

3 ответа

nextKeyValue() - это основная функция, которая устанавливает пару ключ и значение для конкретного вызова карты. Итак, по вашей ссылке, приведенному ниже коду (перед циклом for), он просто устанавливает ключ с pos, который является ничем иным, как начальным смещением key.set(pos) И это буферизует ранее установленное значение. Соответствующий код:

public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
        key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
        value = new Text();
    }
    value.clear();
    final Text endline = new Text("\n");
    int newSize = 0;

После цикла. Я добавил достаточно комментариев для каждой строки.

       for(int i=0;i<NLINESTOPROCESS;i++){ //Since this is NLineInputFormat they want to read 3 lines at a time and set that as value,
so this loop will continue until that is satisfied.
            Text v = new Text();
            while (pos < end) { //This is to prevent the recordreader from reading the second split, if it is currently reading the first split. pos would be start
of the split and end would be end offset of the split. 
                newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
//This calls the linereader readline function which reads until it encounters a newline(default delim for TextInputformat and maxlinelength would be max integer size
just to ensure the whole line doesn''t go beyond the maxlinelength and the line read would be stored in Text variable v)
                value.append(v.getBytes(),0, v.getLength());
//Reads from v(whole line) and appends it to the value,append is necessary because we are going to read 3 lines.
                value.append(endline.getBytes(),0, endline.getLength());
//appends newline to each line read
                if (newSize == 0) {
                    break;//If there is nothing to read then come out.
                }
                pos += newSize;
                if (newSize < maxLineLength) {//There is a flaw here it should be >=, to imply if the read line is greater than max integer size then come out
                    break;
                }
            }
        }
        if (newSize == 0) {
            key = null;//If there is nothing to read assign key and value as null else continue the process by returning true to map call.
            value = null;
            return false;
        } else {
            return true;
        }
    }
}

Метод nextKeyValue() будет использоваться каждым картографом для перебора всех разделенных записей.

Класс NLinesRecordReader определяет, что каждая запись имеет 3 строки.

private final int NLINESTOPROCESS = 3;

Основная роль цикла в nextKeyValue() состоит в том, чтобы получить для каждой записи 3 строки. Запись будет использоваться в качестве входного значения в методе map().

Всякий раз, когда требуются новые данные, происходят две вещи. Первый вопрос, который задают читателю:

У вас есть какие-либо данные???

Если читатель отвечает "да", то вызывающая сторона может получить данные из метода getCurrentValue.

Теперь метод nextKeyValue выполняет эту задачу, он просто отвечает на вопрос: У ВАС ЕСТЬ ЛИ ДАННЫЕ, КОТОРЫЕ ОСТАЛИСЬ МЕНЯ?

Я не могу получить доступ к ссылке из-за проблем с брандмауэром, но пример реализации, который я использовал

HashMap<Integer, Invoice> allData= new HashMap<Integer, Invoice>();

    @Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if(key == null) {
        this.key = new LongWritable();
    }
    this.key.set(startPos);

    if(value == null) {
        this.value = new Invoice();
    }
    if(startPos >= endPos) {
        key = null;
        value = null;
        return false;
    } else {
        this.value = allData.get(startPos);
        startPos = startPos + 1;
        return true;
    }
}

Здесь Invoice это просто POJO. и в методе initialize я ничего не делал, только проанализировал весь документ и сохранил в hashmap. В методе nextKeyValue проверьте, существует ли следующий ключ, если он действительно возвращает соответствующее значение, в противном случае возвращаемый ключ не существует.

Другие вопросы по тегам