Spark Streaming - остановленный рабочий выдает исключение FileNotFoundException
Я запускаю потоковое приложение в кластере, состоящем из трех узлов, каждый с работником и тремя исполнителями (всего 9 исполнителей). Я использую спарк автономный режим (версия 2.1.1).
Приложение запускается командой spark-submit с опцией --deploy-mode client
а также --conf spark.streaming.stopGracefullyOnShutdown=true
, Команда submit запускается с одного из узлов, назовем его узлом 1.
В качестве теста на отказоустойчивость я остановил работника на узле 2, вызвав скрипт stop-slave.sh
,
В журналах исполнителя на узле 2 я вижу несколько ошибок, связанных с FileNotFoundException во время операции случайного воспроизведения:
ERROR Executor: Exception in task 5.0 in stage 5531241.0 (TID 62488319)
java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.ecb8e397-c3a3-4c1a-96ba-e153ed92b05c (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:206)
at java.io.FileOutputStream.<init>(FileOutputStream.java:156)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Я вижу 4 ошибки такого рода в одной задаче в каждом из 3 исполнителей на узле 2.
В журналах драйверов я вижу:
ERROR TaskSetManager: Task 5 in stage 5531241.0 failed 4 times; aborting job
...
ERROR JobScheduler: Error running job streaming job 1503995015000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 5531241.0 failed 4 times, most recent failure: Lost task 5.3 in stage 5531241.0 (TID 62488335, 10.7.94.68, executor 2): java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.9e6148da-6ce2-4de5-94ab-d95db2c8f9f7 (No such file or directory)
Это снимает заявку, как и ожидалось: исполнитель достиг spark.task.maxFailures
на одной задаче, а затем приложение останавливается.
Я запустил разные тесты, и все они, кроме одного, закончились остановкой приложения. Моя идея состоит в том, что поведение может варьироваться в зависимости от точного шага в процессе потока, который я прошу работника остановить. В любом случае, все остальные тесты завершились с той же ошибкой, описанной выше.
Увеличение параметра spark.task.maxFailures
до 8 тоже не помогло, с задачей сигнализации TaskSetManager не удалось 8 раз вместо 4.
Что делать, если работник убит?
Я также провел другой тест: я убил рабочий и 3 исполнительных процесса на узле 2 с помощью команды kill -9
, И в этом случае потоковое приложение адаптировалось к оставшимся ресурсам и продолжало работать.
В журнале драйверов мы видим, что водитель замечает пропавших исполнителей:
ERROR TaskSchedulerImpl: Lost executor 0 on 10.7.94.68: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Затем мы замечаем длинную серию следующих ошибок:
17/08/29 14:43:19 ERROR ReceiverTracker: Deregistered receiver for stream 5: Error starting receiver 5 - org.jboss.netty.channel.ChannelException: Failed to bind to: /X.X.X.X:40001
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:162)
at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:169)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:414)
at sun.nio.ch.Net.bind(Net.java:406)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
... 3 more
Эта ошибка появляется в журнале, пока убитый работник не запустится снова.
Заключение
Остановка рабочего с помощью выделенной команды имеет неожиданное поведение: приложение должно быть в состоянии справиться с пропущенной обработкой, адаптируясь к оставшимся ресурсам и продолжать работать (как это происходит в случае kill
).
Каковы ваши наблюдения по этому вопросу?
Спасибо Давиде