Hadoop 2: пустой результат при использовании пользовательского InputFormat

Я хочу использовать свой собственный FileInputFormat с обычаем RecordReader читать данные CSV в <Long><String> пар.

Поэтому я создал класс MyTextInputFormat:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyTextInputFormat extends FileInputFormat<Long, String> {

  @Override
  public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
      reporter.setStatus(input.toString());
      return new MyStringRecordReader(job, (FileSplit)input);
  }

  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return super.isSplitable(fs, filename);
  }
}

и класс MyStringRecordReader:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class MyStringRecordReader implements RecordReader<Long, String> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);

        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();

        System.out.println("constructor called");
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public Long createKey() {
        return lineKey.get();
    }

    @Override
    public String createValue() {
        System.out.println("createValue called");
        return lineValue.toString();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public boolean next(Long key, String value) throws IOException {
        System.out.println("next called");

        // get the next line
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }

        key = lineKey.get();
        value = lineValue.toString();

        System.out.println(key);
        System.out.println(value);


        return true;
    }
}

В моем приложении Spark я прочитал файл, позвонив sparkContext.hadoopFile метод. Но я получаю только пустой вывод из следующего кода:

public class AssociationRulesAnalysis {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class).map(new Function<Tuple2<Long,String>, String>() {
            @Override
            public String call(Tuple2<Long, String> arg0) throws Exception {
                System.out.println("map: " + arg0._2());
                return arg0._2();
            }
        });

        List<String> asList = inputRdd.take(10);
        for(String s : asList) {
            System.out.println(s);
        }
    }
}

Я получаю только 10 пустых строк от СДР.

Консольный вывод с добавленным prints выглядит следующим образом:

=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:










Stopping...

(Данные СДР напечатаны под ===== вывод (10 пустых строк!!!). Выход выше ===== кажется, сделано RDD.count вызов. в next метод правильные ключи и значения отображаются!? Что я делаю неправильно?

1 ответ

Решение

lineKey а также lineValue никогда не инициализируются key а также value перешел в переопределенный next метод в вашем MyStringRecordReader, Следовательно, он всегда показывает пустую строку, когда вы пытаетесь использовать RecordReader, Если вам нужен другой ключ и значение для записи в файле, вам нужно использовать ключ и значение, переданные в next метод и инициализировать их с вашим вычисленным ключом и значением. Если вы не собираетесь изменять запись ключ / значение, избавьтесь от следующего. Каждый раз, когда вы выполняете этот фрагмент кода, вы перезаписываете ключ / значение, считанные из файла, с вашей строкой EMPTY и 0L.

key = lineKey.get();
value = lineValue.toString();
Другие вопросы по тегам