Как объединить (объединить) информацию в массиве [DataFrame]
У меня есть массив [DataFrame], и я хочу проверить, есть ли какие-либо изменения в значениях по столбцам для каждой строки каждого фрейма данных. Скажем, у меня есть первая строка из трех фреймов данных, например:
(0,1.0,0.4,0.1)
(0,3.0,0.2,0.1)
(0,5.0,0.4,0.1)
Первый столбец - это ID, и мой идеальный вывод для этого ID:
(0, 1, 1, 0)
Это означает, что второй и третий столбцы изменились, а третий - нет. Я прилагаю здесь немного данных, чтобы повторить мои настройки
val rdd = sc.parallelize(Array((0,1.0,0.4,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.9,0.2),
(3,0.9,0.2,0.2),
(4,0.3,0.5,0.5)))
val rdd2 = sc.parallelize(Array((0,3.0,0.2,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.5,0.2),
(3,0.8,0.1,0.1),
(4,0.3,0.5,0.5)))
val rdd3 = sc.parallelize(Array((0,5.0,0.4,0.1),
(1,0.5,0.3,0.3),
(2,0.3,0.3,0.5),
(3,0.3,0.3,0.1),
(4,0.3,0.5,0.5)))
val df = rdd.toDF("id", "prop1", "prop2", "prop3")
val df2 = rdd2.toDF("id", "prop1", "prop2", "prop3")
val df3 = rdd3.toDF("id", "prop1", "prop2", "prop3")
val result:Array[DataFrame] = new Array[DataFrame](3)
result.update(0, df)
result.update(1,df2)
result.update(2,df3)
Как я могу отобразить массив и получить вывод?
2 ответа
Ты можешь использовать countDistinct
с groupBy
:
import org.apache.spark.sql.functions.{countDistinct}
val exprs = Seq("prop1", "prop2", "prop3")
.map(c => (countDistinct(c) > 1).cast("integer").alias(c))
val combined = result.reduce(_ unionAll _)
val aggregatedViaGroupBy = combined
.groupBy($"id")
.agg(exprs.head, exprs.tail: _*)
aggregatedViaGroupBy.show
// +---+-----+-----+-----+
// | id|prop1|prop2|prop3|
// +---+-----+-----+-----+
// | 0| 1| 1| 0|
// | 1| 1| 0| 0|
// | 2| 1| 1| 1|
// | 3| 1| 1| 1|
// | 4| 0| 0| 0|
// +---+-----+-----+-----+
Сначала нам нужно присоединиться ко всем DataFrames
все вместе.
val combined = result.reduceLeft((a,b) => a.join(b,"id"))
Чтобы сравнить все столбцы одной и той же метки (например, "prod1"), я обнаружил, что легче (по крайней мере, для меня) работать на уровне СДР. Мы сначала преобразовываем данные в (id, Seq[Double])
,
val finalResults = combined.rdd.map{
x =>
(x.getInt(0), x.toSeq.tail.map(_.asInstanceOf[Double]))
}.map{
case(i,d) =>
def checkAllEqual(l: Seq[Double]) = if(l.toSet.size == 1) 0 else 1
val g = d.grouped(3).toList
val g1 = checkAllEqual(g.map(x => x(0)))
val g2 = checkAllEqual(g.map(x => x(1)))
val g3 = checkAllEqual(g.map(x => x(2)))
(i, g1,g2,g3)
}.toDF("id", "prod1", "prod2", "prod3")
finalResults.show()
Это напечатает:
+---+-----+-----+-----+
| id|prod1|prod2|prod3|
+---+-----+-----+-----+
| 0| 1| 1| 0|
| 1| 1| 0| 0|
| 2| 1| 1| 1|
| 3| 1| 1| 1|
| 4| 0| 0| 0|
+---+-----+-----+-----+