Проблемы со вставкой Kafka Spark в HBase
Я использую Kafka для отправки файла с 3 столбцами, используя Spark Streaming 1.3 для вставки в HBase. Вот так выглядит мой HBase:
ROW COLUMN+CELL
zone:bizert column=travail:call, timestamp=1491836364921, value=contact:numero
zone:jendouba column=travail:Big data, timestamp=1491835836290, value=contact:email
zone:tunis column=travail:info, timestamp=1491835897342, value=contact:num
3 row(s) in 0.4200 seconds
И вот как я читаю данные с помощью Stream Streaming, я использую spark-shell
:
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
val ssc = new StreamingContext(sc, Seconds(10))
val topicSet = Set ("zed")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "xx.xx.xxx.xx:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
lines.foreachRDD(rdd => { (!rdd.partitions.isEmpty)
lines.saveAsTextFiles("hdfs://xxxxx:8020/user/admin/zed/steams3/")
})
этот код работает, когда я сохраняю данные в HDFS, даже он сохраняет много пустых данных в HDFS. перед тем, как написать этот вопрос, я искал здесь и другой вопрос, такой как мой, но у меня не было хорошего решения.
Можете ли вы предложить лучший способ сделать это? Вот так выглядит мой код
val sc = new SparkContext("local", "Hbase spark")
val tableName = "notz"
val conf = HBaseConfiguration.create()
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml"))
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
lines.foreachRDD(rdd => { (!rdd.partitions.isEmpty)
if(!admin.isTableAvailable(tableName)) {
print("Creating GHbase Table")
val tableDesc = new HTableDescriptor(tableName)
tableDesc.addFamily(new HColumnDescriptor("zone"
.getBytes()))
admin.createTable(tableDesc)
}else{
print("Table already exists!!")
}
val myTable = new HTable(conf, tableName)
// i'm blocked here
})