Spark Scala: преобразование СДР ряда в СДР корзины
Я пытаюсь запустить FPGrowth, но на самом деле я наткнулся на проблему с типами ввода. Учитывая код:
%scala
// association rule learning for OFFLINE with FPGrowth from MLLib
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.mllib.fpm.PrefixSpan
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.FlatMapFunction
import org.apache.spark.mllib.linalg.Vectors
val dfoffline = spark.table("offlinetrx")
val products = dfoffline
.groupBy("Beleg")
.agg(
collect_set("Produkt") as "items")
// debugging
val columnProducts = products.select("items")
columnProducts.printSchema()
columnProducts.show()
это дает следующий вывод:
root
|-- items: array (nullable = true)
| |-- element: string (containsNull = true)
+--------------------+
| items|
+--------------------+
|[19420.01, 46872.01]|
|[AEC003.01, AEC00...|
| [BT102.01, BET103]|
Код продолжается с преобразования в RDD и выполнения FPGrowth
val rdd = columnProducts.rdd
val fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(6)
val model = fpg.run(rdd)
тогда Спарк говорит мне:
error: inferred type arguments [Nothing,org.apache.spark.sql.Row] do
not conform to method run's type parameter bounds [Item,Basket <:
Iterable[Item]]
val model = fpg.run(rdd)
notebook:74: error: type mismatch;:
found : org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row]
required: org.apache.spark.api.java.JavaRDD[Basket]
Затем я попытался отобразить фрейм данных
val rdd = columnProducts.map{x:Row => x.getAs[List](0)}
Но это приводит к другой проблеме:
error: kinds of the type arguments (List) do not conform to the
expected kinds of the type parameters (type T).
List's type parameters do not match type T's expected parameters:
type List has one type parameter, but type T has none
val rdd = columnProducts.map{x:Row => x.getAs[List](0)}
Как определить тип параметра (T) для команды getAs List?
Или у кого-то есть другая хорошая идея, как на самом деле решить проблему, требующую СДР-корзин, но имеющую СДР-рядок?
Спасибо ребята