Искра с вводом / выводом Кассандры

Представьте себе следующее senario: Приложение Spark (реализация Java) использует базу данных Cassandra для загрузки, преобразования в RDD и обработки данных. Также приложение отправляет новые данные из базы данных, которые также обрабатываются пользовательским получателем. Выходные данные процесса потоковой передачи хранятся в базе данных. Реализация использует Spring Data Cassandra из интеграции с базой данных.

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

Метод DataProcessor.main:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

Ожидается, что будет иметь большой объем данных для начальной загрузки. По этой причине данные разбиваются на страницы, загружаются и распространяются в rddBuffer.

Также доступны следующие опции:

  1. Пример Spark-Cassandra ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), хотя объем документации минимален для этого примера.
  2. Проект Каллиопы ( http://tuplejump.github.io/calliope/)

Я хотел бы знать, что является лучшей практикой для интеграции Spark с Кассандрой. Каков наилучший вариант для подражания в моей реализации?

Apache Spark 1.0.0, Apache Cassandra 2.0.8

2 ответа

Решение

Самый простой способ работать с Cassandra и Spark - это использовать официальный драйвер Cassandra с открытым исходным кодом для Spark, разработанный DataStax: https://github.com/datastax/spark-cassandra-connector

Этот драйвер был построен поверх Cassandra Java Driver и обеспечивает прямой мост между Cassandra и Spark. В отличие от Calliope, он не использует интерфейс Hadoop. Кроме того, он предлагает следующие уникальные функции:

  • поддержка всех типов данных Cassandra, включая коллекции, из коробки
  • упрощенное сопоставление строк Cassandra с пользовательскими классами или кортежами без необходимости использования каких-либо последствий или других расширенных функций в Scala
  • сохранение любых RDD на Cassandra
  • полная поддержка виртуальных узлов Cassandra
  • возможность фильтрации / выбора на стороне сервера, например, используя кластерные столбцы Cassandra или вторичные индексы

Подход в приведенном выше коде - это классический централизованный алгоритм, который будет работать только в том случае, если выполняется в одном узле. И Cassandra, и Spark являются распределенными системами, и поэтому необходимо смоделировать процесс таким образом, чтобы он мог быть распределен по ряду узлов.

Существует несколько возможных подходов: если вы знаете ключи строк для выборки, вы можете сделать что-то простое, например: (используя драйвер DataStax Java Driver)

val data = sparkContext.parallelize(keys).map{key => 
   val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
   val session  = cluster.connect(keyspace)
   val statement = session.prepare("...cql...);")
   val boundStatement = new BoundStatement(sttmt)
   session.execute(session.execute(boundStatement.bind(...data...)
}

Это будет эффективно распределять выборку ключей по кластеру Spark. Обратите внимание, как соединение с C* выполняется в замыкании, поскольку это гарантирует, что соединение будет установлено, когда задача выполняется на каждом отдельном распределенном работнике.

Учитывая, что в вашем примере используется подстановочный знак (т. Е. Ключи неизвестны), использование интерфейса Hadoop Cassandra является хорошим вариантом. Пример Spark-Cassandra, связанный в вопросе, иллюстрирует использование этого интерфейса Hadoop на Cassandra.

Calliope - это библиотека, которая включает в себя сложность использования интерфейса Hadoop, предоставляя простой API для доступа к этой функциональности. Он доступен только в Scala, так как он использует определенные функции Scala (например, имплициты и макросы в следующем выпуске). С Calliope вы в основном объявляете, как преобразовать ваш RDD[тип] в ключ строки и значение строки, а Calliope заботится о настройке hadoop взаимодействует с работой. Мы обнаружили, что Calliope (и соответствующие интерфейсы hadoop) работают в 2-4 раза быстрее, чем использование драйвера для взаимодействия с Cassandra.

Вывод: я бы отказался от конфигурации Spring-Data для доступа к Cassandra, так как это ограничит вас одним узлом. Подумайте о простом параллельном доступе, если это возможно, или изучите использование Calliope в Scala.

Другие вопросы по тегам