Разделение значений не является членом (String, String)

Я пытаюсь прочитать данные из Kafka и сохранения в таблицах Cassandra через Spark RDD.

Получение ошибки при компиляции кода:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)

[error]     val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error]                                               ^
[error] one error found

[error] (compile:compileIncremental) Compilation failed

Код ниже: когда я запускаю код вручную через интерактив spark-shell работает нормально, но при компиляции кода spark-submit ошибка приходит.

// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)

// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 

2 ответа

Решение

KafkaUtils.createDirectStream возвращает кортеж ключа и значения (так как сообщения в Kafka необязательно имеют ключ). В вашем случае это типа (String, String), Если вы хотите разделить значение, вы должны сначала извлечь его:

val lines = 
  messages
   .map(line => line._2.split(','))
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))

Или используя частичный синтаксис функции:

val lines = 
  messages
   .map { case (_, value) => value.split(',') }
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))  

Все сообщения в кафке являются ключевыми. Оригинальный поток Кафки, в данном случае messages, это поток кортежей (key,value),

И, как указывает на ошибку компиляции, нет split метод на кортежи.

Что мы хотим сделать здесь:

messages.map{ case (key, value)  => value.split(','))} ...
Другие вопросы по тегам