Искра `LiveListenerBus` Исключение меня бесит

Я использую кластеры AWS EMR, и версия спарк

spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/

Branch HEAD
Compiled by user ec2-user on 2016-10-11T00:04:18Z
Revision 8182b3893b6ce47724d35a32fbea3605a0d9cb98
Url git@aws157git.com:/pkg/Aws157BigTop

Я бегу немного Спарк GraphX задания в кластере и данные в формате json (gzip, хранятся в aws s3). Я обнаружил, что если данные небольшие (например, около 1000 строк), отправленные задания будут работать нормально без каких-либо ошибок или исключений, но если я увеличу размер данных (например, около 1 м строк), то LiveListenerBus исключение наступает, как диарея, которая меня очень пугает:-(

Я пытался настроить параметры лайков --num-executors,--executor-memory, не помогает вообще.

Вакансии представлены так,

spark-submit --class "MyJob" --master yarn mypkg.jar // if do --master local, no exception

Исключение, как это,

17/06/11 11:18:58 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
        at java.util.ArrayList$Itr.next(ArrayList.java:851)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
        at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
        at scala.collection.AbstractTraversable.to(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
        at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
        at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291)
        at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
        at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283)
        at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
        at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:137)
        at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:157)
        at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
        at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
        at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
        at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

Хорошо, я искал SO, и люди говорят, что это известная проблема, которая может быть исправлена ​​путем обновления Spark до более новой версии. ОДНАКО, похоже, что я - единственная команда, которая столкнулась с этой проблемой (возможно, потому что другие парни не делают GraphX рабочие места?)

Мой вопрос,

1) Может ли это исключение относиться к GraphX пакет искры?

2) Как это исправить? Должен ли я обновить Spark? А как обновить на AWS?

1 ответ

java.util.ConcurrentModificationException произошло, потому что, когда делает итератор iterator.hasNext() измененный объект итератора, такой как update,remove и son on.

Другие вопросы по тегам