Передача аргументов читателю записи в mapreduce hadoop
Это мой код для использования variours arg
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.poi.hwpf.HWPFDocument;
import org.apache.poi.hwpf.extractor.WordExtractor;
public class Docsparser {
private static String Delimiter;
public static class DocsInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new DocsLineRecordReader();
}
}
public static class DocsLineRecordReader extends RecordReader<Text, Text> {
private Text key = new Text();
private Text value = new Text();
private int currentword = 0;
private String fileline;
private File file = null;
private String line;
private HWPFDocument document;
private WordExtractor extractor = null;
private String[] filedata;
StringBuilder sb = new StringBuilder();
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) split;
final Path file = fileSplit.getPath();
Configuration conf = context.getConfiguration();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream filein = fs.open(fileSplit.getPath());
String Delim = conf.get("Delim");
if (filein != null)
{
HWPFDocument document = new HWPFDocument(filein);
extractor = new WordExtractor(document);
fileline = extractor.getText();
filedata = fileline.split(Delim);
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
if(currentword < filedata.length)
{
for ( currentword=0;currentword < filedata.length; currentword++)
{
sb.append(filedata[currentword] +",");
line = sb.toString();
}
key.set(line);
value.set("");
return true;
}
else
{
key = null;
value = null;
return false;
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return (100.0f / filedata.length * currentword) / 100.0f;
}
@Override
public void close() throws IOException {
}
}
public static class Map extends Mapper<Text, Text, Text, Text>{
public void map(Text key, Text value, Context context) throws IOException, InterruptedException
{
context.write(key,value);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf, "Docsparser");
job.setJarByClass(Docsparser.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
Delimiter = args[2].toString();
conf.set("Delim",Delimiter);
job.setInputFormatClass(DocsInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Детали исключения:
15/09/28 03:50:04 ИНФОРМАЦИЯ mapreduce.Job: идентификатор задачи: try_1443193152998_2319_m_000000_2, статус: СБОЙ Ошибка: java.lang.NullPointerException в java.lang.String.split(String.java:2272) в java.lang.String.split(String.java:2355) в com.nielsen.grfe.Docsparser$DocsLineRecordReader.initialize(Docsparser.java:66) в org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.jpg) в org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786) в org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) в org.apache.hadoop.mapred.YarnChild$2. выполнить (YarnChild.java:163) в java.security.AccessController.doPrivileged(собственный метод) в javax.security.auth.Subject.doAs(Subject.java:415) в org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) в org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2 ответа
Все переменные конфигурации должны быть установлены до инициализации класса задания. Переехать
Delimiter = args[2].toString();
conf.set("Delim",Delimiter);
до
Job job = new Job(conf, "Docsparser");
NullPointerException
происходит в split
метод fileline
строка. Я подозреваю, что вы не установили "Delim"
значение конфигурации и, следовательно, ваша переменная Delim
является null
,