Сохранение кадра данных с использованием пакета spark-csv вызывает исключения и сбои (pyspark)
Я запускаю скрипт на spark 1.5.2 в автономном режиме (с использованием 8 ядер), и в конце сценария я пытаюсь сериализовать очень большой массив данных на диск, используя spark-csv
пакет. Фрагмент кода, который выдает исключение:
numfileparts = 16
data = data.repartition(numfileparts)
# Save the files as a bunch of csv files
datadir = "~/tempdatadir.csv/"
try:
(data
.write
.format('com.databricks.spark.csv')
.save(datadir,
mode="overwrite",
codec="org.apache.hadoop.io.compress.GzipCodec"))
except:
sys.exit("Could not save files.")
где data
это искровой датафрейм Во время выполнения я получаю следующую трассировку:
16/04/19 20:16:24 WARN QueuedThreadPool: 8 threads could not be stopped
16/04/19 20:16:24 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@70617ec1 rejected from java.util.concurrent.ThreadPoolExecutor@1bf5370e[Shutting d\
own, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 2859]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:49)
at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:347)
at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:330)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:65)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Это приводит к куче из них:
16/04/19 20:16:24 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-84d7d0a6-a3e5-4f48-bde0-0f6610e44e16/38/temp_shuffle_b9886819-be46-4e\
28-b57f-e592ea37ab95
java.io.FileNotFoundException: /tmp/blockmgr-84d7d0a6-a3e5-4f48-bde0-0f6610e44e16/38/temp_shuffle_b9886819-be46-4e28-b57f-e592ea37ab95 (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:160)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:174)
at org.apache.spark.shuffle.sort.SortShuffleWriter.stop(SortShuffleWriter.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/04/19 20:16:24 ERROR BypassMergeSortShuffleWriter: Error while deleting file for block temp_shuffle_b9886819-be46-4e28-b57f-e592ea37ab95
16/04/19 20:16:24 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-84d7d0a6-a3e5-4f48-bde0-0f6610e44e16/29/temp_shuffle_e474bcb1-5ead-4d\
7c-a58f-5398f32892f2
java.io.FileNotFoundException: /tmp/blockmgr-84d7d0a6-a3e5-4f48-bde0-0f6610e44e16/29/temp_shuffle_e474bcb1-5ead-4d7c-a58f-5398f32892f2 (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
... и так далее (я намеренно пропустил некоторые из последних строк.)
Я понимаю (примерно), что происходит, но я очень не уверен, что с этим делать - это проблема памяти? Я ищу совет о том, что делать - есть ли какие-то настройки, которые я могу изменить, добавить и т. Д.?