Создать RDD на основе части строк HBase

Я пытаюсь создать RDD на основе данных из HBase Таблица:

val targetRDD = sparkContext.newAPIHadoopRDD(hBaseConfig,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
  .map {
    case (key, row) => parse(key, row)
  }

parse вызывается для каждой строки таблицы без учета дальнейших манипуляций с данными.

Можно ли извлечь только строки с определенными ключами, которые соответствуют некоторому условию (т. Е. Ключи находятся в определенном диапазоне), чтобы работать только с ними?

1 ответ

Решение

HBase - это хранилище ключей / значений со строками, отсортированными по ключу, что означает, что:

  • эффективно извлекать отдельные строки по ключу или последовательность строк по ключевому диапазону
  • это не эффективно при получении случайных строк по некоторым условиям

Все операции извлечения сводятся к двум классам: Get и Scan. Нетрудно догадаться, что они делают, сканирование будет повторяться по всем строкам, если вы не укажете stopRow / startRow. Вы также можете установить фильтры на Scan, но он все равно должен повторять все строки, фильтры просто могут снизить нагрузку на сеть, потому что HBase, возможно, придется возвращать меньше строк.

TableInputFormat в вашем примере использует Scan внутри него для доступа к Hbase:

  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
          LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

Также метод createScanFromConfiguration внутри TableInputFormat может дать вам подсказку о том, как вы можете установить фильтры и диапазоны ключей:

  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(
          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default, full table scans generate too much BC churn
    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));

    return scan;
  }

В этом ответе о стекопотоке приведен пример настройки сканирования на hbaseConfigОбратите внимание, что вам не нужно устанавливать сканирование, вы можете просто установить определенные свойства, такие как SCAN_ROW_START и другие из createScanFromConfiguration Я упоминал выше.

Другие вопросы по тегам