Как получить доступ к вложенным полям в фрейме данных.proto, ScalaPB
Следующая моя схема данных
root
|-- name: string (nullable = true)
|-- addresses: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- street: string (nullable = true)
| | |-- city: string (nullable = true)
Я хочу вывести имя и город. Ниже приводится мое потоковое приложение, которое выводит имя и адреса, но я хочу указать имя и города в выходных данных. Ценю твою помощь. Благодарю.
object PersonConsumer {
import org.apache.spark.sql.{SQLContext, SparkSession}
import com.example.protos.demo._
def main(args : Array[String]) {
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","person").load()
val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name", $"addresses")
ds2.printSchema()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
2 ответа
Вы можете просто получить фрейм данных названия и города, а затем использовать его, для получения фрейма данных названия и города вы можете выбрать оба варианта следующим образом
ds1.select("name","addresses.element.city")
Спасибо Сандип. select("name","address.element.city") выдает мне ошибку, потому что адреса - это Seq[адрес], и я хочу, чтобы все города были выведены.
Наконец я написал следующую функцию, чтобы получить все города..
def getCities(addresses: Seq[Address]) : String = {
var cities:String = ""
if (addresses.size > 0) {
cities = (for(a <- addresses) yield a.city.getOrElse("")).mkString(",")
// cities = addresses.foldLeft("")((str,addr) => str + addr.city.getOrElse(""))
}
cities
}