Чтение и запись Sequencefile с использованием Hadoop 2.0 Apis
Я ищу пример, который использует новый API для чтения и записи файлов последовательности.
Эффективно мне нужно знать, как использовать эти функции
createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts)
Старое определение не работает для меня:
SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
Точно так же мне нужно знать, какой будет код для чтения файла Sequence, поскольку следующее не рекомендуется:
SequenceFile.Reader(fs, path, conf);
Вот способ использовать то же самое -
String uri = args[0];
Configuration conf = new Configuration();
Path path = new Path( uri);
IntWritable key = new IntWritable();
Text value = new Text();
CompressionCodec Codec = new GzipCodec();
SequenceFile.Writer writer = null;
Option optPath = SequenceFile.Writer.file(path);
Option optKey = SequenceFile.Writer.keyClass(key.getClass());
Option optVal = SequenceFile.Writer.valueClass(value.getClass());
Option optCom = SequenceFile.Writer.compression(CompressionType.RECORD, Codec);
writer = SequenceFile.createWriter( conf, optPath, optKey, optVal, optCom);
4 ответа
public class SequenceFilesTest {
@Test
public void testSeqFileReadWrite() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path seqFilePath = new Path("file.seq");
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
Writer.file(seqFilePath), Writer.keyClass(Text.class),
Writer.valueClass(IntWritable.class));
writer.append(new Text("key1"), new IntWritable(1));
writer.append(new Text("key2"), new IntWritable(2));
writer.close();
SequenceFile.Reader reader = new SequenceFile.Reader(conf,
Reader.file(seqFilePath));
Text key = new Text();
IntWritable val = new IntWritable();
while (reader.next(key, val)) {
System.err.println(key + "\t" + val);
}
reader.close();
}
}
Я опоздал с ответом более чем на год, но только начал работать с Hadoop 2.4.1:)
Ниже приведен код, кто-то может найти его полезным.
Примечание. Он содержит закомментированный код 1.x для чтения и записи файла последовательности. Мне было интересно, где он берет файловую систему, но когда я выполнял ее непосредственно в кластере, он выбрал ее правильно (вероятно, из core-site.xml, как упоминалось в разделе " Конфигурация").
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader.Option;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
public class SequenceFileOperator {
private Configuration conf = new Configuration();
/*private FileSystem fs;
{
try {
fs = FileSystem.get(URI.create("hdfs://cldx-1336-1202:9000"), conf);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
if (args == null || args.length < 2) {
System.out
.println("Following are the possible invocations <operation id> <arg1> <arg2> ...");
System.out
.println("1 <absolute path of directory containing documents> <HDFS path of the sequence file");
System.out.println("2 <HDFS path of the sequence file>");
return;
}
int operation = Integer.valueOf(args[0]);
SequenceFileOperator docToSeqFileWriter = new SequenceFileOperator();
switch (operation) {
case 1: {
String docDirectoryPath = args[1];
String sequenceFilePath = args[2];
System.out.println("Writing files present at " + docDirectoryPath
+ " to the sequence file " + sequenceFilePath);
docToSeqFileWriter.loadDocumentsToSequenceFile(docDirectoryPath,
sequenceFilePath);
break;
}
case 2: {
String sequenceFilePath = args[1];
System.out.println("Reading the sequence file " + sequenceFilePath);
docToSeqFileWriter.readSequenceFile(sequenceFilePath);
break;
}
}
}
private void readSequenceFile(String sequenceFilePath) throws IOException {
// TODO Auto-generated method stub
/*
* SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(fs,
* new Path(sequenceFilePath), conf);
*/
Option filePath = SequenceFile.Reader.file(new Path(sequenceFilePath));
SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(conf,
filePath);
Writable key = (Writable) ReflectionUtils.newInstance(
sequenceFileReader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(
sequenceFileReader.getValueClass(), conf);
try {
while (sequenceFileReader.next(key, value)) {
System.out
.printf("[%s] %s %s \n",
sequenceFileReader.getPosition(), key,
value.getClass());
}
} finally {
IOUtils.closeStream(sequenceFileReader);
}
}
private void loadDocumentsToSequenceFile(String docDirectoryPath,
String sequenceFilePath) throws IOException {
// TODO Auto-generated method stub
File docDirectory = new File(docDirectoryPath);
if (!docDirectory.isDirectory()) {
System.out
.println("Please provide an absolute path of a directory that contains the documents to be added to the sequence file");
return;
}
/*
* SequenceFile.Writer sequenceFileWriter =
* SequenceFile.createWriter(fs, conf, new Path(sequenceFilePath),
* Text.class, BytesWritable.class);
*/
org.apache.hadoop.io.SequenceFile.Writer.Option filePath = SequenceFile.Writer
.file(new Path(sequenceFilePath));
org.apache.hadoop.io.SequenceFile.Writer.Option keyClass = SequenceFile.Writer
.keyClass(Text.class);
org.apache.hadoop.io.SequenceFile.Writer.Option valueClass = SequenceFile.Writer
.valueClass(BytesWritable.class);
SequenceFile.Writer sequenceFileWriter = SequenceFile.createWriter(
conf, filePath, keyClass, valueClass);
File[] documents = docDirectory.listFiles();
try {
for (File document : documents) {
RandomAccessFile raf = new RandomAccessFile(document, "r");
byte[] content = new byte[(int) raf.length()];
raf.readFully(content);
sequenceFileWriter.append(new Text(document.getName()),
new BytesWritable(content));
raf.close();
}
} finally {
IOUtils.closeStream(sequenceFileWriter);
}
}
}
Для чтения вы можете использовать
Path path= new Path("/bar");
Reader sequenceFileReader = new SequenceFile.Reader(conf,SequenceFile.Reader.file(path));
Вам нужно установить SequenceFile
в качестве входного формата
job.setInputFormatClass(SequenceFileInputFormat.class);
Вы найдете пример чтения формы HDFS SeequnceFile здесь.