Error when using a Spark Receiver and StreamingListener for backpressure

I have created a SparkEventReceiver and a StreamingListener for my application. We read data from RabbitMQ and store it into Spark through the receiver. This job was working fine until I added backpressure logic (based on scheduling delay) to it.
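For context, the receiver side follows the standard custom-receiver pattern and hands messages to Spark via store(). The class name, storage level and the pollMessage() helper below are placeholders for the real RabbitMQ consumer code, not the actual implementation; this is just a rough sketch:

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class RabbitMqReceiverSketch extends Receiver<String> {

    public RabbitMqReceiverSketch() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        // Consume on a separate thread so that onStart() returns immediately
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // The consume loop checks isStopped() and exits on its own
    }

    private void receive() {
        while (!isStopped()) {
            // pollMessage() stands in for the real RabbitMQ consumer call
            String message = pollMessage();
            if (message != null) {
                store(message); // hand the record over to Spark's block generator
            }
        }
    }

    private String pollMessage() {
        return null; // placeholder
    }
}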

So, for backpressure I work off the scheduling delay: I use a temporary table to store the current scheduling-delay value and compare it against the scheduling delay of every batch as it starts.
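The listener is registered on the streaming context in the usual way; roughly like this (the SchedulingDelayListener name, the app name and the 5-second batch interval are just stand-ins):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Driver-side wiring (sketch only)
SparkConf sparkConf = new SparkConf().setAppName("rabbitmq-streaming");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

// Register the custom StreamingListener whose onBatchStarted() is shown below
jssc.addStreamingListener(new SchedulingDelayListener());

// Attach the custom receiver from the sketch above
JavaReceiverInputDStream<String> messages = jssc.receiverStream(new RabbitMqReceiverSketch());

The listener's onBatchStarted() handler and its helpers are below.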

/**
 * This method checks the scheduling delay when a batch starts
 */
@Override
public void onBatchStarted(StreamingListenerBatchStarted arg0) {
    try {

        int schedulingDelay = getSchedulingDelay(arg0);

        if (schedulingDelay > scheduleDelayThresholdInMilliSec) {
            handleSchedulingDelay(schedulingDelay);
        } else {
            handleNoSchedulingDelay();
        }
    } catch (Exception ex) {
        logger.error(CCFLogCtx.COMPUTEENGINE, "Error occurred in onBatchStarted method", ex);
    }
}




protected int getSchedulingDelay(StreamingListenerBatchStarted arg0) {
    return Integer.parseInt(arg0.batchInfo().schedulingDelay().get().toString());
}




/**
 * This method handles no scheduling delay
 */
protected void handleNoSchedulingDelay() {
    if (SchedulingDelayState.INIT.equals(schedulingDelayState)
            || SchedulingDelayState.DELAYED.equals(schedulingDelayState)) {

        ReceiverInfo scheDelayInfo = prepareReceiverInfoForSchedulingDelay(SCHEDULING_DELAY_DEFAULT);
        boolean receiverInfoAdded = queueReceiverInfoService.addReceiverInfo(scheDelayInfo);
        if (receiverInfoAdded)
            schedulingDelayState = SchedulingDelayState.NO_DELAY;
    }
}

/**
 * This method handles scheduling delay
 * @param schedulingDelay
 */
protected void handleSchedulingDelay(int schedulingDelay) {
    if (SchedulingDelayState.INIT.equals(schedulingDelayState)
            || SchedulingDelayState.NO_DELAY.equals(schedulingDelayState)) {

        ReceiverInfo schedulingDelayInfo = prepareReceiverInfoForSchedulingDelay(schedulingDelay);

        boolean receiverInfoAdded = queueReceiverInfoService.addReceiverInfo(schedulingDelayInfo);

        if (receiverInfoAdded)
            schedulingDelayState = SchedulingDelayState.DELAYED;
    } 
    else {
        logger.info(CCFLogCtx.COMPUTEENGINE, "Scheduling delay is already recorded!!");
    }
}


protected ReceiverInfo prepareReceiverInfoForSchedulingDelay(int schedulingDelay) {
    ReceiverInfo receiverInfo = new ReceiverInfo();
    receiverInfo.setQueueType(QueueType.SPARK);
    receiverInfo.setClusterName(queueProvider.getSparkQueueName());
    receiverInfo.setAttributeName(ReceiverInfoAttribute.QUEUE_SCHEDULING_DELAY_IN_SECONDS);
    receiverInfo.setAttributeValue(String.valueOf(schedulingDelay));
    return receiverInfo;
}
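As a side note, batchInfo().schedulingDelay() comes back as a Scala Option, so a more defensive version of getSchedulingDelay() would check the Option before reading it instead of parsing its toString(). This is only a sketch, not the code that is deployed; treating a missing value as zero delay is an assumption:

// Sketch: guard the Option instead of parsing toString(); a missing value is
// treated as zero delay (an assumption, not necessarily the right policy)
protected int getSchedulingDelay(StreamingListenerBatchStarted batchStarted) {
    scala.Option<Object> delayOpt = batchStarted.batchInfo().schedulingDelay();
    if (delayOpt.isEmpty()) {
        return 0; // Spark has not reported a scheduling delay for this batch
    }
    return ((Long) delayOpt.get()).intValue(); // delay is reported in milliseconds
}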

Anyway, when I run this, I get the errors below in spark.err.log:

WARN  2018-03-29 11:21:02,161 org.apache.spark.network.server.TransportChannelHandler: Exception in connection from /10.3.58.135:41036
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0-zing_16.12.3.0]
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) [netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) [netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) [netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) [netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) [netty-all-4.0.42.Final.jar:4.0.42.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0-zing_16.12.3.0]
ERROR 2018-03-29 11:21:02,163 org.apache.spark.network.client.TransportResponseHandler: Still have 1 requests outstanding when connection from /10.3.58.135:41036 is closed
WARN  2018-03-29 11:21:02,168 org.apache.spark.storage.BlockManagerMaster: Failed to remove RDD 5 - Connection reset by peer
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0-zing_16.12.3.0]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0-zing_16.12.3.0]
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0-zing_16.12.3.0]

So far I have not been able to resolve this issue.

As mentioned above, I am using DSE for Spark and Cassandra, together with RabbitMQ.

Thanks in advance.
