Импорт таблицы cassandra в spark через sparklyr - можно выбрать только несколько столбцов?

Я работал с sparklyr чтобы зажечь большие столы кассандры, зарегистрируйте их в R и проведите dplyr операции на них.

Я успешно импортировал таблицы Кассандры с кодом, который выглядит следующим образом:

# import cassandra table into spark

cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format", 
  list(keyspace = "cass_keyspace", table = "cass_table")
  ) %>% 
  invoke("load")


# register table in R

cass_tbl <- sparklyr:::spark_partition_register_df(
         sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
       )

Некоторые из этих таблиц cassandra довольно большие ( > 8,5 млрд строк) и требуют времени для импорта / регистрации, а некоторые приводят к переполнению памяти, даже если 6 узлов работают с 60 ядрами и 192 ГБ ОЗУ. Тем не менее, мне обычно нужны только несколько столбцов из каждой базы данных кассандры.

Мои вопросы:

  1. Можно ли отфильтровать базу данных cassandra при импорте / регистрации, чтобы она импортировала только некоторые столбцы или чтобы она была отфильтрована по первичному ключу (т. Е. Путем передачи SQL / CQL запросы типа, такие как SELECT name FROM cass_table WHERE id = 5)?
  2. Куда направляется такой запрос в приведенном выше коде и в какой форме принимает синтаксис?

Я попытался добавить такой запрос в качестве дополнительной опции в списке опций, а именно:

list(. . . , select = "id")

а также вызывать его как отдельную трубу перед %>% invoke("load")т.е.

invoke("option", "select", "id") %>%

# OR

invoke("option", "query", s"select id from cass_table") %>%

Но они не работают. Какие-либо предложения?

1 ответ

Решение

Вы можете пропустить активный кеш и выбрать интересующие колонки:

session <- spark_session(sc)

# Some columns to select
cols <- list("x", "y", "z")

cass_df <- session %>% 
  invoke("read") %>% 
  invoke("format", "org.apache.spark.sql.cassandra") %>% 
  invoke("options", as.environment(list(keyspace="test"))) %>% 
  invoke("load") %>% 
  # We use select(col: String, cols* String) so the first column
  # has to be used separately. If you want only one column the third argument
  # has to be an empty list 
  invoke("select", cols[[1]], cols[2:length(cols)]) %>%
  # Standard lazy cache if you need one
  invoke("cache")

Если вы используете предикаты, которые могут значительно уменьшить количество извлекаемых данных pushdown возможность "true" (по умолчанию) и использовать filter до кеширования.

Если вы хотите пройти более сложный запрос, вы регистрируете временное представление и sql метод:

session %>%
  invoke("read") %>% 
  ...
  invoke("load") %>% 
  invoke("createOrReplaceTempView", "some_name")

cass_df <- session %>% 
  invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
  invoke("cache")
Другие вопросы по тегам