Mapreduce Custom TextOutputFormat - Странные символы NUL, SOH и т. Д.
Я реализовал пользовательский формат вывода для преобразования пар ключ-значение в формат Json.
public class JSONOutputFormat extends TextOutputFormat<Text, IntWritable> {
@Override
public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
Path path = getOutputPath(context);
FileSystem fs = path.getFileSystem(conf);
FSDataOutputStream out = fs.create(new Path(path,context.getJobName()));
return new JsonRecordWriter(out);
}
}
private static class JsonRecordWriter extends LineRecordWriter<Text,IntWritable>{
boolean firstRecord = true;
@Override
public synchronized void close(TaskAttemptContext context) throws IOException {
out.writeBytes("}");
super.close(context);
}
@Override
public synchronized void write(Text key, IntWritable value)
throws IOException {
if (!firstRecord){
out.writeBytes(",\r\n");
firstRecord = false;
}
out.writeUTF(key.toString() + ":" +value.toString());
}
public JsonRecordWriter(DataOutputStream out) throws IOException{
super(out);
out.writeBytes("{");
}
}
Однако выходные данные задания Mapreduce имеют некоторые нежелательные символы, такие как: {NUL Chair: 12 NUL BS Book: 1}
Мой класс водителя выглядит следующим образом:
public class Driver {
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
IntWritable one = new IntWritable(1);
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for(String word: words)
context.write(new Text(word), one);
}
}
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
Iterator it = values.iterator();
int count = 0;
while (it.hasNext()){
IntWritable c = (IntWritable) it.next();
count+=c.get();
}
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "wordcountjson");
job.setJarByClass(Driver.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(JSONOutputFormat.class);
job.setNumReduceTasks(1);
System.exit(job.waitForCompletion(true)?0:1);
}
}
Есть идеи, почему эти символы появляются в выводе?