Spark Streaming - остановленный рабочий выдает исключение FileNotFoundException

Я запускаю потоковое приложение в кластере, состоящем из трех узлов, каждый с работником и тремя исполнителями (всего 9 исполнителей). Я использую спарк автономный режим (версия 2.1.1).

Приложение запускается командой spark-submit с опцией --deploy-mode client а также --conf spark.streaming.stopGracefullyOnShutdown=true, Команда submit запускается с одного из узлов, назовем его узлом 1.

В качестве теста на отказоустойчивость я остановил работника на узле 2, вызвав скрипт stop-slave.sh,

В журналах исполнителя на узле 2 я вижу несколько ошибок, связанных с FileNotFoundException во время операции случайного воспроизведения:

ERROR Executor: Exception in task 5.0 in stage 5531241.0 (TID 62488319)
java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.ecb8e397-c3a3-4c1a-96ba-e153ed92b05c (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:206)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:156)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Я вижу 4 ошибки такого рода в одной задаче в каждом из 3 исполнителей на узле 2.

В журналах драйверов я вижу:

ERROR TaskSetManager: Task 5 in stage 5531241.0 failed 4 times; aborting job
 ...
ERROR JobScheduler: Error running job streaming job 1503995015000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 5531241.0 failed 4 times, most recent failure: Lost task 5.3 in stage 5531241.0 (TID 62488335, 10.7.94.68, executor 2): java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.9e6148da-6ce2-4de5-94ab-d95db2c8f9f7 (No such file or directory)

Это снимает заявку, как и ожидалось: исполнитель достиг spark.task.maxFailures на одной задаче, а затем приложение останавливается.

Я запустил разные тесты, и все они, кроме одного, закончились остановкой приложения. Моя идея состоит в том, что поведение может варьироваться в зависимости от точного шага в процессе потока, который я прошу работника остановить. В любом случае, все остальные тесты завершились с той же ошибкой, описанной выше.

Увеличение параметра spark.task.maxFailures до 8 тоже не помогло, с задачей сигнализации TaskSetManager не удалось 8 раз вместо 4.

Что делать, если работник убит?

Я также провел другой тест: я убил рабочий и 3 исполнительных процесса на узле 2 с помощью команды kill -9, И в этом случае потоковое приложение адаптировалось к оставшимся ресурсам и продолжало работать.

В журнале драйверов мы видим, что водитель замечает пропавших исполнителей:

ERROR TaskSchedulerImpl: Lost executor 0 on 10.7.94.68: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

Затем мы замечаем длинную серию следующих ошибок:

17/08/29 14:43:19 ERROR ReceiverTracker: Deregistered receiver for stream 5: Error starting receiver 5 - org.jboss.netty.channel.ChannelException: Failed to bind to: /X.X.X.X:40001
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
    at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:162)
    at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:169)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.BindException: Cannot assign requested address
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:414)
    at sun.nio.ch.Net.bind(Net.java:406)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    ... 3 more

Эта ошибка появляется в журнале, пока убитый работник не запустится снова.

Заключение

Остановка рабочего с помощью выделенной команды имеет неожиданное поведение: приложение должно быть в состоянии справиться с пропущенной обработкой, адаптируясь к оставшимся ресурсам и продолжать работать (как это происходит в случае kill).

Каковы ваши наблюдения по этому вопросу?

Спасибо Давиде

0 ответов

Другие вопросы по тегам