Спарк МинХашЛШ никогда не прогрессирует

Я новичок в спарке, но я пытаюсь создать сетевые кластеры, используя предоставленные пользователем теги или атрибуты. Сначала я использую алгоритм jaccard minhash для получения оценок сходства, а затем запускаю его с помощью алгоритма кластеризации итераций мощности, но как только он запускается, процессор не загружается и будет работать некоторое время с нулевым прогрессом. Хотите знать, как настроить кластер или изменить код, чтобы запустить его. Ниже мой код

//about 10,000 rows of (id, 100 tags in binary form)

val data = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("inferSchema","true").load("gs://data/*.csv")

val columnNames = data.columns

val tags = columnNames.slice(1, columnNames.size)

//put tags in a vector
val assembler = new VectorAssembler().setInputCols(tags).setOutputCol("attributes")

val newData = assembler.transform(data).select("userID","attributes")

val mh = new MinHashLSH().setNumHashTables(5).setInputCol("attributes").setOutputCol("values")

val modelMINHASH = mh.fit(goodData)

// Approximate nearest neighbor search
val fullData = modelMINHASH.approxSimilarityJoin(newData , newData , 0.9).filter("datasetA.userID < datasetB.userID")

var explodeDF = fullData.withColumn("id", fullData("datasetA.userID")).withColumn("id2", fullData("datasetB.userID")).select("id","id2","distCol")

val temp = explodeDF.rdd

val newRDD = temp.map(x => (x.getAs[Integer]("id").longValue(),x.getAs[Integer]("id2").longValue(),1-x.getAs[Double]("distCol"))).cache()

//this is where the code haults and I see no progress
val modelPIC = new PowerIterationClustering().setK(16).setMaxIterations(5).run(newRDD)

val clusters = modelPIC.assignments

0 ответов

Другие вопросы по тегам