Поиск заданного слова в текстовом файле с помощью MapReduce в JAVA в Ubuntu 16.04
Я должен сделать проект, чтобы найти данное слово (строку). Эта строка будет введена пользователем. Затем найдите вхождение слова в конкретный текстовый файл, хранящийся в HDFS. Вывод должен сообщать о наличии слова string.
package stringSearchJob;
import java.io.IOException;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class StringSearch{
public static void main(String argv[]) throws Exception {
try {
if (argv.length<3) {
System.err.println("Give the input/ output/ keyword!");
return;
}
JobConf conf = new JobConf(StringSearch.class);
Job job = new Job(conf,"StringSearch");
FileInputFormat.addInputPath(job, new Path(argv[0]));
FileOutputFormat.setOutputPath(job, new Path(argv[1]));
conf.set("search", argv[2]);
job.setJarByClass(StringSearch.class);
job.setMapperClass(WordMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
job.waitForCompletion(true);
}
catch (Exception e) {
e.printStackTrace();
}
}
public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
Configuration conf = context.getConfiguration();
String search = conf.get("search");
String line = value.toString();
Scanner scanner = new Scanner(line);
while (scanner.hasNext()) {
if (line.contains(search)) {
String line1 = scanner.next();
context.write(new Text(line1), new IntWritable(1));
}
}
scanner.close();
}
catch (IOException e){
e.printStackTrace();
}
catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
Мой код неверен? Потому что вывод, который я получаю на терминале Ubuntu-16.04, неверен. Шаги, которые я выполнил, следующие:
- После того, как вы завершили приведенный выше код, я экспортировал его в исполняемый файл JAR с именем StringSearch.jar. Имя класса было StringSearch.
Теперь в Терминале я написал следующие команды:
hadoop fs -mkdir /user hadoop fs -mkdir /user/hduser hadoop fs -mkdir /user/hduser/StringSearch hadoop fs -mkdir Stringsearch/input hadoop -fs -copyFromLocal sample.txt StringSearch/input hadoop jar StringSearchNew.jar StringSearch /user/hduser/StringSearch/input user/hduser/StringSearch/output 'Lord'
И я получаю ошибки следующим образом.
17/08/20 19:17:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/08/20 19:17:41 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id 17/08/20 19:17:41 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 17/08/20 19:17:41 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set in JobConf. at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:117) at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:268) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:139) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287) at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:575) at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:570) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:570) at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:561) at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:870) at stringSearchJob.StringSearch.main(StringSearch.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.util.RunJar.run(RunJar.java:221) at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Я в основном узнал, как использовать Hadoop MapReduce только из Интернета. Когда я попытался сделать программу в JAVA после прохождения всех других подобных ответов, она не дала результата. Я - новичок в Hadoop, и поэтому мне будет полезно, если вы поможете мне решить проблему. Я не понимаю, что здесь не так!
Прочитав ответ, я отредактировал код и получил следующие ошибки:
17/08/24 05:01:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:172)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:357)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(FileInputFormat.java:520)
at stringSearchJob.StringSearch.main(StringSearch.java:28)
... 11 more
1 ответ
Установите в качестве входного и выходного каталога объект JobConf, а не объект Job
Вы должны изменить, как показано ниже:
FileInputFormat.setInputPaths(conf /*from job to conf*/, new Path(args[0]));
FileOutputFormat.setOutputPath(conf /*from job to conf*/, new Path(args[1]));
Таким образом, модифицированный код должен выглядеть примерно так:
if (argv.length<3) {
System.err.println("Give the input/ output/ keyword!");
return;
}
JobConf conf = new JobConf(StringSearch.class);
Job job = new Job(conf,"StringSearch");
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.set("search", argv[2]);
job.setJarByClass(StringSearch.class);
job.setMapperClass(WordMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
job.waitForCompletion(true);