Вывод Hadoop HDFS MapReduce в MongoDb
Я хочу написать программу на Java, которая читает входные данные из HDFS, обрабатывает их с помощью MapReduce и записывает выходные данные в MongoDb.
Вот сценарий:
- У меня есть кластер Hadoop, который имеет 3 datanodes.
- Java-программа читает входные данные из HDFS и обрабатывает их с помощью MapReduce.
- Наконец, запишите результат в MongoDb.
На самом деле чтение из HDFS и обработка его с помощью MapReduce просты. Но я застреваю при записи результата в MongoDb. Поддерживается ли какой-либо API Java для записи результата в MongoDB? Другой вопрос заключается в том, что, поскольку это кластер Hadoop, поэтому мы не знаем, какой узел данных будет запускать задачу Reducer и генерировать результат, возможно ли записать результат в MongoDb, который установлен на конкретном сервере?
Если я хочу записать результат в HDFS, код будет выглядеть так:
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
{
long sum = 0;
for (LongWritable value : values)
{
sum += value.get();
}
context.write(new Text(key), new LongWritable(sum));
}
Теперь я хочу записать результат в MongoDb вместо HDFS, как я могу это сделать?
3 ответа
Вы хотите "Соединитель MongoDB для Hadoop". Примеры.
Соблазнительно просто добавить код в ваш Reducer, который в качестве побочного эффекта вставляет данные в вашу базу данных. Избегайте этого искушения. Одной из причин использования соединителя, а не просто вставки данных в качестве побочного эффекта вашего класса редуктора, является умозрительное выполнение: иногда Hadoop может выполнять две точно такие же задачи сокращения параллельно, что может привести к посторонним вставкам и дублированию данных.
Да. Ты пишешь в монго как обычно. Тот факт, что ваш dong монго настроен на работу с осколками, является деталью, которая скрыта от вас.
Я провел свое утро, чтобы реализовать тот же сценарий. Вот мое решение:
Создайте три класса:
- Experiment.java: для настройки и представления работы
- MyMap.java: класс картостроителя
MyReduce.java: класс редуктора
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.mongodb.hadoop.io.BSONWritable; import com.mongodb.hadoop.mapred.MongoOutputFormat; public class Experiment extends Configured implements Tool{ public int run(final String[] args) throws Exception { final Configuration conf = getConf(); conf.set("mongo.output.uri", args[1]); final JobConf job = new JobConf(conf); FileInputFormat.setInputPaths(job, new Path(args[0])); job.setJarByClass(Experiment.class); job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputFormat(MongoOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BSONWritable.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); JobClient.runJob(job); return 0; } public static void main(final String[] args) throws Exception{ int res = ToolRunner.run(new TweetPerUserToMongo(), args); System.exit(res); } }
Когда вы запустите класс Experiment из вашего кластера, вы введете два параметра. Первый параметр - это ваш входной источник из местоположения HDFS, второй параметр - это URI mongodb, который сохранит ваши результаты. Вот пример звонка. Предполагая, что ваш Experiment.java находится под именем пакета org.example.
sudo -u hdfs hadoop jar ~/jar/myexample.jar org.example.Experiment myfilesinhdfs/* mongodb://192.168.0.1:27017/mydbName.myCollectionName
Это может быть не лучшим способом, но это делает работу для меня.