Проблема при полном сканировании таблицы в Кассандре
Во-первых: я знаю, что не стоит делать полное сканирование на Кассандре, но сейчас мне нужно именно это.
Когда я начал искать что-то подобное, я читал людей, которые говорили, что невозможно провести полное сканирование на Кассандре, и он не был создан, чтобы делать подобные вещи.
Не удовлетворен, продолжаю искать, пока не найду эту статью: http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/
Выглядит довольно разумно, и я попробовал. Поскольку я буду выполнять полное сканирование только один раз, а время и производительность не будут проблемой, я написал запрос и поместил его в простое задание, чтобы найти все нужные мне записи. Из двух миллиардов строк записей мой результат был примерно 1000, но у меня было только 100 записей.
Моя работа:
public void run() {
Cluster cluster = getConnection();
Session session = cluster.connect("db");
LOGGER.info("Starting ...");
boolean run = true;
int print = 0;
while ( run ) {
if (maxTokenReached(actualToken)) {
LOGGER.info("Max Token Reached!");
break;
}
ResultSet resultSet = session.execute(queryBuilder(actualToken));
Iterator<Row> rows = resultSet.iterator();
if ( !rows.hasNext()){
break;
}
List<String> rowIds = new ArrayList<String>();
while (rows.hasNext()) {
Row row = rows.next();
Long leadTime = row.getLong("my_column");
if (myCondition(myCollumn)) {
String rowId = row.getString("key");
rowIds.add(rowId);
}
if (!rows.hasNext()) {
Long token = row.getLong("token(rowid)");
if (!rowIds.isEmpty()) {
LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
}
actualToken = nextToken(token);
}
}
}
LOGGER.info("Done!");
cluster.shutdown();
}
public boolean maxTokenReached(Long actualToken){
return actualToken >= maxToken;
}
public String queryBuilder(Long nextRange) {
return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}
public Long nextToken(Long token){
return token + 1;
}
По сути, я делаю поиск разрешенного минимального токена и пошагово иду до последнего.
Я не знаю, но похоже, что работа не выполнила полное сканирование полностью, или мой запрос получил доступ только к одному узлу или чему-то другому. Я не знаю, делаю ли я что-то не так или не могу сделать полное сканирование.
Сегодня у меня есть почти 2 ТБ данных, только одна таблица в одном кластере из семи узлов.
Кто-то уже был в этой ситуации или есть рекомендации?
5 ответов
Определенно возможно выполнить полное сканирование таблицы в Cassandra - действительно, это довольно распространено для таких вещей, как Spark. Тем не менее, это обычно не "быстро", поэтому не рекомендуется, если вы не знаете, почему вы это делаете. Для ваших актуальных вопросов:
1) Если вы используете CQL, вы почти наверняка используете разделитель Murmur3, поэтому ваш минимальный токен -9223372036854775808 (и максимальный токен 9223372036854775808).
2) Вы используете session.execute(), который будет использовать согласованность по умолчанию ONE, которая может не возвращать все результаты в вашем кластере, особенно если вы также пишете в ONE, что, я подозреваю, может быть. Увеличьте это значение до ALL и используйте подготовленные операторы для ускорения синтаксического анализа CQL:
public void run() {
Cluster cluster = getConnection();
Session session = cluster.connect("db");
LOGGER.info("Starting ...");
actualToken = -9223372036854775808;
boolean run = true;
int print = 0;
while ( run ) {
if (maxTokenReached(actualToken)) {
LOGGER.info("Max Token Reached!");
break;
}
SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
stmt.setConsistencyLevel(ConsistencyLevel.ALL);
ResultSet resultSet = session.execute(stmt);
Iterator<Row> rows = resultSet.iterator();
if ( !rows.hasNext()){
break;
}
List<String> rowIds = new ArrayList<String>();
while (rows.hasNext()) {
Row row = rows.next();
Long leadTime = row.getLong("my_column");
if (myCondition(myCollumn)) {
String rowId = row.getString("key");
rowIds.add(rowId);
}
if (!rows.hasNext()) {
Long token = row.getLong("token(rowid)");
if (!rowIds.isEmpty()) {
LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
}
actualToken = nextToken(token);
}
}
}
LOGGER.info("Done!");
cluster.shutdown();
}
public boolean maxTokenReached(Long actualToken){
return actualToken >= maxToken;
}
public String queryBuilder(Long nextRange) {
return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}
public Long nextToken(Long token) {
return token + 1;
}
Я очень рекомендую использовать Spark - даже в отдельном приложении (т.е. без кластера). Он позаботится о разбиении разделов и обработке их один за другим. Слишком прост в использовании:
Это очень старый вопрос, но я отвечаю на него, потому что столкнулся с той же проблемой - не получить все строки и нашел причину.
Эта проблема возникает при наличии нескольких строк для одного ключа раздела.
В приведенной выше реализации, когда строка в середине раздела возвращается из-за ограничения LIMIT, остальные строки в этом разделе будут потеряны.
Это связано с тем, что в следующем запросе оператор where начнет чтение со значения следующих разделов.
Например, предположим, что у нас есть такая таблица, как следующая
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
1| 1| 1
1| 2| 1
1| 3| 1
2| 4| 2
2| 5| 2
2| 6| 2
3| 7| 3
4| 8| 4
Если мы запустим приведенный выше пример кода с LIMIT 2 в этой таблице, мы получим ...
1-я итерация
SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 0 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
1| 1| 1
1| 2| 1
2-я итерация
SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 1 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
2| 4| 2
2| 5| 2
3-я итерация
SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 2 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
3| 7| 3
4| 8| 4
В результате мы не можем получить idx 3 и 6.
Это распространенная ошибка реализации запроса на подкачку.
Если вам регулярно нужно выполнять полное сканирование таблицы Cassandra, скажем, для аналитики в Spark, тогда я настоятельно рекомендую вам рассмотреть возможность хранения данных с использованием модели данных, оптимизированной для чтения. Вы можете проверить http://github.com/tuplejump/FiloDB пример оптимизированной для чтения установки на Cassandra.
Это для общего, что вам нужно сделать? Или сценарий одного случая? Я согласен, что это не рекомендуется делать регулярно, но у меня также была проблема, когда мне приходилось читать все строки из ColumnFamily, и я полагался на рецепт AllRowsReader от клиента Astyanax. Я вижу, что вы используете драйвер Datastax CQL для подключения к вашему кластеру, но если то, что вы ищете, является чем-то, что доказало свою работоспособность, то вам может быть неважно решать проблемы с использованием библиотеки Astyanax.
В моем случае я читал все ключи строк, а затем у меня была другая работа, чтобы взаимодействовать с ColumnFamily с ключами, которые я собрал.
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.recipes.reader.AllRowsReader;
import java.util.concurrent.CopyOnWriteArrayList;
...
private final Keyspace keyspace;
private final ColumnFamily<String, byte[]> columnFamily;
public List<String> getAllKeys() throws Exception {
final List<String> rowKeys = new CopyOnWriteArrayList<>();
new AllRowsReader.Builder<>(keyspace, columnFamily).withColumnRange(null, null, false, 0)
.withPartitioner(null) // this will use keyspace's partitioner
.withConsistencyLevel(ConsistencyLevel.CL_ONE).forEachRow(row -> {
if (row == null) {
return true;
}
String key = row.getKey();
rowKeys.add(key);
return true;
}).build().call();
return rowKeys;
}
Существуют различные варианты конфигурации для запуска этого в нескольких потоках и многих других вещах, как я уже сказал, что я только один раз запустил это в своем коде и работал действительно хорошо, я был бы рад помочь, если вы столкнетесь с проблемами, пытающимися заставить его работать,
Надеюсь это поможет,
Хосе Луис