Hbase Table.batch занимает 300 секунд, чтобы вставить 800000 записей в таблицу

Я читаю файл json размером 30 Мб, процесс создания семейства столбцов и значения ключей. Затем создайте объект Put, вставьте в него ключ строки и значения. Создайте список таких помещаемых объектов и вызовите Table.batch() и передайте этот список. Я звоню, когда мой размер массива 50000. Затем очистите список и вызовите следующую партию. Однако обработка файла while, который в итоге содержит 800 000 записей, занимает 300 секунд. Я тоже устал от table.put, но это было еще медленнее. Я использую hbase 1.1. Я получаю этот JSON от Кафки. Любые предложения по улучшению производительности приветствуются. Я проверил ТАК форумы, но не сильно помог. Я поделюсь кодом, если вы хотите посмотреть на него.

С уважением

Raghavendra

public static void processData(String jsonData)
{
    if (jsonData == null || jsonData.isEmpty())
    {
        System.out.println("JSON data is null or empty. Nothing to process");
        return;
    }

    long startTime = System.currentTimeMillis();

    Table table = null;
    try
    {
        table = HBaseConfigUtil.getInstance().getConnection().getTable(TableName.valueOf("MYTABLE"));
    }
    catch (IOException e1)
    {
        System.out.println(e1);
    }

    Put processData = null;
    List<Put> bulkData = new ArrayList<Put>();

    try
    {

        //Read the json and generate the model into a class    
        //ProcessExecutions is List<ProcessExecution>
        ProcessExecutions peData = JsonToColumnData.gson.fromJson(jsonData, ProcessExecutions.class);

        if (peData != null)
        {
            //Read the data and pass it to Hbase
            for (ProcessExecution pe : peData.processExecutions)
            {
                //Class Header stores some header information
                Header headerData = pe.getHeader();   

                String rowKey = headerData.getRowKey();
                processData = new Put(Bytes.toBytes(JsonToColumnData.rowKey));
                processData.addColumn(Bytes.toBytes("Data"),
                                Bytes.toBytes("Time"),
                                Bytes.toBytes("value"));

                //Add to list
                bulkData.add(processData);            
                if (bulkData.size() >= 50000) //hardcoded for demo
                {
                    long tmpTime = System.currentTimeMillis();
                    Object[] results = null;
                    table.batch(bulkData, results);                     
                    bulkData.clear();
                    System.gc();
                }
            } //end for
            //Complete the remaining write operation
            if (bulkData.size() > 0)
            {
                Object[] results = null;
                table.batch(bulkData, results);
                bulkData.clear();
                //Try to free memory
                System.gc();
            }
    }
    catch (Exception e)
    {
        System.out.println(e);
        e.printStackTrace();
    }
    finally
    {
        try
        {
            table.close();
        }
        catch (IOException e)
        {
            System.out.println("Error closing table " + e);
            e.printStackTrace();
        }
    }

}


//This function is added here to show the connection
 /*public Connection getConnection()
{

    try
    {
        if (this.connection == null)
        {
            ExecutorService executor = Executors.newFixedThreadPool(HBaseConfigUtil.THREADCOUNT);
            this.connection = ConnectionFactory.createConnection(this.getHBaseConfiguration(), executor);
        }
    }
    catch (IOException e)
    {
        e.printStackTrace();
        System.out.println("Error in getting connection " + e.getMessage());
    }

    return this.connection;
}*/

1 ответ

Решение

У меня был тот же случай, когда мне нужно проанализировать 5 ГБ json и вставить в таблицу hbase... Вы можете попробовать следующий способ (который должен работать), который оказался очень быстрым для пакета из 100000 записей в моем случае.

public void addMultipleRecordsAtaShot(final ArrayList<Put> puts, final String tableName) throws Exception {
        try {
            final HTable table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
            table.put(puts);
            LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
        } catch (final Throwable e) {
            e.printStackTrace();
        } finally {
            LOG.info("Processed ---> " + puts.size());
            if (puts != null) {
                puts.clear();
            }
        }
    }

Для получения дополнительной информации об увеличении размера буфера проверьте мой ответ в другом контексте, чтобы увеличить размер буфера, см. Документ https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Table.html

Другие вопросы по тегам