Проблема при полном сканировании таблицы в Кассандре

Во-первых: я знаю, что не стоит делать полное сканирование на Кассандре, но сейчас мне нужно именно это.

Когда я начал искать что-то подобное, я читал людей, которые говорили, что невозможно провести полное сканирование на Кассандре, и он не был создан, чтобы делать подобные вещи.

Не удовлетворен, продолжаю искать, пока не найду эту статью: http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

Выглядит довольно разумно, и я попробовал. Поскольку я буду выполнять полное сканирование только один раз, а время и производительность не будут проблемой, я написал запрос и поместил его в простое задание, чтобы найти все нужные мне записи. Из двух миллиардов строк записей мой результат был примерно 1000, но у меня было только 100 записей.

Моя работа:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");

    LOGGER.info("Starting ...");

    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        ResultSet resultSet = session.execute(queryBuilder(actualToken));

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(myCollumn)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            if (!rows.hasNext()) {
                Long token = row.getLong("token(rowid)");
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }

        }

    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token){
    return token + 1;
}

По сути, я делаю поиск разрешенного минимального токена и пошагово иду до последнего.

Я не знаю, но похоже, что работа не выполнила полное сканирование полностью, или мой запрос получил доступ только к одному узлу или чему-то другому. Я не знаю, делаю ли я что-то не так или не могу сделать полное сканирование.

Сегодня у меня есть почти 2 ТБ данных, только одна таблица в одном кластере из семи узлов.

Кто-то уже был в этой ситуации или есть рекомендации?

5 ответов

Определенно возможно выполнить полное сканирование таблицы в Cassandra - действительно, это довольно распространено для таких вещей, как Spark. Тем не менее, это обычно не "быстро", поэтому не рекомендуется, если вы не знаете, почему вы это делаете. Для ваших актуальных вопросов:

1) Если вы используете CQL, вы почти наверняка используете разделитель Murmur3, поэтому ваш минимальный токен -9223372036854775808 (и максимальный токен 9223372036854775808).

2) Вы используете session.execute(), который будет использовать согласованность по умолчанию ONE, которая может не возвращать все результаты в вашем кластере, особенно если вы также пишете в ONE, что, я подозреваю, может быть. Увеличьте это значение до ALL и используйте подготовленные операторы для ускорения синтаксического анализа CQL:

 public void run() {
     Cluster cluster = getConnection();
     Session session = cluster.connect("db");
     LOGGER.info("Starting ...");
     actualToken = -9223372036854775808;
     boolean run = true;
     int print = 0;

     while ( run ) {
         if (maxTokenReached(actualToken)) {
             LOGGER.info("Max Token Reached!");
             break;
         }
         SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
         stmt.setConsistencyLevel(ConsistencyLevel.ALL);
         ResultSet resultSet = session.execute(stmt);

         Iterator<Row> rows = resultSet.iterator();
         if ( !rows.hasNext()){
             break;
         }

         List<String> rowIds = new ArrayList<String>();

         while (rows.hasNext()) {
             Row row = rows.next();

             Long leadTime = row.getLong("my_column");
             if (myCondition(myCollumn)) {
                 String rowId = row.getString("key");
                 rowIds.add(rowId);
             }

             if (!rows.hasNext()) {
                 Long token = row.getLong("token(rowid)");
                 if (!rowIds.isEmpty()) {
                     LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                 }
             actualToken = nextToken(token);
             }
         }
      }
     LOGGER.info("Done!");
     cluster.shutdown(); 
  }

public boolean maxTokenReached(Long actualToken){
     return actualToken >= maxToken; 
 }

 public String queryBuilder(Long nextRange) {
     return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
 }

 public Long nextToken(Long token) {
     return token + 1; 
 }

Я очень рекомендую использовать Spark - даже в отдельном приложении (т.е. без кластера). Он позаботится о разбиении разделов и обработке их один за другим. Слишком прост в использовании:

https://github.com/datastax/spark-cassandra-connector

Это очень старый вопрос, но я отвечаю на него, потому что столкнулся с той же проблемой - не получить все строки и нашел причину.

Эта проблема возникает при наличии нескольких строк для одного ключа раздела.

В приведенной выше реализации, когда строка в середине раздела возвращается из-за ограничения LIMIT, остальные строки в этом разделе будут потеряны.

Это связано с тем, что в следующем запросе оператор where начнет чтение со значения следующих разделов.

Например, предположим, что у нас есть такая таблица, как следующая

      partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1
              1|     3|                     1
              2|     4|                     2
              2|     5|                     2
              2|     6|                     2
              3|     7|                     3
              4|     8|                     4

Если мы запустим приведенный выше пример кода с LIMIT 2 в этой таблице, мы получим ...

1-я итерация

      SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 0 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1

2-я итерация

      SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 1 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              2|     4|                     2
              2|     5|                     2

3-я итерация

      SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 2 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              3|     7|                     3
              4|     8|                     4

В результате мы не можем получить idx 3 и 6.

Это распространенная ошибка реализации запроса на подкачку.

Если вам регулярно нужно выполнять полное сканирование таблицы Cassandra, скажем, для аналитики в Spark, тогда я настоятельно рекомендую вам рассмотреть возможность хранения данных с использованием модели данных, оптимизированной для чтения. Вы можете проверить http://github.com/tuplejump/FiloDB пример оптимизированной для чтения установки на Cassandra.

Это для общего, что вам нужно сделать? Или сценарий одного случая? Я согласен, что это не рекомендуется делать регулярно, но у меня также была проблема, когда мне приходилось читать все строки из ColumnFamily, и я полагался на рецепт AllRowsReader от клиента Astyanax. Я вижу, что вы используете драйвер Datastax CQL для подключения к вашему кластеру, но если то, что вы ищете, является чем-то, что доказало свою работоспособность, то вам может быть неважно решать проблемы с использованием библиотеки Astyanax.

В моем случае я читал все ключи строк, а затем у меня была другая работа, чтобы взаимодействовать с ColumnFamily с ключами, которые я собрал.

import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.recipes.reader.AllRowsReader;

import java.util.concurrent.CopyOnWriteArrayList;

...        

private final Keyspace keyspace;
private final ColumnFamily<String, byte[]> columnFamily;

public List<String> getAllKeys() throws Exception {

    final List<String> rowKeys = new CopyOnWriteArrayList<>();

    new AllRowsReader.Builder<>(keyspace, columnFamily).withColumnRange(null, null, false, 0)
        .withPartitioner(null) // this will use keyspace's partitioner
        .withConsistencyLevel(ConsistencyLevel.CL_ONE).forEachRow(row -> {
        if (row == null) {
            return true;
        }

        String key = row.getKey();

        rowKeys.add(key);

        return true;
    }).build().call();

    return rowKeys;
}

Существуют различные варианты конфигурации для запуска этого в нескольких потоках и многих других вещах, как я уже сказал, что я только один раз запустил это в своем коде и работал действительно хорошо, я был бы рад помочь, если вы столкнетесь с проблемами, пытающимися заставить его работать,

Надеюсь это поможет,

Хосе Луис

Другие вопросы по тегам