Получение NullPointerException с API Graphx / Pregel на Spark в кластере EMR
Я пытаюсь сгенерировать информацию об иерархии на основе этого примера: https://www.qubole.com/blog/processing-hierarchical-data-using-spark-graphx-pregel-api/
После настройки кода по мере необходимости, когда я запускаю код, он отлично работает на моем локальном компьютере, но я получаю NPE в EMR.
val hrchyRDD = initialGraph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(setMsg, sendMsg, mergeMsg)
Я также добавил дополнительную память, сократил количество исполнителей и тому подобное, но безрезультатно.
scala> sc.getConf.getAll;
res3: Array[(String, String)] = Array((spark.eventLog.enabled,true),
(spark.app.id,application_1527179090729_0015), (spark.driver.extraLibraryPath,/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native), (spark.default.parallelism,2560), (spark.blacklist.decommissioning.timeout,1h), (spark.yarn.secondary.jars,bcpg-jdk15on-158.jar,bcprov-jdk15on-158.jar,aws-encryption-sdk-java-1.3.1.jar), (spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS,$(hostname -f)), (spark.driver.memory,5g), (spark.driver.port,42783), (spark.executor.cores,4), (spark.yarn.historyServer.address,ip-10-0-35-88.ec2.internal:18080), (spark.repl.class.uri,spark://10.0.35.88:42783/classes), (spark.executor.instances,6), (spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES,http://ip-10-0-35-88....
sc.getConf.get("spark.executor.memory")
res4: String = 7g
sc.getConf.get("spark.executor.instances")
res5: String = 6
Трассировки стека:
org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 145 на этапе 3.0 не выполнено 4 раза, последний сбой: потерянное задание 145.3 на этапе 3.0 (TID 1340, ip-10-0-36-20.ec2. внутренняя, исполнитель 33): java.lang.NullPointerException в $anonfun$5.apply(:61) в $anonfun$5.apply(:61) в org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61) в org.apache.spark.graphx.impl.GraphImpl$anonfun$mapVertices$1.apply(GraphImpl.scala:136) в org.apache.spark.graphx.impl.GraphImpl $ anonfun $ mapVertices $ 1.apply (GraphImpl. scala: 136) в scala.collection.Iterator$anon$11.next(Iterator.scala:409) в org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues (MemoryStore.scala:216) в org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:1039) в org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:1030) в org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:970) в org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1030) в org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:761) в org.apache.spark.rdd.RDD.getOrCompute(RDD.scala: RDD.scala: в org.apache.spark.rdd.RDD.iterator(RDD.scala:285) в org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) в org.apache.spark.rdd.RDD.computeOrReadCheck (RDD.scala:323) в org.apache.spark.rdd.RDD.iterator(RDD.scala:287) в org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) в org.apache.spark.rdd.RDD.iterator(RDD.scala:287) в org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap 96) в org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) в org.apache.spark.scheduler.Task.run(Task.scala:108) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) по адресу java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. Java:1149) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) в java.lang.Thread.run(Thread.java:748) Отслеживание стека драйверов: в org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ планировщик $DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1690) в org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scark 1665):.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1677) в scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) в scala.collection.mutable.ArrayBuffer.foreach(ArrayBu.scala:48) в org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677) в org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855) apache.spark.scheduler.DAGScheduler $ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 855) в scala.Option.foreach(Option.scala:257) в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855) по адресу org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905) по адресу org.apache.spark.scheduler.DAGSchedulerAgladuRelay.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala:1849) в org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48) в org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671) в org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) в org.apache.spark.SparkContext.runJob(SparkContext.scala:2119) в org.apache.sparkr..RDD$anonfun$ уменьшить $1.apply(RDD.scala:1026) в org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) в org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) в org.apache.spark.rdd.RDD.withScope(RDD.scala:362) в org.apache.spark.rdd.RDD.reduce(RDD.scala:1008) в org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90) в org.apache.spark.graphx.Pregel$.apply(Pregel.scala:140) в org.apache.spark.graphx.GraphOps.pregel(GraphOps.scala:370) в calcTopLevelHierarchy(:62) ... 54 исключено. Вызвано: java.lang.NullPointerException в $anonfun$5.apply(:61) в $anonfun$5.apply(:61) в org.apache.spark.graphx.impl.VertexPartitionBaseOps.map (VertexPartitionBaseOps. scala: 61) в org.apache.spark.graphx.impl.GraphImpl$anonfun$mapVertices$1.apply(GraphImpl.scala:136) в org.apache.spark.graphx.impl.GraphImpl$anonfun$mapVertices$1.apply(GraphImpl.scala:136) в scala.collection.Iterator$anon$11.next(Iterator.scala:409) в org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues (MemoryStore.scala:216) в org.apache. spark.storage.BlockManager $ anonfun $ doPutIterator $ 1.apply (BlockManager.scala: 1039) в org.apache.spark.storage.BlockManager$anonfun$doPutIterator$1.apply(BlockManager.scala:1030) в org.apache.spark. storage.BlockManager.doPut (BlockManager.scala: 970) в org.apache.spark.storage.BlockManager.d oPutIterator (BlockManager.scala: 1030) в org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:761) в org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) в org.ap.spark.rdd.RDD.iterator (RDD.scala: 285) в org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.sc.ala: 323) в org.apache.spark.rdd.RDD.iterator(RDD.scala:287) в org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) в org.apache.spark.rdd. RDD.computeOrReadCheckpoint (RDD.scala:323) в org.apache.spark.rdd.RDD.iterator(RDD.scala:287) в org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.asg) 96).apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 53) в org.apache.spark.scheduler.Task.run(Task.scala:108) в org.apache.spark.executor.Executor $ TaskRunner.un (Executor.scala: 335) на java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) на java.u til.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run(Thread.java:748)