Искра: Исключение в теме "основной" akka.actor.ActorNotFound:

Я отправляю свои искровые задания с локального ноутбука на удаленный автономный кластер Spark (spark://IP:7077). Он успешно отправлен. Тем не менее, я не получаю никаких выходных данных и через некоторое время происходит сбой. Когда я проверяю работников в моем кластере, я нахожу следующее исключение:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]

Когда я запускаю тот же код в моей локальной системе (local[*]), он запускается успешно и выдает результат.

Обратите внимание, что я запускаю его в блокноте spark. Это же приложение успешно работает на удаленном автономном кластере, когда я отправляю его через терминал, используя spark-submit

Я что-то упустил в конфигурации ноутбука? Любые другие возможные причины?

Код очень прост.

Подробное исключение:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
    at akka.actor.ActorRef.tell(ActorRef.scala:125)
    at akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44)
    at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
    at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
    at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
    at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
    at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Образец кода

val logFile = "hdfs://hostname/path/to/file"
val conf = new SparkConf() 
.setMaster("spark://hostname:7077") // as appears on hostname:8080
.setAppName("myapp")
.set("spark.executor.memory", "20G")
.set("spark.cores.max", "40")
.set("spark.executor.cores","20")
.set("spark.driver.allowMultipleContexts","true")

val sc2 = new SparkContext(conf)
val logData = sc2.textFile(logFile)
val numAs = logData.filter(line => line.contains("hello")).count()
val numBs = logData.filter(line => line.contains("hi")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

2 ответа

Решение

Обновить:

Вышеупомянутой проблемы можно избежать, включив IP-адрес драйвера (т. Е. Общедоступный IP-адрес локального ноутбука) в код приложения. Это можно сделать, добавив следующую строку в контекст искры:

.set("spark.driver.host",YourSystemIPAddress)

Однако может возникнуть проблема, если IP-адрес драйвера находится за NAT. В этом случае работники не смогут найти IP.

Когда вы говорите "Записная книжка", я предполагаю, что вы имеете в виду проект github https://github.com/andypetrella/spark-notebook?

Мне бы пришлось изучить особенности ноутбука, но я заметил, что ваш работник пытается подключиться к мастеру на "localhost".

Для нормальной конфигурации Spark на рабочем месте установите SPARK_MASTER_IP в $SPARK_HOME/conf/spark-env.sh и посмотрите, поможет ли это, даже если вы работаете на одной машине в автономном режиме, установите это. По моему опыту, Spark не всегда разрешает имена хостов должным образом, поэтому неплохо было бы начинать с базового уровня всех IP-адресов.

Остальное - общая информация, посмотрите, поможет ли это с вашей конкретной проблемой:

Если вы отправляете в кластер со своего ноутбука, вы используете --deploy-mode для кластеризации, чтобы сообщить своему драйверу о работе на одном из рабочих узлов. Это создает дополнительные соображения относительно того, как вы настраиваете свой путь к классам, потому что вы не знаете, на каком работнике будет работать драйвер.

Вот некоторая общая информация в интересах полноты, есть известная ошибка Spark о преобразовании имен хостов в IP-адреса. Я не представляю это как полный ответ во всех случаях, но я предлагаю попробовать с базовым уровнем просто использовать все IP-адреса и использовать только одну конфигурацию SPARK_MASTER_IP. Только с этими двумя практиками я заставляю мои кластеры работать, и все другие конфигурации, или с использованием имен хостов, похоже, все портят.

Так что в вашем spark-env.sh избавьтесь от SPARK_LOCAL_IP и измените SPARK_MASTER_IP на IP-адрес, а не имя хоста.

Я рассмотрел это более подробно в этом ответе.

Для большей полноты вот часть этого ответа:

Можете ли вы пропинговать окно, где работает мастер Spark? Можете ли вы пинговать работника от мастера? Что еще более важно, вы можете использовать ssh без пароля для рабочего из мастер-бокса? В соответствии с 1.5.2 документа вы должны быть в состоянии сделать это с закрытым ключом и иметь рабочий введен в файл conf / slaves. Я скопировал соответствующий пункт в конце.

Вы можете получить ситуацию, когда рабочий может связаться с мастером, но мастер не может вернуться к рабочему, поэтому похоже, что соединение не установлено. Проверьте оба направления. Я думаю, что ведомый файл на главном узле, и ssh без пароля может привести к подобным ошибкам к тому, что вы видите.

Согласно ответу, который я сшил, есть также старая ошибка, но не ясно, как эта ошибка была решена.

Другие вопросы по тегам