Чтение данных с FTP-сервера в Hadoop/Cascading
Я хочу прочитать данные с FTP-сервера. Я предоставляю путь к файлу, который находится на FTP-сервере, в формате ftp: // Имя пользователя: Пароль @ хост / путь. Когда я использую программу преобразования карт для чтения данных из файла, она работает нормально. Я хочу читать данные из того же файла через Cascading Framework. Я использую каскад Hfs каскадной структуры для чтения данных. Выдает следующее исключение
java.io.IOException: Stream closed
at org.apache.hadoop.fs.ftp.FTPInputStream.close(FTPInputStream.java:98)
at java.io.FilterInputStream.close(Unknown Source)
at org.apache.hadoop.util.LineReader.close(LineReader.java:83)
at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:168)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:254)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:440)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
Ниже приведен код каскадного фреймворка, откуда я читаю файлы:
public class FTPWithHadoopDemo {
public static void main(String args[]) {
Tap source = new Hfs(new TextLine(new Fields("line")), "ftp://user:pwd@xx.xx.xx.xx//input1");
Tap sink = new Hfs(new TextLine(new Fields("line1")), "OP\\op", SinkMode.REPLACE);
Pipe pipe = new Pipe("First");
pipe = new Each(pipe, new RegexSplitGenerator("\\s+"));
pipe = new GroupBy(pipe);
Pipe tailpipe = new Every(pipe, new Count());
FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(tailpipe, sink);
new HadoopFlowConnector().connect(flowDef).complete();
}
}
Я попытался посмотреть в исходном коде Hadoop для того же исключения. Я обнаружил, что в классе MapTask есть один метод runOldMapper, который работает с потоком. И в этом же методе есть наконец блок, где поток закрывается (in.close ()). Когда я удаляю эту строку из блока наконец, он работает нормально. Ниже приведен код:
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
throws IOException, InterruptedException, ClassNotFoundException {
InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
updateJobWithSplit(job, inputSplit);
reporter.setInputSplit(inputSplit);
RecordReader<INKEY, INVALUE> in = isSkipping()
? new SkippingRecordReader<INKEY, INVALUE>(inputSplit, umbilical, reporter)
: new TrackedRecordReader<INKEY, INVALUE>(inputSplit, job, reporter);
job.setBoolean("mapred.skip.on", isSkipping());
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
MapOutputCollector collector = null;
if (numReduceTasks > 0) {
collector = new MapOutputBuffer(umbilical, job, reporter);
} else {
collector = new DirectMapOutputCollector(umbilical, job, reporter);
}
MapRunnable<INKEY, INVALUE, OUTKEY, OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(),
job);
try {
runner.run(in, new OldOutputCollector(collector, conf), reporter);
collector.flush();
} finally {
// close
in.close(); // close input
collector.close();
}
}
Пожалуйста, помогите мне в решении этой проблемы.
Спасибо, Аршадали
1 ответ
После некоторых усилий я узнал, что использует hadoop org.apache.hadoop.fs.ftp.FTPFileSystem
Класс для FTP.
Этот класс не поддерживает поиск, то есть поиск указанного смещения от начала файла. Данные считываются в одном блоке, а затем файловая система ищет следующий блок для чтения. Размер блока по умолчанию составляет 4 КБ для FTPFileSystem
, Поскольку поиск не поддерживается, он может только читать данные, меньшие или равные 4 КБ.