Spark: пример PageRank, когда итерация слишком велика, генерирует stackruError
Я тестирую пример PageRank по умолчанию для spark и устанавливаю итерацию равной 1024, затем она вызывает stackruerror. Я также встретил эту проблему в моей другой программе. Как я могу решить ее.
object SparkPageRank {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
System.exit(1)
}
var iters = args(2).toInt
val ctx = new SparkContext(args(0), "PageRank",System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val lines = ctx.textFile(args(1), 1)
val links = lines.map{ s => val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
System.exit(0)
}
}
Я выкладываю ошибку здесь.
[spark-akka.actor.default-dispatcher-15] ERROR LocalActorRefProvider(akka://spark) - guardian failed, shutting down system
java.lang.StackruError
at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:312)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:326)
2 ответа
Это потому, что эти преобразования в цикле for создают очень длинные зависимости в вашем rdd. Когда вы попытаетесь запустить искровое задание, рекурсивное посещение вашего rdd приведет к ошибке stackru.
Чтобы решить эту проблему, вы можете использовать checkpoint()
на вашем RDD. cache()
не поможет вам оценить ваш RDD немедленно.
Так что вы должны позвонить cache()
а также checkpoint()
на вашем промежуточном rdd после определенных итераций и вручную оцените его, чтобы очистить его зависимости.
Я предполагаю, что ошибка происходит, потому что промежуточные СДР не оцениваются до collect()
, И при сборе они оцениваются рекурсивно.
Попробуй добавить cache()
для оценки СДР на каждой итерации, вероятно, это поможет:
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _).cache