Вывод Hadoop HDFS MapReduce в MongoDb

Я хочу написать программу на Java, которая читает входные данные из HDFS, обрабатывает их с помощью MapReduce и записывает выходные данные в MongoDb.

Вот сценарий:

  1. У меня есть кластер Hadoop, который имеет 3 datanodes.
  2. Java-программа читает входные данные из HDFS и обрабатывает их с помощью MapReduce.
  3. Наконец, запишите результат в MongoDb.

На самом деле чтение из HDFS и обработка его с помощью MapReduce просты. Но я застреваю при записи результата в MongoDb. Поддерживается ли какой-либо API Java для записи результата в MongoDB? Другой вопрос заключается в том, что, поскольку это кластер Hadoop, поэтому мы не знаем, какой узел данных будет запускать задачу Reducer и генерировать результат, возможно ли записать результат в MongoDb, который установлен на конкретном сервере?

Если я хочу записать результат в HDFS, код будет выглядеть так:

@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException 
{
    long sum = 0;
    for (LongWritable value : values) 
    {
        sum += value.get();
    }

    context.write(new Text(key), new LongWritable(sum));
}

Теперь я хочу записать результат в MongoDb вместо HDFS, как я могу это сделать?

3 ответа

Вы хотите "Соединитель MongoDB для Hadoop". Примеры.

Соблазнительно просто добавить код в ваш Reducer, который в качестве побочного эффекта вставляет данные в вашу базу данных. Избегайте этого искушения. Одной из причин использования соединителя, а не просто вставки данных в качестве побочного эффекта вашего класса редуктора, является умозрительное выполнение: иногда Hadoop может выполнять две точно такие же задачи сокращения параллельно, что может привести к посторонним вставкам и дублированию данных.

Да. Ты пишешь в монго как обычно. Тот факт, что ваш dong монго настроен на работу с осколками, является деталью, которая скрыта от вас.

Я провел свое утро, чтобы реализовать тот же сценарий. Вот мое решение:

Создайте три класса:

  • Experiment.java: для настройки и представления работы
  • MyMap.java: класс картостроителя
  • MyReduce.java: класс редуктора

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.mapred.MongoOutputFormat;
    
    public class Experiment extends Configured implements Tool{
    
         public int run(final String[] args) throws Exception {
            final Configuration conf = getConf();
            conf.set("mongo.output.uri", args[1]);
    
            final JobConf job = new JobConf(conf);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            job.setJarByClass(Experiment.class);
    
            job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputFormat(MongoOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BSONWritable.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            JobClient.runJob(job);
    
            return 0;
        }
    
        public static void main(final String[] args) throws Exception{
    
            int res = ToolRunner.run(new TweetPerUserToMongo(), args);
            System.exit(res);
        }
    }
    

Когда вы запустите класс Experiment из вашего кластера, вы введете два параметра. Первый параметр - это ваш входной источник из местоположения HDFS, второй параметр - это URI mongodb, который сохранит ваши результаты. Вот пример звонка. Предполагая, что ваш Experiment.java находится под именем пакета org.example.

sudo -u hdfs hadoop jar ~/jar/myexample.jar org.example.Experiment myfilesinhdfs/* mongodb://192.168.0.1:27017/mydbName.myCollectionName

Это может быть не лучшим способом, но это делает работу для меня.

Другие вопросы по тегам