Искра с вводом / выводом Кассандры
Представьте себе следующее senario: Приложение Spark (реализация Java) использует базу данных Cassandra для загрузки, преобразования в RDD и обработки данных. Также приложение отправляет новые данные из базы данных, которые также обрабатываются пользовательским получателем. Выходные данные процесса потоковой передачи хранятся в базе данных. Реализация использует Spring Data Cassandra из интеграции с базой данных.
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
@Autowired
private Environment env;
@Bean
public CassandraClusterFactoryBean cluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
return cluster;
}
@Bean
public CassandraMappingContext mappingContext() {
return new BasicCassandraMappingContext();
}
@Bean
public CassandraConverter converter() {
return new MappingCassandraConverter(mappingContext());
}
@Bean
public CassandraSessionFactoryBean session() throws Exception {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setCluster(cluster().getObject());
session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
session.setConverter(converter());
session.setSchemaAction(SchemaAction.NONE);
return session;
}
@Bean
public CassandraOperations cassandraTemplate() throws Exception {
return new CassandraTemplate(session().getObject());
}
}
Метод DataProcessor.main:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while(pagingResults != null && !pagingResults.isEmpty()) {
Event lastEvent = pagingResults.get(pagingResults.size() - 1);
pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize page and add to the existing
rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
Ожидается, что будет иметь большой объем данных для начальной загрузки. По этой причине данные разбиваются на страницы, загружаются и распространяются в rddBuffer.
Также доступны следующие опции:
- Пример Spark-Cassandra ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), хотя объем документации минимален для этого примера.
- Проект Каллиопы ( http://tuplejump.github.io/calliope/)
Я хотел бы знать, что является лучшей практикой для интеграции Spark с Кассандрой. Каков наилучший вариант для подражания в моей реализации?
Apache Spark 1.0.0, Apache Cassandra 2.0.8
2 ответа
Самый простой способ работать с Cassandra и Spark - это использовать официальный драйвер Cassandra с открытым исходным кодом для Spark, разработанный DataStax: https://github.com/datastax/spark-cassandra-connector
Этот драйвер был построен поверх Cassandra Java Driver и обеспечивает прямой мост между Cassandra и Spark. В отличие от Calliope, он не использует интерфейс Hadoop. Кроме того, он предлагает следующие уникальные функции:
- поддержка всех типов данных Cassandra, включая коллекции, из коробки
- упрощенное сопоставление строк Cassandra с пользовательскими классами или кортежами без необходимости использования каких-либо последствий или других расширенных функций в Scala
- сохранение любых RDD на Cassandra
- полная поддержка виртуальных узлов Cassandra
- возможность фильтрации / выбора на стороне сервера, например, используя кластерные столбцы Cassandra или вторичные индексы
Подход в приведенном выше коде - это классический централизованный алгоритм, который будет работать только в том случае, если выполняется в одном узле. И Cassandra, и Spark являются распределенными системами, и поэтому необходимо смоделировать процесс таким образом, чтобы он мог быть распределен по ряду узлов.
Существует несколько возможных подходов: если вы знаете ключи строк для выборки, вы можете сделать что-то простое, например: (используя драйвер DataStax Java Driver)
val data = sparkContext.parallelize(keys).map{key =>
val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
val session = cluster.connect(keyspace)
val statement = session.prepare("...cql...);")
val boundStatement = new BoundStatement(sttmt)
session.execute(session.execute(boundStatement.bind(...data...)
}
Это будет эффективно распределять выборку ключей по кластеру Spark. Обратите внимание, как соединение с C* выполняется в замыкании, поскольку это гарантирует, что соединение будет установлено, когда задача выполняется на каждом отдельном распределенном работнике.
Учитывая, что в вашем примере используется подстановочный знак (т. Е. Ключи неизвестны), использование интерфейса Hadoop Cassandra является хорошим вариантом. Пример Spark-Cassandra, связанный в вопросе, иллюстрирует использование этого интерфейса Hadoop на Cassandra.
Calliope - это библиотека, которая включает в себя сложность использования интерфейса Hadoop, предоставляя простой API для доступа к этой функциональности. Он доступен только в Scala, так как он использует определенные функции Scala (например, имплициты и макросы в следующем выпуске). С Calliope вы в основном объявляете, как преобразовать ваш RDD[тип] в ключ строки и значение строки, а Calliope заботится о настройке hadoop взаимодействует с работой. Мы обнаружили, что Calliope (и соответствующие интерфейсы hadoop) работают в 2-4 раза быстрее, чем использование драйвера для взаимодействия с Cassandra.
Вывод: я бы отказался от конфигурации Spring-Data для доступа к Cassandra, так как это ограничит вас одним узлом. Подумайте о простом параллельном доступе, если это возможно, или изучите использование Calliope в Scala.