Spark drops SparkListenerEvent because there is no remaining room in the event queue
I am using pyspark to run my Spark jobs, and when I try to execute the following Python script I get the errors below.
from pyspark.sql.functions import count, sum, stddev_pop, mean

# sqlContext is the SQLContext that the pyspark shell pre-creates
comp_df = sqlContext.sql('SELECT * FROM default.cdr_data')

# Weekly aggregates per number and call type
df1 = comp_df.groupBy('number', 'type', 'week').agg(
    sum('callduration').alias('call_sum'),
    count('callduration').alias('call_count'),
    sum('iscompethot').alias('call_count_comp'))

# Coefficient of variation of the weekly totals, per number and type
df2 = df1.groupBy('number', 'type').agg(
    (stddev_pop('call_sum') / mean('call_sum')).alias('coefficient_of_variation'),
    sum('call_count').alias('call_count'),
    sum('call_count_comp').alias('call_count_comp'))
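Nothing fails while the DataFrames are being defined; the errors only appear once an action forces the two shuffles to run, for example (an illustrative action; the one in my real job may differ):

df2.show()  # any action (show/count/write) triggers the stages that fail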
The error and warning:
ERROR scheduler.LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
WARN scheduler.LiveListenerBus: Dropped 1 SparkListenerEvents since Thu Jan 01 05:30:00 IST 1970
And this was accompanied by recurring errors:
ERROR cluster.YarnScheduler: Lost executor 149 on slv1.cdh-prod.com: Slave lost
WARN scheduler.TaskSetManager: Lost task 12.0 in stage 1.0 (TID 88088, slv1.cdh-prod.com, executor 149): ExecutorLostFailure (executor 149 exited caused by one of the running tasks) Reason: Slave lost
ERROR client.TransportClient: Failed to send RPC 5181111296686128218 to slv2.cdh-prod.com/192.168.x.xx:57156: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
ERROR cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Sending RequestExecutors(83360,88076,Map(slv3.cdh-prod.com -> 88076, slv1.cdh-prod.com -> 88076, slv2.cdh-prod.com -> 88076),Set()) to AM was unsuccessful
java.io.IOException: Failed to send RPC 5181111296686128218 to slv2.cdh-prod.com/192.168.x.xx:57156: java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
I tried some of the suggested fixes and increased the value of spark.scheduler.listenerbus.eventqueue.size, but that did not help. I even tried with a small dataset, but I still get the same errors.
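For completeness, this is roughly how I applied that setting (a sketch; 100000 is just the value I tried, not a recommendation). The listener bus queue is sized once when the context starts, so the option has to be set before the SparkContext exists:

from pyspark.sql import SparkSession

# Must be set before the SparkContext is created; the queue cannot be
# resized afterwards (100000 is illustrative, the Spark 2.x default is 10000)
spark = (SparkSession.builder
         .config('spark.scheduler.listenerbus.eventqueue.size', '100000')
         .getOrCreate())

When launching through spark-submit, the same option can be passed as --conf spark.scheduler.listenerbus.eventqueue.size=100000.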