Выполнить агрегацию в потоке данных

Я храню значения (временные ряды) в Bigtable и я наткнулся на случай, когда мне нужно применить filter на эти значения и выполнить агрегацию. Я использую следующую конфигурацию, чтобы получить connection в Bigtable (выполнить сканирование диапазона и т. д.):

Connection connection = BigtableConfiguration.connect(projectId, instanceId);
Table table = connection.getTable(TableName.valueOf(tableId)); 

table.getScanner(<a scanner with filter>);

Это помогает мне с ResultScanner и я могу перебирать строки. Однако я хочу выполнить агрегирование определенных столбцов и получить значения. SQL-эквивалент того, что я хочу сделать, будет следующим:

SELECT SUM(A), SUM(B)
FROM table
WHERE C = D;

Сделать то же самое в HBaseЯ наткнулся AggregationClient (Javadoc здесь), однако, это требует Configuration и мне нужно что-то, что убегает Bigtable (так что мне не нужно использовать API Hbase низкого уровня).

Я проверил документацию и не смог найти ничего (на Java), которое могло бы это сделать. Может кто-нибудь поделиться примером для выполнения aggregation с (не ключ строки или любой) фильтры на BigTable.

2 ответа

У Bigtable изначально нет механизмов агрегации. Кроме того, Bigtable имеет трудности с обработкой WHERE C = DТаким образом, этот тип обработки обычно лучше выполнять на стороне клиента.

AggregationClient является сопроцессором HBase Cloud Bigtable не поддерживает сопроцессоры.

Если вы хотите использовать Cloud Bigtable для такого типа агрегации, вам придется использовать table.scan() и ваша собственная логика. Если масштаб достаточно велик, вам придется использовать Dataflow или BigQuery для выполнения агрегации.

Вот один из способов:

PCollection<TableRow> rows = p.apply(BigQueryIO.readTableRows()
  .fromQuery("SELECT A, B FROM table;"));

PCollection<KV<String, Integer>> valuesA =
  rows.apply(
    MapElements.into(TypeDescriptors.kvs(
      TypeDescriptors.strings(),
      TypeDescriptors.integers()))
      .via((TableRow row) -> KV.of(
        "A", (Integer) row.getF().get(0).getV())));

PCollection<KV<String, Integer>> valuesB =
  rows.apply(
    MapElements.into(TypeDescriptors.kvs(
      TypeDescriptors.strings(),
      TypeDescriptors.integers()))
      .via((TableRow row) -> KV.of(
        "B", (Integer) row.getF().get(1).getV())));

PCollection<KV<String, Integer>> sums =
  PCollectionList.of(sumOfA).and(sumOfB)
    .apply(Flatten.pCollections())
    .apply(Sum.integersPerKey());