Импорт таблицы cassandra в spark через sparklyr - можно выбрать только несколько столбцов?
Я работал с sparklyr
чтобы зажечь большие столы кассандры, зарегистрируйте их в R и проведите dplyr
операции на них.
Я успешно импортировал таблицы Кассандры с кодом, который выглядит следующим образом:
# import cassandra table into spark
cass_df <- sparklyr:::spark_data_read_generic(
sc, "org.apache.spark.sql.cassandra", "format",
list(keyspace = "cass_keyspace", table = "cass_table")
) %>%
invoke("load")
# register table in R
cass_tbl <- sparklyr:::spark_partition_register_df(
sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
)
Некоторые из этих таблиц cassandra довольно большие ( > 8,5 млрд строк) и требуют времени для импорта / регистрации, а некоторые приводят к переполнению памяти, даже если 6 узлов работают с 60 ядрами и 192 ГБ ОЗУ. Тем не менее, мне обычно нужны только несколько столбцов из каждой базы данных кассандры.
Мои вопросы:
- Можно ли отфильтровать базу данных cassandra при импорте / регистрации, чтобы она импортировала только некоторые столбцы или чтобы она была отфильтрована по первичному ключу (т. Е. Путем передачи
SQL
/CQL
запросы типа, такие какSELECT name FROM cass_table WHERE id = 5
)? - Куда направляется такой запрос в приведенном выше коде и в какой форме принимает синтаксис?
Я попытался добавить такой запрос в качестве дополнительной опции в списке опций, а именно:
list(. . . , select = "id")
а также вызывать его как отдельную трубу перед %>% invoke("load")
т.е.
invoke("option", "select", "id") %>%
# OR
invoke("option", "query", s"select id from cass_table") %>%
Но они не работают. Какие-либо предложения?
1 ответ
Вы можете пропустить активный кеш и выбрать интересующие колонки:
session <- spark_session(sc)
# Some columns to select
cols <- list("x", "y", "z")
cass_df <- session %>%
invoke("read") %>%
invoke("format", "org.apache.spark.sql.cassandra") %>%
invoke("options", as.environment(list(keyspace="test"))) %>%
invoke("load") %>%
# We use select(col: String, cols* String) so the first column
# has to be used separately. If you want only one column the third argument
# has to be an empty list
invoke("select", cols[[1]], cols[2:length(cols)]) %>%
# Standard lazy cache if you need one
invoke("cache")
Если вы используете предикаты, которые могут значительно уменьшить количество извлекаемых данных pushdown
возможность "true"
(по умолчанию) и использовать filter
до кеширования.
Если вы хотите пройти более сложный запрос, вы регистрируете временное представление и sql
метод:
session %>%
invoke("read") %>%
...
invoke("load") %>%
invoke("createOrReplaceTempView", "some_name")
cass_df <- session %>%
invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
invoke("cache")