Как объединить (объединить) информацию в массиве [DataFrame]

У меня есть массив [DataFrame], и ​​я хочу проверить, есть ли какие-либо изменения в значениях по столбцам для каждой строки каждого фрейма данных. Скажем, у меня есть первая строка из трех фреймов данных, например:

 (0,1.0,0.4,0.1)
 (0,3.0,0.2,0.1)
 (0,5.0,0.4,0.1)

Первый столбец - это ID, и мой идеальный вывод для этого ID:

 (0, 1, 1, 0)

Это означает, что второй и третий столбцы изменились, а третий - нет. Я прилагаю здесь немного данных, чтобы повторить мои настройки

val rdd = sc.parallelize(Array((0,1.0,0.4,0.1),
                               (1,0.9,0.3,0.3),
                               (2,0.2,0.9,0.2),
                               (3,0.9,0.2,0.2),
                               (4,0.3,0.5,0.5)))
val rdd2 = sc.parallelize(Array((0,3.0,0.2,0.1),
                                (1,0.9,0.3,0.3),
                                (2,0.2,0.5,0.2),
                                (3,0.8,0.1,0.1),
                                (4,0.3,0.5,0.5)))
val rdd3 = sc.parallelize(Array((0,5.0,0.4,0.1),
                                (1,0.5,0.3,0.3),
                                (2,0.3,0.3,0.5),
                                (3,0.3,0.3,0.1),
                                (4,0.3,0.5,0.5)))
val df = rdd.toDF("id", "prop1", "prop2", "prop3")
val df2 = rdd2.toDF("id", "prop1", "prop2", "prop3")
val df3 = rdd3.toDF("id", "prop1", "prop2", "prop3")
val result:Array[DataFrame] = new Array[DataFrame](3)
result.update(0, df)
result.update(1,df2)
result.update(2,df3)

Как я могу отобразить массив и получить вывод?

2 ответа

Решение

Ты можешь использовать countDistinct с groupBy:

import org.apache.spark.sql.functions.{countDistinct}

val exprs = Seq("prop1", "prop2", "prop3")
  .map(c => (countDistinct(c) > 1).cast("integer").alias(c))

val combined = result.reduce(_ unionAll _)

val aggregatedViaGroupBy = combined
  .groupBy($"id")
  .agg(exprs.head, exprs.tail: _*)

aggregatedViaGroupBy.show
// +---+-----+-----+-----+
// | id|prop1|prop2|prop3|
// +---+-----+-----+-----+
// |  0|    1|    1|    0|
// |  1|    1|    0|    0|
// |  2|    1|    1|    1|
// |  3|    1|    1|    1|
// |  4|    0|    0|    0|
// +---+-----+-----+-----+

Сначала нам нужно присоединиться ко всем DataFrames все вместе.

val combined = result.reduceLeft((a,b) => a.join(b,"id"))

Чтобы сравнить все столбцы одной и той же метки (например, "prod1"), я обнаружил, что легче (по крайней мере, для меня) работать на уровне СДР. Мы сначала преобразовываем данные в (id, Seq[Double]),

val finalResults = combined.rdd.map{
  x => 
    (x.getInt(0), x.toSeq.tail.map(_.asInstanceOf[Double]))
}.map{ 
  case(i,d) => 
     def checkAllEqual(l: Seq[Double]) = if(l.toSet.size == 1) 0 else 1
     val g = d.grouped(3).toList 
     val g1 = checkAllEqual(g.map(x => x(0)))
     val g2 = checkAllEqual(g.map(x => x(1)))
     val g3 = checkAllEqual(g.map(x => x(2)))
     (i, g1,g2,g3)
}.toDF("id", "prod1", "prod2", "prod3")

finalResults.show()

Это напечатает:

+---+-----+-----+-----+
| id|prod1|prod2|prod3|
+---+-----+-----+-----+
|  0|    1|    1|    0|
|  1|    1|    0|    0|
|  2|    1|    1|    1|
|  3|    1|    1|    1|
|  4|    0|    0|    0|
+---+-----+-----+-----+
Другие вопросы по тегам