Mapreduce с интеграцией HCATALOG с oozie в MAPR
Я написал программу mapreduce, которая считывает данные из таблицы улья с помощью HCATLOG и записывает в HBase. Это работа только на карте без редукторов. Я запустил программу из командной строки, и она работает, как и ожидалось (Создан толстый JAR, чтобы избежать проблем с Jar). Я хотел интегрировать это oozie (с помощью HUE) . У меня есть два варианта, чтобы запустить его
- Использовать Mapreduce Action
- Использовать действие Java
Так как моя программа Mapreduce имеет метод драйвера, который содержит следующий код
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
public class HBaseValdiateInsertDriver {
public static void main(String[] args) throws Exception {
String dbName = "Test";
String tableName = "emp";
Configuration conf = new Configuration();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "HBase Get Put Demo");
job.setInputFormatClass(HCatInputFormat.class);
HCatInputFormat.setInput(job, dbName, tableName, null);
job.setJarByClass(HBaseValdiateInsertDriver.class);
job.setMapperClass(HBaseValdiateInsert.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path("maprfs:///user/input"));
FileOutputFormat.setOutputPath(job, new Path("maprfs:///user/output"));
job.waitForCompletion(true);
}
}
Как мне указать метод драйвера в oozie, все, что я могу видеть, это указать класс мапперов и редукторов. Может кто-нибудь подсказать мне, как мне установить свойства?
Используя действие Java, я могу указать свой класс драйвера в качестве основного класса и выполнить его, но я сталкиваюсь с ошибками, такими как таблица не найдена, баночки HCATLOG не найдены и т. Д. Я включил hive-site.xml в рабочий процесс (используя Hue), но я чувствую, что система не в состоянии подобрать свойства. Может кто-нибудь посоветовать мне, о чем мне нужно заботиться, есть ли какие-либо другие свойства конфигурации, которые мне нужно включить?
Также пример программы, которую я упоминал на сайте Cloudera, использует
HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
inputTableName, null));
где, как я использую ниже (я не вижу метод, который принимает вышеуказанный ввод
HCatInputFormat.setInput(job, dbName, tableName, null);
Ниже код моего картографа
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.HCatRecord;
public class HBaseValdiateInsert extends Mapper<WritableComparable, HCatRecord, Text, Text> {
static HTableInterface table;
static HTableInterface inserted;
private String hbaseDate = null;
String existigValue=null;
List<Put> putList = new ArrayList<Put>();
@Override
public void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
String tablename = "dev_arch186";
Utils.getHBConnection();
table = Utils.getTable(tablename);
table.setAutoFlushTo(false);
}
@Override
public void cleanup(Context context) {
try {
table.put(putList);
table.flushCommits();
table.close();
} catch (IOException e) {
e.printStackTrace();
}
Utils.closeConnection();
}
@Override
public void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException {
String name_hive = (String) value.get(0);
String id_hive = (String) value.get(1);
String rec[] = test.toString().split(",");
Get g = new Get(Bytes.toBytes(name_hive));
existigValue=getOneRecord(Bytes.toBytes("Info"),Bytes.toBytes("name"),name_hive);
if (existigValue.equalsIgnoreCase("NA") || !existigValue.equalsIgnoreCase(id_hive)) {
Put put = new Put(Bytes.toBytes(rec[0]));
put.add(Bytes.toBytes("Info"),
Bytes.toBytes("name"),
Bytes.toBytes(rec[1]));
put.setDurability(Durability.SKIP_WAL);
putList.add(put);
if(putList.size()>25000){
table.put(putList);
table.flushCommits();
}
}
}
public String getOneRecord(byte[] columnFamily, byte[] columnQualifier, String rowKey)
throws IOException {
Get get = new Get(rowKey.getBytes());
get.setMaxVersions(1);
Result rs = table.get(get);
rs.getColumn(columnFamily, columnQualifier);
System.out.println(rs.containsColumn(columnFamily, columnQualifier));
KeyValue result = rs.getColumnLatest(columnFamily,columnQualifier);
if (rs.containsColumn(columnFamily, columnQualifier))
return (Bytes.toString(result.getValue()));
else
return "NA";
}
public boolean columnQualifierExists(String tableName, String ColumnFamily,
String ColumnQualifier, String rowKey) throws IOException {
Get get = new Get(rowKey.getBytes());
Result rs = table.get(get);
return(rs.containsColumn(ColumnFamily.getBytes(),ColumnQualifier.getBytes()));
}
}
Примечание: я использую MapR (M3) Cluster с HUE в качестве интерфейса для oozie. Версия улья: 1-0 Версия HCAT: 1-0
1 ответ
Я не мог найти способ инициализировать HCatInputFormat из действия Oozie mapreduce. Но у меня есть обходной путь, как показано ниже.
Создан LazyHCatInputFormat путем расширения HCatInputFormat. Переопределите метод getJobInfo для обработки инициализации. Это будет вызываться как часть вызова getSplits(..).
private static void lazyInit(Configuration conf){
try{
if(conf==null){
conf = new Configuration(false);
}
conf.addResource(new Path(System.getProperty("oozie.action.conf.xml")));
conf.addResource(new org.apache.hadoop.fs.Path("hive-config.xml"));
String databaseName = conf.get("LazyHCatInputFormat.databaseName");
String tableName = conf.get("LazyHCatInputFormat.tableName");
String partitionFilter = conf.get("LazyHCatInputFormat.partitionFilter");
setInput(conf, databaseName, tableName);
//setFilter(partitionFilter);
//System.out.println("After lazyinit : "+conf.get("mapreduce.lib.hcat.job.info"));
}catch(Exception e){
System.out.println("*** LAZY INIT FAILED ***");
//e.printStackTrace();
}
}
public static InputJobInfo getJobInfo(Configuration conf)
throws IOException {
String jobString = conf.get("mapreduce.lib.hcat.job.info");
if (jobString == null) {
lazyInit(conf);
jobString = conf.get("mapreduce.lib.hcat.job.info");
if(jobString == null){
throw new IOException("job information not found in JobContext. HCatInputFormat.setInput() not called?");
}
}
return (InputJobInfo) HCatUtil.deserialize(jobString);
}
В карте действий-redcue, настроенной как показано ниже.
<property>
<name>mapreduce.job.inputformat.class</name>
<value>com.xyz.LazyHCatInputFormat</value>
</property>
<property>
<name>LazyHCatInputFormat.databaseName</name>
<value>HCAT DatabaseNameHere</value>
</property>
<property>
<name>LazyHCatInputFormat.tableName</name>
<value>HCAT TableNameHere</value>
</property>
Возможно, это не лучшая реализация, но быстрый взлом, чтобы заставить ее работать.