Преждевременный EOF из inputStream в Hadoop
Я хочу читать большие файлы в Hadoop, блок за блоком (не строка за строкой), где каждый блок имеет размер почти 5 МБ. Для этого я написал обычай recordreader
, Но это дает мне ошибку Premature EOF from inputStream
, который вызван nextKeyValue()
, readfully()
, во время чтения.
Это мой код:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<Text, apriori> {
public Text key = new Text("");
public apriori value = new apriori();
public Configuration job;
public FileSplit filesplit;
public FSDataInputStream in;
public Boolean processed = false;
public int len = 5000000;
public long filepointer = 0;
public int mapperFlag = 0;
public WholeFileRecordReader(FileSplit arg0, TaskAttemptContext arg1) {
this.filesplit = arg0;
this.job=arg1.getConfiguration();
}
@Override
public void close() throws IOException {
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public apriori getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
this.job = arg1.getConfiguration();
this.filesplit = (FileSplit)arg0;
final Path file = filesplit.getPath();
FileSystem fs = file.getFileSystem(job);
in = fs.open(file);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if ((!processed)&&(filesplit.getLength()>filepointer)) {
byte[] contents = new byte[ len];
Path file = filesplit.getPath();
key.set(file.getName());
in.seek(filepointer);
try {
IOUtils.readFully(in, contents, 0, len);
value.set(contents, 0, len);
} finally {
// IOUtils.closeStream(in);
}
filepointer = filepointer + len;
processed = false;
return true;
}
else if((!processed)&&(filesplit.getLength()<filepointer))
{
Path file = filesplit.getPath();
key.set(file.getName());
int last = (int)(filesplit.getLength()-(filepointer-len));
byte[] contents = new byte[last];
in.seek(filepointer-len);
try {
IOUtils.readFully(in, contents, 0, last);
mapperFlag =1;
value.set(contents, 0, last,mapperFlag);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
}