Loading CSV data into HBase using Scala
Is there a way to load CSV files into HBase using Scala, without using Spark? I am looking for a tool similar to Happybase.
1 Answer
Check this link for reference: http://www.mccarroll.net/snippets/hbaseload/ Hope it helps.
HBaseImportExport is a superclass that defines some common code:
class HBaseImportExport {

  // column family used for headers that do not specify one explicitly
  val DEFAULT_COLUMN_FAMILY: String = "c1"

  // a reference to an HBase column: family plus qualifier
  class HBaseCol(var family: String, var qualifier: String)
}
HBaseImporter shows how to import CSV data into HBase. It interprets the first line of the CSV file as the field names: a header written as family:qualifier selects the column family explicitly, while a bare name falls under the default family c1.
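For example (a hypothetical file, just to illustrate the header convention), the input

user:name,user:email,score
alice,alice@example.org,10
bob,bob@example.org,7

would produce the column families user and c1 (the latter for the bare score header); passing user:name as the key field name would make the name values the row keys, otherwise row keys r0, r1, ... are generated. The full importer: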
import java.io.File
import java.io.FileInputStream
import java.io.IOException
import java.util.ArrayList
import java.util.HashSet
import java.util.List
import java.util.Set
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.log4j.Logger
// needed so the for-comprehensions below can iterate over java.util collections
import scala.collection.JavaConversions._
import HBaseImporter._
object HBaseImporter {

  def main(args: Array[String]): Unit = {
    if (args.length < 2 || args.length > 3) {
      println("Usage: HBaseImporter <tablename> <csv file path> [<key field name>]")
      sys.exit(1) // without this the code fell through and failed on args(0)
    }
    val tableName: String = args(0)
    val f: File = new File(args(1))
    var keyColumn: String = null
    if (args.length > 2) {
      keyColumn = args(2)
    }
    val importer: HBaseImporter = new HBaseImporter(tableName)
    importer.importCSV(f, keyColumn)
  }
}
class HBaseImporter(var tableName: String) extends HBaseImportExport {

  // uses the old (pre-1.0) HBase client API, as in the linked snippet
  var admin: HBaseAdmin = _
  var config: Configuration = _
  var families: Set[String] = new HashSet[String]()
  var columns: List[HBaseCol] = new ArrayList[HBaseCol]()
  var keyPosition: Int = -1

  def init(): Unit = {
    config = HBaseConfiguration.create()
    admin = new HBaseAdmin(config)
  }
  private def deleteTable(): Unit = {
    try {
      admin.disableTable(tableName)
      admin.deleteTable(tableName)
    } catch {
      // ignore the failure if the table does not exist yet
      case e: Exception =>
    }
  }
  private def createTable(): Unit = {
    val desc: HTableDescriptor = new HTableDescriptor(tableName)
    admin.createTable(desc)
    admin.disableTable(tableName)
    // add a column family for every family seen in the CSV headers
    for (family <- families) {
      val cf1: HColumnDescriptor = new HColumnDescriptor(family)
      admin.addColumn(tableName, cf1)
    }
    admin.enableTable(tableName)
  }
  private def analyzeHeaders(headers: Array[String], keyColumn: String): Unit = {
    columns.clear()
    families.clear()
    var col: Int = 0
    for (header <- headers) {
      var family: String = DEFAULT_COLUMN_FAMILY
      var qualifier: String = header
      // a "family:qualifier" header selects the column family explicitly
      // (the original `if ((pos = ...) > 0)` does not compile in Scala)
      val pos: Int = header.indexOf(":")
      if (pos > 0) {
        family = header.substring(0, pos)
        qualifier = header.substring(pos + 1)
      }
      columns.add(new HBaseCol(family, qualifier))
      families.add(family)
      if (header == keyColumn) {
        keyPosition = col
      }
      col += 1
    }
  }
  private def loadData(cis: CsvInputStream): Unit = {
    val table: HTable = new HTable(config, tableName)
    val logger: Logger = Logger.getLogger(this.getClass)
    var vals: Array[String] = cis.readLine()
    var counter: Int = 0
    while (vals != null) {
      // use the designated key column if present, otherwise generate a row id
      val rowId: String =
        if (keyPosition >= 0 && keyPosition < vals.length) vals(keyPosition)
        else "r" + counter
      val put: Put = new Put(rowId.getBytes("UTF-8"))
      var col: Int = 0
      for (column <- columns) {
        // skip columns that have no value on this (short) line;
        // the original relied on a `break` that does nothing in Scala
        if (col < vals.length) {
          put.add(column.family.getBytes("UTF-8"),
                  column.qualifier.getBytes("UTF-8"),
                  vals(col).getBytes("UTF-8"))
        }
        col += 1
      }
      table.put(put)
      vals = cis.readLine()
      counter += 1
      if (counter % 10000 == 0) {
        logger.info("Imported " + counter + " records")
      }
    }
    cis.close()
  }
  /**
   * Import a CSV file into the HBase table.
   *
   * @param csvFile   the CSV file to read
   * @param keyColumn name of the field to use as the row key,
   *                  or null to generate row keys automatically
   *
   * @throws IOException
   */
  def importCSV(csvFile: File, keyColumn: String): Unit = {
    init()
    val fis: FileInputStream = new FileInputStream(csvFile)
    val cis: CsvInputStream = new CsvInputStream(fis)
    // read field names from the first line of the csv file
    analyzeHeaders(cis.readLine(), keyColumn)
    // note: an existing table with this name is dropped and recreated
    deleteTable()
    createTable()
    loadData(cis)
    cis.close()
  }
}
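Note that this code depends on a CsvInputStream helper class that comes with the snippet at the link above; it is not part of any standard library. If your input is plain comma-separated text with no quoted fields, a minimal stand-in could look like this (my own sketch, not the original class):

import java.io.{BufferedReader, InputStream, InputStreamReader}

// Minimal CsvInputStream stand-in: splits each line on commas and
// returns null at end of stream. Does NOT handle quoted fields.
class CsvInputStream(in: InputStream) {

  private val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))

  def readLine(): Array[String] = {
    val line = reader.readLine()
    if (line == null) null else line.split(",", -1)
  }

  def close(): Unit = reader.close()
}

With everything compiled, you would run it as, for example, HBaseImporter mytable /path/to/data.csv user:name (the table and file names here are placeholders).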
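One caveat: HBaseAdmin, HTable and Put.add belong to the old client API and were removed in HBase 2.x. On a recent HBase the same write path goes through the Connection/Table API instead; a rough sketch (the table and family names are placeholders, and the table must already exist):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object ModernPutExample {
  def main(args: Array[String]): Unit = {
    // Connection/Table replace the removed HBaseAdmin/HTable classes
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = connection.getTable(TableName.valueOf("mytable"))
    try {
      val put = new Put(Bytes.toBytes("row1"))
      // addColumn replaces the removed Put.add(family, qualifier, value)
      put.addColumn(Bytes.toBytes("c1"), Bytes.toBytes("name"), Bytes.toBytes("alice"))
      table.put(put)
    } finally {
      table.close()
      connection.close()
    }
  }
}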