Как получить доступ к вложенным полям в фрейме данных.proto, ScalaPB

Следующая моя схема данных

root
 |-- name: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- city: string (nullable = true)

Я хочу вывести имя и город. Ниже приводится мое потоковое приложение, которое выводит имя и адреса, но я хочу указать имя и города в выходных данных. Ценю твою помощь. Благодарю.

object PersonConsumer {
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka").
      option("kafka.bootstrap.servers","localhost:9092").
      option("subscribe","person").load()

    val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name", $"addresses")

    ds2.printSchema()

    val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

2 ответа

Вы можете просто получить фрейм данных названия и города, а затем использовать его, для получения фрейма данных названия и города вы можете выбрать оба варианта следующим образом

ds1.select("name","addresses.element.city")

Спасибо Сандип. select("name","address.element.city") выдает мне ошибку, потому что адреса - это Seq[адрес], и я хочу, чтобы все города были выведены.

Наконец я написал следующую функцию, чтобы получить все города..

    def getCities(addresses: Seq[Address]) : String = {
      var cities:String = ""
      if (addresses.size > 0) {
        cities = (for(a <- addresses) yield a.city.getOrElse("")).mkString(",")
//        cities = addresses.foldLeft("")((str,addr) => str  + addr.city.getOrElse(""))
      }
      cities
    }
Другие вопросы по тегам