Hadoop несколько входов
Я использую карту hadoop и я хочу вычислить два файла. Моя первая итерация Map/Reduce дает мне файл с идентификатором пары, например так:
A 30
D 20
Моя цель - использовать этот идентификатор из файла, чтобы связать его с другим файлом и получить еще один вывод с трио: идентификатор, номер, имя, например:
A ABC 30
D EFGH 20
Но я не уверен, является ли использование Map Reduce лучшим способом сделать это. Было бы лучше, например, использовать File Reader, чтобы прочитать второй входной файл и получить имя по идентификатору? Или я могу сделать это с помощью Map Reduce?
Если так, я пытаюсь выяснить как. Я попробовал решение MultipleInput:
MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"),
TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2, new Path("inputplanes"),
TextInputFormat.class, FlightsModeMapper.class);
Но я не могу придумать какое-либо решение, чтобы объединить два и получить желаемый результат. То, что у меня есть сейчас, - это просто дать мне список, как этот пример:
A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20
После моего последнего снижения я получаю это:
N125DL 767-332
N125DL 7 ,
N126AT 737-76N
N126AT 19 ,
N126DL 767-332
N126DL 1 ,
N127DL 767-332
N127DL 7 ,
N128DL 767-332
N128DL 3
Я хочу это: N127DL 7 767-332. А также, я не хочу тех, которые не объединяются.
И это мой редукционный класс:
Открытый класс Авиабилеты ByCarrierReducer2 расширяет Редуктор {
String merge = "";
protected void reduce(Text token, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int i = 0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+",";
}
else{
merge += value.toString();
}
i++;
}
context.write(token, new Text(merge));
}
}
Обновить:
http://stat-computing.org/dataexpo/2009/the-data.html это пример, который я использую.
Я пытаюсь с: TailNum и Canceled, который (1 или 0) получить название модели, которое соответствует TailNum. В моем файле с моделью есть TailNumb, Model и другие вещи. Мой текущий вывод:
N193JB ERJ 190-100 IGW
N194DN 767-332
N19503 EMB-135ER
N19554 EMB-145LR
N195DN 767-332
N195DN 2
Сначала идет ключ, во-вторых модель, ключи, у которых отменены рейсы, apperas ниже модели
И я хотел бы получить ключ трио, номер модели отменен, потому что я хочу, чтобы количество отмен в каждой модели
2 ответа
Вы можете присоединиться к ним, используя ID в качестве ключа для обоих картографов. Вы можете написать свою задачу карты как что-то вроде этого
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
{
//Get the line
//split the line to get ID seperate
//word1 = A
//word2 = 30
//Likewise for A ABC
//word1 = A
//word2 = ABC
context.write(word1, word2);
}
Я думаю, что вы можете повторно использовать ту же задачу карты. А затем напишите задание commomn Reducer, в котором Hadoop Framework группирует данные на основе ключей. Таким образом, вы сможете получить идентификатор в качестве ключа. И вы можете кэшировать одно из значений, а затем конкатить.
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context)
{
int i =0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+",";
}
else{
merge += value.toString();
}
i++;
}
valEmit.set(merge);
context.write(key, valEmit);
}
Наконец, вы можете написать свой класс водителя
public int run(String[] args) throws Exception {
Configuration c=new Configuration();
String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
Path p1=new Path(files[0]);
Path p2=new Path(files[1]);
Path p3=new Path(files[2]);
FileSystem fs = FileSystem.get(c);
if(fs.exists(p3)){
fs.delete(p3, true);
}
Job job = new Job(c,"Multiple Job");
job.setJarByClass(MultipleFiles.class);
MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class);
job.setReducerClass(MultipleReducer.class);
.
.
}
Вы можете найти пример ЗДЕСЬ
Надеюсь это поможет.
ОБНОВИТЬ
Input1
A 30
D 20
Input2
A ABC
D EFGH
Выход
A ABC 30
D EFGH 20
Mapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @author sreeveni
*
*/
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
Text keyEmit = new Text();
Text valEmit = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String parts[] = line.split(" ");
keyEmit.set(parts[0]);
valEmit.set(parts[1]);
context.write(keyEmit, valEmit);
}
}
Reducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @author sreeveni
*
*/
public class ReducerJoin extends Reducer<Text, Text, Text, Text> {
Text valEmit = new Text();
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String character = "";
String number = "";
for (Text value : values) {
// ordering output
String val = value.toString();
char myChar = val.charAt(0);
if (Character.isDigit(myChar)) {
number = val;
} else {
character = val;
}
}
merge = character + " " + number;
valEmit.set(merge);
context.write(key, valEmit);
}
}
Класс водителя
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author sreeveni
*
*/
public class Driver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
// checking the arguments count
if (args.length != 3) {
System.err
.println("Usage : <inputlocation> <inputlocation> <outputlocation> ");
System.exit(0);
}
int res = ToolRunner.run(new Configuration(), new Driver(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
String source1 = args[0];
String source2 = args[1];
String dest = args[2];
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", " "); // changing default
// delimiter to user
// input delimiter
FileSystem fs = FileSystem.get(conf);
Job job = new Job(conf, "Multiple Jobs");
job.setJarByClass(Driver.class);
Path p1 = new Path(source1);
Path p2 = new Path(source2);
Path out = new Path(dest);
MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
Mapper1.class);
MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
Mapper1.class);
job.setReducerClass(ReducerJoin.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
/*
* delete if exist
*/
if (fs.exists(out))
fs.delete(out, true);
TextOutputFormat.setOutputPath(job, out);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
}
Ваш редуктор имеет метод map, но он должен иметь метод Reduce, который принимает коллекцию значений Iterable, которую вы затем объединяете. Поскольку у вас нет метода lower (), вы получаете поведение по умолчанию, которое заключается в том, чтобы просто проходить через все пары ключ / значение.