Spark Listener EventLoggingListener выдал исключение / ConcurrentModificationException
В нашем приложении (Spark 2.0.1) это исключение часто появляется. Я ничего не могу найти по этому поводу. Что может быть причиной?
16/10/27 11:18:24 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
at scala.collection.AbstractTraversable.to(Traversable.scala:104)
at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314)
at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
at scala.Option.map(Option.scala:146)
at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291)
at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283)
at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:137)
at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:157)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
РЕДАКТИРОВАТЬ: еще одна информация, наше приложение является долгосрочным, и чтобы возобновить работу из потенциально неудачного контекста искры, мы используем метод SparkBuilder.getOrCreate() между двумя "заданиями". Может ли это напутать со слушателями?
3 ответа
Это известная проблема в Spark 2.0.1
( SPARK-17816) и будет исправлено Spark 2.0.2/2.1.0
( связанный запрос на извлечение).
Чтобы избавиться от исключения, не дожидаясь Spark 2.0.2/2.1.0
, клонируйте последнюю, нестабильную версию spark и соберите apache-spark вручную.
Обновление: они выпустили Spark 2.0.2
!
Мы также только что обновились до Spark 2.0.1 и начали видеть то же самое исключение. Мы сузили причину до раздела кода Python, содержащего следующую идиому:
a = spark_context.textFile('..')
a = a.map(stuff)
b = a.filter(stuff).map(stuff)
В прошлом у меня были проблемы с самостоятельным назначением переменных в Spark, но после обновления до 2.0.1 проблема стала действительно острой, и мы начали видеть исключения ConcurrentModification.
Для нас исправлением было просто изменить код, чтобы он не выполнял самостоятельных заданий.
Аналогичная проблема возникла в Spark 3.1.0, связанная сEventLoggingListener
состояние гонки и описано в следующих отчетах об ошибках:
https://issues.apache.org/jira/browse/SPARK-34731
https://issues.apache.org/jira/browse/SPARK-32027
Проблема была исправлена в Spark 3.1.2, поэтому обновление Spark с 3.1.0/3.1.1 до 3.1.2 решит ее. В качестве альтернативы можно избежать ошибки, полностью отключив ведение журнала событий:
spark.eventLog.enabled=false