Объединить spark dStream с переменной в saveToCassandra()

У меня есть DStream[String, Int] с количеством пар слов, например ("hello" -> 10), Я хочу записать эти подсчеты Кассандре с индексом шага. Индекс инициализируется как var step = 1 и увеличивается с каждой обработанной микробатикой.

Таблица Кассандры создана как:

CREATE TABLE wordcounts (
    step int,
    word text,
    count int,
primary key (step, word)
);

При попытке записать поток в таблицу...

stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))

... Я получил java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step,

Как я могу подготовить step индекс для потока, чтобы написать три столбца вместе?

Я использую spark 2.0.0, scala 2.11.8, cassandra 3.4.0 и разъем spark-cassandra 2.0.0-M3.

3 ответа

Решение

Как уже отмечалось, в то время как таблица Кассандра ожидает что-то вроде (Int, String, Int), wordCount DStream имеет тип DStream[(String, Int)]так что для звонка saveToCassandra(...) чтобы работать, нам нужен DStream типа DStream[(Int, String, Int)],

Сложность в этом вопросе - как вывести локальный счетчик, который по определению известен только в драйвере, на уровень DStream.

Для этого нам нужно сделать две вещи: "поднять" счетчик на распределенный уровень (в Spark мы имеем в виду "RDD" или "DataFrame") и объединить это значение с существующим DStream данные.

Исходя из классического примера подсчета потоковых слов:

// Split each line into words
val words = lines.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

Мы добавляем локальную переменную для хранения количества микробатчей:

@transient var batchCount = 0

Он объявлен временным, поэтому Spark не пытается закрыть его значение, когда мы объявляем преобразования, которые его используют.

Теперь хитрый момент: в контексте DStream transformИз этого сингла мы делаем RDD variable и присоедините его к основному RDD DStream, используя декартово произведение:

val batchWordCounts = wordCounts.transform{ rdd => 
  batchCount = batchCount + 1

  val localCount = sparkContext.parallelize(Seq(batchCount))
  rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}

(Обратите внимание, что простой map Функция не будет работать, так как только начальное значение varIable будет захвачен и сериализован. Следовательно, при просмотре данных DStream счетчик никогда не увеличивается.

Наконец, теперь, когда данные в нужной форме, сохраните их в Cassandra:

batchWordCounts.saveToCassandra("keyspace", "wordcounts")

updateStateByKey Функция предоставляется спарк для обработки глобального состояния. Для этого случая это может выглядеть примерно так

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount: Int = runningCount.getOrElse(0) + 1
    Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)

stream.join(step).map{case (key,(count, step)) => (step,key,count)})
   .saveToCassandra("keyspace", "wordcounts")

Поскольку вы пытаетесь сохранить RDD в существующей таблице Cassandra, вам необходимо включить все значения столбцов первичного ключа в RDD.

Что вы можете сделать, вы можете использовать следующие методы, чтобы сохранить СДР в новую таблицу.

saveAsCassandraTable or saveAsCassandraTableEx

Для получения дополнительной информации посмотрите на это.

Другие вопросы по тегам