XML-источник для Spark и групповой работы

Я использую XML источник из databricks, Вот мой XML Пример данных.

<ds Name="abc">
   <node begin="18" end="22" val="Organic" type="type1">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="22" end="23" val="Cereal">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="23" end="25" val="Kellogs" type="type2">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="22" end="23" val="Harry" type="type1">
      <hs id="1" begin="108" end="520" />
   </node>
   <node begin="23" end="25" val="Potter" type="type1">
      <hs id="1" begin="108" end="520" />
   </node>
</ds>

Я хочу объединить все node.val (в том же порядке, как они появляются в XML файл] сгруппированы по hs id).

Например, o/p для приведенных выше данных должно быть:

Имя hs id Val

abc 0 Органические хлопья

ABC 1 Гарри Поттер

Вот где я загружаю исходный код XML из блоков данных:

val df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "ds")
.option("attributePrefix", "")
.load(args(0))

df.registerTempTable("ds")

Я не уверен, как сгруппировать набор данных по hs id и убедитесь, что заказ сохраняется.

val  df_ds = sqlContext.sql("SELECT Name, node.type from ds")

1 ответ

Решение

Пытаться:

import scala.collection.mutable.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val comb = udf((rows: Seq[Row]) => {
  val result = LinkedHashMap[Long, Array[String]]()
  for (row <- rows) {
     val id = row.getAs[Row]("hs").getAs[Long]("id")
     result(id) = result.getOrElse(id, Array[String]()) :+ row.getAs[String]("val")
  }
  result.values.toArray.map(x => x.mkString(" "))
})

df.printSchema
root
 |-- Name: string (nullable = true)
 |-- node: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- begin: long (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- hs: struct (nullable = true)
 |    |    |    |-- #VALUE: string (nullable = true)
 |    |    |    |-- begin: long (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- val: string (nullable = true)

df.withColumn("comb", comb(df("node")))
Другие вопросы по тегам