Hadoop 2: пустой результат при использовании пользовательского InputFormat
Я хочу использовать свой собственный FileInputFormat
с обычаем RecordReader
читать данные CSV в <Long><String>
пар.
Поэтому я создал класс MyTextInputFormat
:
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class MyTextInputFormat extends FileInputFormat<Long, String> {
@Override
public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(input.toString());
return new MyStringRecordReader(job, (FileSplit)input);
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return super.isSplitable(fs, filename);
}
}
и класс MyStringRecordReader
:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class MyStringRecordReader implements RecordReader<Long, String> {
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
System.out.println("constructor called");
}
@Override
public void close() throws IOException {
lineReader.close();
}
@Override
public Long createKey() {
return lineKey.get();
}
@Override
public String createValue() {
System.out.println("createValue called");
return lineValue.toString();
}
@Override
public long getPos() throws IOException {
return lineReader.getPos();
}
@Override
public float getProgress() throws IOException {
return lineReader.getProgress();
}
@Override
public boolean next(Long key, String value) throws IOException {
System.out.println("next called");
// get the next line
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
key = lineKey.get();
value = lineValue.toString();
System.out.println(key);
System.out.println(value);
return true;
}
}
В моем приложении Spark я прочитал файл, позвонив sparkContext.hadoopFile
метод. Но я получаю только пустой вывод из следующего кода:
public class AssociationRulesAnalysis {
@SuppressWarnings("serial")
public static void main(String[] args) {
JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class).map(new Function<Tuple2<Long,String>, String>() {
@Override
public String call(Tuple2<Long, String> arg0) throws Exception {
System.out.println("map: " + arg0._2());
return arg0._2();
}
});
List<String> asList = inputRdd.take(10);
for(String s : asList) {
System.out.println(s);
}
}
}
Я получаю только 10 пустых строк от СДР.
Консольный вывод с добавленным prints
выглядит следующим образом:
=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
Stopping...
(Данные СДР напечатаны под =====
вывод (10 пустых строк!!!). Выход выше =====
кажется, сделано RDD.count
вызов. в next
метод правильные ключи и значения отображаются!? Что я делаю неправильно?
1 ответ
lineKey
а также lineValue
никогда не инициализируются key
а также value
перешел в переопределенный next
метод в вашем MyStringRecordReader
, Следовательно, он всегда показывает пустую строку, когда вы пытаетесь использовать RecordReader
, Если вам нужен другой ключ и значение для записи в файле, вам нужно использовать ключ и значение, переданные в next
метод и инициализировать их с вашим вычисленным ключом и значением. Если вы не собираетесь изменять запись ключ / значение, избавьтесь от следующего. Каждый раз, когда вы выполняете этот фрагмент кода, вы перезаписываете ключ / значение, считанные из файла, с вашей строкой EMPTY и 0L.
key = lineKey.get();
value = lineValue.toString();