NullPointerException в Scala Spark, по-видимому, вызвано типом коллекции?
sessionIdList имеет тип:
scala> sessionIdList res19: org.apache.spark.rdd.RDD [String] = MappedRDD [17] при отличном в:30
Когда я пытаюсь запустить ниже код:
val x = sc.parallelize(List(1,2,3))
val cartesianComp = x.cartesian(x).map(x => (x))
val kDistanceNeighbourhood = sessionIdList.map(s => {
cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)
Я получаю исключение:
14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Однако, если я использую:
val l = sc.parallelize(List("1","2"))
val kDistanceNeighbourhood = l.map(s => {
cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)
Тогда исключение не отображается
Разница между двумя фрагментами кода заключается в том, что в первом фрагменте sessionIdList имеет тип:
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30
и во втором фрагменте "л" имеет тип
scala> l
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12
Почему происходит эта ошибка?
Нужно ли конвертировать sessionIdList в ParallelCollectionRDD, чтобы это исправить?
2 ответа
Spark не поддерживает вложение RDD (см. /questions/10794666/vyizov-otlichnogo-i-karta-vmeste-brosaet-npe-v-iskrovoj-biblioteke/10794673#10794673 для другого появления той же проблемы), поэтому вы не можете выполнять преобразования или действия с RDD внутри других операций RDD.
В первом случае вы видите исключение NullPointerException, выдаваемое работником, когда он пытается получить доступ к объекту SparkContext, который присутствует только в драйвере, а не в работниках.
Во втором случае моя догадка заключается в том, что задание выполнялось локально на драйвере и работало чисто случайно.
Это разумный вопрос, и я слышал, он задавал это достаточно много раз. Я попытаюсь объяснить, почему это так, потому что это может помочь.
Вложенные RDD всегда выдают исключение при производстве. Вызовы вложенных функций, как я думаю, вы описываете их здесь, если это означает, что вызов операции RDD внутри операции RDD также вызовет сбои, поскольку это фактически одно и то же. (СДР являются неизменяемыми, поэтому выполнение операции СДР, такой как "карта", эквивалентно созданию нового СДР.) Возможность создавать вложенные СДР является необходимым следствием способа определения СДР и способа применения Spark Application. настроить.
RDD - это распределенная коллекция объектов (называемых разделами), которые живут в Spark Executors. Исполнители Spark не могут общаться друг с другом, только с драйвером Spark. Все операции RDD вычисляются по частям в этих разделах. Так как среда исполнителя RDD не является рекурсивной (то есть вы можете настроить драйвер Spark для работы на искровом исполнителе с подчиненными исполнителями), также как и RDD.
В вашей программе вы создали распределенную коллекцию разделов целых чисел. Затем вы выполняете операцию сопоставления. Когда драйвер Spark видит операцию сопоставления, он отправляет инструкции для сопоставления исполнителям, которые выполняют преобразование для каждого раздела параллельно. Но ваше сопоставление не может быть выполнено, потому что на каждом разделе вы пытаетесь вызвать "весь RDD" для выполнения другой распределенной операции. Это не может быть сделано, потому что у каждого раздела нет доступа к информации о других разделах, если это не так, вычисления не могут выполняться параллельно.
Вместо этого вы можете сделать, потому что данные, которые вам нужны на карте, вероятно, невелики (поскольку вы делаете фильтр, а фильтр не требует никакой информации о sessionIdList), это сначала отфильтровать список идентификаторов сеансов. Затем соберите этот список водителю. Затем передайте его исполнителям, где вы сможете использовать его на карте. Если список sessionID слишком велик, вам, вероятно, потребуется выполнить объединение.