Чтение записи разбито на две строки из-за /n в MapReduce

Я пытаюсь написать пользовательский ридер, который служит мне для чтения записи (состоящей из двух строк) с определенным количеством полей.

Например,

1,2,3,4("," can be there or not)
,5,6,7,8

Мое требование - прочитать запись и вставить ее в маппер как одну запись типа {1,2,3,4,5,6,7,8}, Пожалуйста, дайте некоторые входные данные.

ОБНОВИТЬ:

public boolean nextKeyValue() throws IOException, InterruptedException {
    if(key == null) {
        key = new LongWritable();
    }

    //Current offset is the key
    key.set(pos); 

    if(value == null) {
        value = new Text();
    }

    int newSize = 0;
    int numFields = 0;
    Text temp = new Text();
    boolean firstRead = true;

    while(numFields < reqFields) {
        while(pos < end) {
            //Read up to the '\n' character and store it in 'temp'
            newSize = in.readLine(  temp, 
                                    maxLineLength, 
                                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), 
                                             maxLineLength));

            //If 0 bytes were read, then we are at the end of the split
            if(newSize == 0) {
                break;
            }

            //Otherwise update 'pos' with the number of bytes read
            pos += newSize;

            //If the line is not too long, check number of fields
            if(newSize < maxLineLength) {
                break;
            }

            //Line too long, try again
            LOG.info("Skipped line of size " + newSize + " at pos " + 
                        (pos - newSize));
        }

        //Exit, since we're at the end of split
        if(newSize == 0) {
            break;
        }
        else {
            String record = temp.toString();
            StringTokenizer fields = new StringTokenizer(record,"|");

            numFields += fields.countTokens();

            //Reset 'value' if this is the first append
            if(firstRead) {
                value = new Text();
                firstRead = false;
            }

            if(numFields != reqFields) {
                value.append(temp.getBytes(), 0, temp.getLength());
            }
            else {
                value.append(temp.getBytes(), 0, temp.getLength());
            }
        }
    }

    if(newSize == 0) {
        key = null;
        value = null;
        return false;
    }
    else {
        return true;
    }
}

}

Это метод nextKeyValue, над которым я пытаюсь работать. Но все же картограф не получает правильных значений.reqFields 4.

2 ответа

Решение

Строка должна быть токенизирована с использованием StringTokenizer, а не разделена. Код был обновлен с новой реализацией.

Посмотрите, как реализован TextInputFormat. Посмотрите на это суперкласс, FileInputFormat, а также. Вы должны создать подкласс класса TextInputFormat в FileInputFormat и реализовать собственную обработку записей.

При реализации любого формата ввода файлов нужно знать следующее:

Framework разделит файл и даст вам начальное смещение и длину в байтах фрагмента файла, который вы должны прочитать. Вполне может случиться, что он разбивает файл прямо по какой-то записи. Вот почему ваш читатель должен пропустить байты записи в начале разделения, если эта запись не полностью содержится в разделении, а также прочитать последний байт разделения, чтобы прочитать всю последнюю запись, если эта запись не полностью содержится в расколе.

Например, TextInoutFormat обрабатывает символы \n как разделители записей, поэтому при получении разделения они пропускают байты до первого символа \n и читают после конца разделения до символа \n.

Что касается примера кода:

Вам нужно задать себе следующий вопрос: скажем, вы открываете файл, ищите случайную позицию и начинаете читать вперед. Как вы определяете начало записи? Я не вижу в вашем коде ничего такого, что могло бы с этим справиться, и без этого вы не сможете написать хороший формат ввода, потому что вы не знаете, каковы границы записей.

Теперь все еще можно сделать так, чтобы входной формат читал весь файл целиком, заставляя метод isSplittable(JobContext,Path) возвращать false. Это делает файл полностью прочитанным с помощью одной задачи карты, что уменьшает параллелизм.

Ваш внутренний цикл while кажется проблематичным, поскольку он проверяет слишком длинные строки и пропускает их. Учитывая, что ваши записи написаны с использованием нескольких строк, может случиться так, что вы объедините одну часть одной записи и другую часть другой записи при ее чтении.

Другие вопросы по тегам