Метод установки не вызывается в Hadoop Mapper
Я запускаю последовательность Hadoop Mapper/Reducers и получаю список идентификаторов фильмов. Я использую файл MovieData для отображения названий фильмов на основе этих идентификаторов. Я использую класс Mapper, как показано ниже. Я вижу, что метод setUp не вызывается, потому что я не вижу оператора print, а также я получаю исключение Null, когда пытаюсь использовать этот HashMap, загруженный в метод load. Ниже приведен код. Любые указатели приветствуются.
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper.Context;
public class MovieNamesMapper extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
private static HashMap<String, String> movieNameHashMap = new HashMap<String, String>();
private BufferedReader bufferedReader;
private String movieId = "";
protected void setup(Context context) throws IOException,
InterruptedException {
System.out.println("Setting up system..");
Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim().equals("u.item")) {
loadMovieNamesHashMap(eachPath, context);
}
}
}
private void loadMovieNamesHashMap(Path filePath, Context context)
throws IOException {
System.out.println("Loading movie names..");
String strLineRead = "";
try {
bufferedReader = new BufferedReader(new FileReader(
filePath.toString()));
while ((strLineRead = bufferedReader.readLine()) != null) {
String movieIdArray[] = strLineRead.toString().split("\t|::");
movieNameHashMap.put(movieIdArray[0].trim(),
movieIdArray[1].trim());
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
}
public void map(Object key, Text value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
System.out.println(key.toString() + " - " + value.toString());
if (value.toString().length() > 0) {
String moviePairArray[] = value.toString().split(":");
for (String moviePair : moviePairArray) {
String movieArray[] = moviePair.split(",");
output.collect(new Text(movieNameHashMap.get(movieArray[0])),
new Text(movieNameHashMap.get(movieArray[1])));
}
}
}
public String getMovieId() {
return movieId;
}
public void setMovieId(String movieId) {
this.movieId = movieId;
}
}
Ниже приводится мой метод запуска.
public int run(String[] args) throws Exception {
// For finding user and his rated movie list.
JobConf conf1 = new JobConf(MovieTopDriver.class);
conf1.setMapperClass(MoviePairsMapper.class);
conf1.setReducerClass(MoviePairsReducer.class);
conf1.setJarByClass(MovieTopDriver.class);
FileInputFormat.addInputPath(conf1, new Path(args[0]));
FileOutputFormat.setOutputPath(conf1, new Path("temp"));
conf1.setMapOutputKeyClass(Text.class);
conf1.setMapOutputValueClass(Text.class);
conf1.setOutputKeyClass(Text.class);
conf1.setOutputValueClass(IntWritable.class);
// For finding movie pairs.
JobConf conf2 = new JobConf(MovieTopDriver.class);
conf2.setMapperClass(MoviePairsCoOccurMapper.class);
conf2.setReducerClass(MoviePairsCoOccurReducer.class);
conf2.setJarByClass(MovieTopDriver.class);
FileInputFormat.addInputPath(conf2, new Path("temp"));
FileOutputFormat.setOutputPath(conf2, new Path("freq_temp"));
conf2.setInputFormat(KeyValueTextInputFormat.class);
conf2.setMapOutputKeyClass(Text.class);
conf2.setMapOutputValueClass(IntWritable.class);
conf2.setOutputKeyClass(Text.class);
conf2.setOutputValueClass(IntWritable.class);
// Find top frequent movies along with their names.
// Output Freq, moviePair
// Keep a count and output only 20.
JobConf conf3 = new JobConf(MovieTopDriver.class);
conf3.setMapperClass(ValueKeyMapper.class);
conf3.setReducerClass(ValueKeyReducer.class);
conf3.setJarByClass(MovieTopDriver.class);
FileInputFormat.addInputPath(conf3, new Path("freq_temp"));
FileOutputFormat.setOutputPath(conf3, new Path("freq_temp2"));
conf3.setInputFormat(KeyValueTextInputFormat.class);
conf3.setMapOutputKeyClass(IntWritable.class);
conf3.setMapOutputValueClass(Text.class);
conf3.setOutputKeyClass(IntWritable.class);
conf3.setOutputValueClass(Text.class);
// Use only one reducer as we want to sort.
conf3.setNumReduceTasks(1);
// To sort in decreasing order.
conf3.setOutputKeyComparatorClass(LongWritable.DecreasingComparator.class);
// Find top movie name
// Use a mapper side join to output names.
JobConf conf4 = new JobConf(MovieTopDriver.class);
conf4.setMapperClass(MovieNamesMapper.class);
conf4.setJarByClass(MovieTopDriver.class);
FileInputFormat.addInputPath(conf4, new Path("freq_temp2"));
FileOutputFormat.setOutputPath(conf4, new Path(args[1]));
conf4.setInputFormat(KeyValueTextInputFormat.class);
conf4.setMapOutputKeyClass(Text.class);
conf4.setMapOutputValueClass(Text.class);
// Run the jobs
Job job1 = new Job(conf1);
Job job2 = new Job(conf2);
Job job3 = new Job(conf3);
Job job4 = new Job(conf4);
JobControl jobControl = new JobControl("jobControl");
jobControl.addJob(job1);
jobControl.addJob(job2);
jobControl.addJob(job3);
jobControl.addJob(job4);
job2.addDependingJob(job1);
job3.addDependingJob(job2);
job4.addDependingJob(job3);
handleRun(jobControl);
FileSystem.get(conf2).deleteOnExit(new Path("temp"));
FileSystem.get(conf3).deleteOnExit(new Path("freq_temp"));
FileSystem.get(conf4).deleteOnExit(new Path("freq_temp2"));
System.out.println("Program complete.");
return 0;
}
Обновление: я использую Hadoop 1.2.1, и я могу использовать только это, поскольку я использую кластер в школе.
Обновление: используется настройка вместо установки, но она все равно не вызывается.
public void configure(JobConf jobConf) {
System.out.println("Setting up system..");
Path[] cacheFilesLocal;
try {
cacheFilesLocal = DistributedCache.getLocalCacheFiles(jobConf);
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim().equals("u.item")) {
loadMovieNamesHashMap(eachPath);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
Добавлено следующее в методе запуска.
DistributedCache.addFileToClassPath(new Path("moviedata"), conf4);
conf4.set("mapred.job.tracker", "local");
4 ответа
Вам нужно будет использовать метод настройки:
public void configure(JobConf job) {
}
Настройка не определена в документации
Если ваша IDE поддерживает это, попросите ваши методы переопределения IDE из суперклассов (в Eclipse это Source -> Override/Implement методов), чтобы увидеть, считает ли IDE, что вы неправильно указали тип (Context). Если вы ошиблись, Eclipse позволит вам переопределить метод, вставив заглушку с правильной подписью.
Чтобы быть точным, вам нужно решить, используете ли вы пакеты mapred (старый) или картографический (новый). Похоже, вы используете пакеты mapred (обратите внимание, что Context импортируется из неправильного пакета). Если вы хотите использовать пакет mapred, используйте метод configure (), в противном случае используйте setup () для пакета mapreduce
У меня есть код, работающий на Hadoop 1.2.1 (также протестирован на 2.2.0), который широко использует настройки. Вот как это выглядит в моем коде:
@Override
public void setup(Context context) throws IllegalArgumentException, IOException {
logger.debug("setup has been called");
}
Разница, которую я вижу, заключается в использовании "public" вместо "protected", а также в использовании @Override, которое должно помочь вам выяснить, неправильно ли вы переопределяете метод. Также обратите внимание, что я использую новый API (org.apache.hadoop.mapreduce).
--- АЛЬТЕРНАТИВНОЕ РЕШЕНИЕ ---
Я до сих пор не смог понять это тоже. Кажется, что модель вызова метода установки в начале маппера, перед любыми вызовами карты, может быть только частью нового API ( mapred vs mapreduce).
Я хотел использовать один и тот же метод карты для нескольких картографов, которые имеют разность только одной переменной. Невозможно переопределить переменную, поэтому я вызываю pulic void setup()
в начале метода map переопределяет это в дочерних мапперах. Конечно, он вызывается при каждом вызове карты (например, каждой строке во входных файлах для этих картографов), но это наименее неэффективный из моих неэффективностей на данный момент.
public static class Mapper1
extends MapReduceBase
implements Mapper<LongWritable, Text, Text, Text>
{
protected int someVar;
public void setup()
{
System.out.println("[LOG] setup called");
someVar = 1;
}
public void map(
LongWritable key,
Text value,
OutputCollector<Text, Text> output,
Reporter reporter
) throws IOException
{
setup();
System.out.println("someVar: " + String.valueOf(someVar));
//...
output.collect(someKey, someValue);
}
}
public static class Mapper3
extends Mapper1
{
//protected int someVar;
//private int someVar;
/*
@Override
public void setup(Context context)
throws IOException, InterruptedException
{
System.out.println("[LOG] setup called");
someVar = 2;
}
@Override
public void configure(JobConf jobConf)
{
System.out.println("[LOG] configure called");
someVar = 2;
}
*/
@Override
public void setup()
{
System.out.println("[LOG] setup called");
someVar = 2;
}
}