Как сохранить прямой поток Кафки JSON в Кассандру?
Я должен сохранить данные потоковой передачи в Cassandra. Поток идет от Kafka и сообщение Kafka в формате JSON, как показано ниже.
{
"status": "NOT_AVAILABLE",
"itemid": "550672332",
"qty": 0,
"lmts": "2017-11-18T10:39:21-08:00",
"timestamp": 1511030361000
}
Я написал код ниже, чтобы сделать это в Spark 2.2.0.
case class NliEvents(itemid: String, status: String, qty: String)
def main(args: Array[String]): Unit = {
.....
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val valueStream = stream.map(_.value())
val cassandraCrud = new CassandraOperations
import com.datastax.spark.connector._
val columns = SomeColumns("itemid", "status", "qty")
val keySpace = configuration.getString(env + ".cassandra.keyspace")
val gson = new Gson()
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
valueStream.foreachRDD((rdd, time) => {
if (!rdd.isEmpty()) {
val mapped = rdd.map(records => {
val json = parse(records)
val events = json.extract[NliEvents]
events
}
)
mapped.saveToCassandra(keySpace, "nli_events", columns)
}
})
}
Когда я запускаю этот код, я получаю
java.io.NotSerializableException: org.json4s.DefaultFormats$
ошибка. Может быть, я не делаю это правильно.
1 ответ
Можете ли вы заменить свой оператор foreach следующим кодом.
valueStream.mapPartitions(x => {
val lst = scala.collection.mutable.ListBuffer[NliEvents]()
while (x.hasNext) {
val json = parse(x.next())
val events = json.extract[NliEvents]
lst += events
}
lst.toList.iterator
}
).saveToCassandra(keySpace, "nli_events",columns)
Он должен работать. Дайте мне знать, если вы получите какие-либо ошибки.