Спарк МинХашЛШ никогда не прогрессирует
Я новичок в спарке, но я пытаюсь создать сетевые кластеры, используя предоставленные пользователем теги или атрибуты. Сначала я использую алгоритм jaccard minhash для получения оценок сходства, а затем запускаю его с помощью алгоритма кластеризации итераций мощности, но как только он запускается, процессор не загружается и будет работать некоторое время с нулевым прогрессом. Хотите знать, как настроить кластер или изменить код, чтобы запустить его. Ниже мой код
//about 10,000 rows of (id, 100 tags in binary form)
val data = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("inferSchema","true").load("gs://data/*.csv")
val columnNames = data.columns
val tags = columnNames.slice(1, columnNames.size)
//put tags in a vector
val assembler = new VectorAssembler().setInputCols(tags).setOutputCol("attributes")
val newData = assembler.transform(data).select("userID","attributes")
val mh = new MinHashLSH().setNumHashTables(5).setInputCol("attributes").setOutputCol("values")
val modelMINHASH = mh.fit(goodData)
// Approximate nearest neighbor search
val fullData = modelMINHASH.approxSimilarityJoin(newData , newData , 0.9).filter("datasetA.userID < datasetB.userID")
var explodeDF = fullData.withColumn("id", fullData("datasetA.userID")).withColumn("id2", fullData("datasetB.userID")).select("id","id2","distCol")
val temp = explodeDF.rdd
val newRDD = temp.map(x => (x.getAs[Integer]("id").longValue(),x.getAs[Integer]("id2").longValue(),1-x.getAs[Double]("distCol"))).cache()
//this is where the code haults and I see no progress
val modelPIC = new PowerIterationClustering().setK(16).setMaxIterations(5).run(newRDD)
val clusters = modelPIC.assignments