Ошибка при использовании SparkReceiver и StreamingListener для противодавления
Я создал SparkEventReceiver и StreamingListener для моего приложения. Мы читаем данные из RabbitMQ и сохраняем их в Spark Store. Эта задача работала нормально, пока я не добавил туда логику противодавления (на основе задержки планирования).
Итак, я реализую противодавление через задержку планирования: текущее значение задержки хранится во временной таблице, и при каждом запуске пакета оно сравнивается с пороговым значением задержки планирования.
*/
/**
 * Callback invoked by Spark Streaming when a batch starts.
 * Reads the batch's scheduling delay and either records the delay
 * (back-pressure on) or clears it (back-pressure off), depending on
 * the configured threshold. All failures are logged and swallowed so
 * that the listener never breaks the streaming pipeline.
 *
 * @param arg0 batch-start event supplied by Spark Streaming
 */
@Override
public void onBatchStarted(StreamingListenerBatchStarted arg0) {
    try {
        int schedulingDelay = getSchedulingDelay(arg0);
        if (schedulingDelay > scheduleDelayThresholdInMilliSec) {
            handleSchedulingDelay(schedulingDelay);
        } else {
            handleNoSchedulingDelay();
        }
    } catch (Exception ex) {
        // Deliberately broad catch: a listener exception must not kill the stream.
        logger.error(CCFLogCtx.COMPUTEENGINE, "Error occurred in onBatchStarted method", ex);
    }
}
/**
 * Extracts the scheduling delay (milliseconds) from the batch-start event.
 * Spark exposes the delay as a Scala {@code Option}; calling {@code get()}
 * on an empty Option throws {@code NoSuchElementException}, so an absent
 * delay is treated as 0 (no delay) instead of failing the listener.
 *
 * @param arg0 batch-start event supplied by Spark Streaming
 * @return scheduling delay in milliseconds, or 0 when Spark reports none
 */
protected int getSchedulingDelay(StreamingListenerBatchStarted arg0) {
    // Guard against an undefined delay (Option.empty) before unwrapping.
    if (arg0.batchInfo().schedulingDelay().isEmpty()) {
        return 0;
    }
    // The Option holds a boxed Long; going through toString keeps the
    // original conversion behavior (throws NumberFormatException only if
    // the delay ever exceeds Integer range).
    return Integer.parseInt(arg0.batchInfo().schedulingDelay().get().toString());
}
/**
 * Handles the "no scheduling delay" case: when the state machine is in
 * INIT or DELAYED, publishes the default (zero) delay value and, on a
 * successful publish, moves the state to NO_DELAY. In NO_DELAY state
 * this is a no-op, so the default value is written at most once.
 */
protected void handleNoSchedulingDelay() {
    boolean mustReset = SchedulingDelayState.INIT.equals(schedulingDelayState)
            || SchedulingDelayState.DELAYED.equals(schedulingDelayState);
    if (mustReset) {
        ReceiverInfo resetInfo = prepareReceiverInfoForSchedulingDelay(SCHEDULING_DELAY_DEFAULT);
        if (queueReceiverInfoService.addReceiverInfo(resetInfo)) {
            schedulingDelayState = SchedulingDelayState.NO_DELAY;
        }
    }
}
/**
 * Handles a scheduling delay above the threshold: when the state machine
 * is in INIT or NO_DELAY, publishes the observed delay and, on a
 * successful publish, moves the state to DELAYED. While already DELAYED,
 * only logs that the delay has been recorded, so the delay value is
 * written at most once per delay episode.
 *
 * @param schedulingDelay observed scheduling delay in milliseconds
 */
protected void handleSchedulingDelay(int schedulingDelay) {
    boolean alreadyRecorded = !SchedulingDelayState.INIT.equals(schedulingDelayState)
            && !SchedulingDelayState.NO_DELAY.equals(schedulingDelayState);
    if (alreadyRecorded) {
        logger.info(CCFLogCtx.COMPUTEENGINE, "Scheduling delay is already recorded!!");
        return;
    }
    ReceiverInfo delayInfo = prepareReceiverInfoForSchedulingDelay(schedulingDelay);
    if (queueReceiverInfoService.addReceiverInfo(delayInfo)) {
        schedulingDelayState = SchedulingDelayState.DELAYED;
    }
}
/**
 * Builds the {@link ReceiverInfo} record used to publish a scheduling-delay
 * value for the Spark queue.
 *
 * NOTE(review): the attribute name says QUEUE_SCHEDULING_DELAY_IN_SECONDS,
 * but the value passed in appears to be milliseconds (the threshold field
 * is scheduleDelayThresholdInMilliSec) — confirm the expected unit with
 * the consumer of this attribute.
 *
 * @param schedulingDelay delay value to publish
 * @return populated receiver-info record for the Spark queue
 */
protected ReceiverInfo prepareReceiverInfoForSchedulingDelay(int schedulingDelay) {
    final ReceiverInfo info = new ReceiverInfo();
    info.setQueueType(QueueType.SPARK);
    info.setClusterName(queueProvider.getSparkQueueName());
    info.setAttributeName(ReceiverInfoAttribute.QUEUE_SCHEDULING_DELAY_IN_SECONDS);
    info.setAttributeValue(String.valueOf(schedulingDelay));
    return info;
}
Но при запуске этого кода я получаю следующие ошибки в spark.err.log:
WARN 2018-03-29 11:21:02,161 org.apache.spark.network.server.TransportChannelHandler: Exception in connection from /10.3.58.135:41036
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0-zing_16.12.3.0]
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0-zing_16.12.3.0]
ERROR 2018-03-29 11:21:02,163 org.apache.spark.network.client.TransportResponseHandler: Still have 1 requests outstanding when connection from /10.3.58.135:41036 is closed
WARN 2018-03-29 11:21:02,168 org.apache.spark.storage.BlockManagerMaster: Failed to remove RDD 5 - Connection reset by peer
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0-zing_16.12.3.0]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0-zing_16.12.3.0]
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0-zing_16.12.3.0]
Так что я не могу решить этот вопрос сейчас.
Я использую DSE для Spark, Cassandra и rabbitMq, как я уже говорил ранее.
заранее спасибо