Объединить spark dStream с переменной в saveToCassandra()
У меня есть DStream[String, Int
] с количеством пар слов, например ("hello" -> 10)
, Я хочу записать эти подсчеты Кассандре с индексом шага. Индекс инициализируется как var step = 1
и увеличивается с каждой обработанной микробатикой.
Таблица Кассандры создана как:
CREATE TABLE wordcounts (
step int,
word text,
count int,
primary key (step, word)
);
При попытке записать поток в таблицу...
stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))
... Я получил java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step
,
Как я могу подготовить step
индекс для потока, чтобы написать три столбца вместе?
Я использую spark 2.0.0, scala 2.11.8, cassandra 3.4.0 и разъем spark-cassandra 2.0.0-M3.
3 ответа
Как уже отмечалось, в то время как таблица Кассандра ожидает что-то вроде (Int, String, Int)
, wordCount DStream имеет тип DStream[(String, Int)]
так что для звонка saveToCassandra(...)
чтобы работать, нам нужен DStream
типа DStream[(Int, String, Int)]
,
Сложность в этом вопросе - как вывести локальный счетчик, который по определению известен только в драйвере, на уровень DStream.
Для этого нам нужно сделать две вещи: "поднять" счетчик на распределенный уровень (в Spark мы имеем в виду "RDD" или "DataFrame") и объединить это значение с существующим DStream
данные.
Исходя из классического примера подсчета потоковых слов:
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
Мы добавляем локальную переменную для хранения количества микробатчей:
@transient var batchCount = 0
Он объявлен временным, поэтому Spark не пытается закрыть его значение, когда мы объявляем преобразования, которые его используют.
Теперь хитрый момент: в контексте DStream transform
Из этого сингла мы делаем RDD var
iable и присоедините его к основному RDD DStream, используя декартово произведение:
val batchWordCounts = wordCounts.transform{ rdd =>
batchCount = batchCount + 1
val localCount = sparkContext.parallelize(Seq(batchCount))
rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}
(Обратите внимание, что простой map
Функция не будет работать, так как только начальное значение var
Iable будет захвачен и сериализован. Следовательно, при просмотре данных DStream счетчик никогда не увеличивается.
Наконец, теперь, когда данные в нужной форме, сохраните их в Cassandra:
batchWordCounts.saveToCassandra("keyspace", "wordcounts")
updateStateByKey
Функция предоставляется спарк для обработки глобального состояния. Для этого случая это может выглядеть примерно так
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount: Int = runningCount.getOrElse(0) + 1
Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)
stream.join(step).map{case (key,(count, step)) => (step,key,count)})
.saveToCassandra("keyspace", "wordcounts")
Поскольку вы пытаетесь сохранить RDD в существующей таблице Cassandra, вам необходимо включить все значения столбцов первичного ключа в RDD.
Что вы можете сделать, вы можете использовать следующие методы, чтобы сохранить СДР в новую таблицу.
saveAsCassandraTable or saveAsCassandraTableEx
Для получения дополнительной информации посмотрите на это.