Spark Streaming: ресивер (receiver) падает с Out of Memory (OOM)

У меня проблема: ресивер (receiver) перезапускается снова и снова.
Я использую Spark 1.6.1. С помощью Spark Streaming я принимаю поток данных, а затем через map десериализую данные protobuf (pb).

Мое тестирование содержит два случая:

  1. Просто получить данные и сразу их распечатать: приложение работает стабильно.
  2. Получить и десериализовать: возникают проблемы, причём через нерегулярные промежутки времени. Входной поток — около 500 МБ/мин. Памяти исполнителя (executor) выделено 8 ГБ. Похоже, что-то выделяет чрезвычайно много памяти, но я не понимаю, что именно.

Мой код:

// Spark configuration; the application name is read from args(8).
val conf = new SparkConf()
    .setAppName(args(8))
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.speculation", "true")
val sc = new SparkContext(conf)
// The batch interval, in seconds, is read from args(7).
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
// One receiver-backed input stream per index in 1..args(3); the index is handed to the receiver.
val bigPipeStreams = for (i <- 1 to args(3).toInt) yield {
    val receiver = new MyBigpipeLogagentReceiver(
        args(0), args(1), args(2), i, args(4), args(5), args(6).toInt)
    ssc.networkStream(receiver)
}
// Union of all receiver streams.
val lines = ssc.union(bigPipeStreams)
// Decodes one raw record into a single "\001"-separated output line.
// Returns "" when the record is null/empty, when any step throws, or when the record is
// rejected by the filter near the end of the body.
def deserializePbData(value: String) : String = {

// Nothing to decode for a null or empty record.
if (null == value || value.isEmpty) {
    return ""
}
// Accumulators for the URL key/value pairs of interest. Values are appended with +=,
// so a key that occurs more than once ends up with its values concatenated.
var cuid = ""
var os = ""
var channel = ""
var sv = ""
var resid = ""
var appid = ""
var prod = ""
try { // any exception means the record is unusable: it is dropped and "" is returned
    // Cut the record off at ,"time_str" (when present) and re-close the JSON object with "}".
    val timeStrIndex = value.indexOf(",\"time_str\"")
    var strAfterTruncation = ""
    if (-1 != timeStrIndex) {
        strAfterTruncation = value.substring(0,timeStrIndex) + "}"
    } else {
        strAfterTruncation = value
    }
    val jsonData = JSONObject.fromObject(strAfterTruncation)
    //val jsonData = value.getAsJsonArray()
    val binBody = jsonData.getString("bin_body")
    // Strip the first and last character of bin_body, parse the comma-separated items as
    // bytes, and skip the first 8 bytes before handing the rest to the protobuf parser.
    val pbData = binBody.substring(1,binBody.length()-1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b,a) => b +java.lang.Byte.parseByte(a)).drop(8).toArray
    // Walk the key/value list of the request URL and pick out the fields of interest.
    Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a => 
        a.getKey() match {
            case "cuid" => cuid += a.getValue()
            case "os" => os += a.getValue()
            case "channel" => channel += a.getValue()
            case "sv" => sv += a.getValue()
            case "resid" => resid += a.getValue()
            case "appid" => appid += a.getValue()
            case "prod" => prod += a.getValue()
            case _ => null
        }
    )
    val decodeCuid = URLDecoder.decode(cuid, "UTF-8")
    // Collapse the lower-cased OS string to a fixed family name when it starts with a known
    // prefix; "wp7" is tested before "wp" so the longer prefix wins.
    os = os.toLowerCase()
    if (os.matches("android(.*)")) {
        os = "android"
    } else if (os.matches("iphone(.*)")) {
        os = "iphone"
    } else if (os.matches("ipad(.*)")) {
        os = "ipad"
    } else if (os.matches("s60(.*)")) {
        os = "symbian"
    } else if (os.matches("wp7(.*)")) {
        os = "wp7"
    } else if (os.matches("wp(.*)")) {
        os = "wp"
    } else if (os.matches("tizen(.*)")) {
        os = "tizen"
    // NOTE(review): the `else if` block opened above is not closed before the next
    // statement, so the braces in this snippet are unbalanced.

    // NOTE(review): pbData is parsed again for each of the three reads below, in addition
    // to the parse done for the key/value walk above.
    val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid()
    val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime()
    if (ifHasLogid) {
        val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid()
        // Reject when logid is empty or "-", resid is not "01", channel is empty, or
        // appid/prod is set.
        // NOTE(review): the single `|` before !appid.isEmpty is a non-short-circuit OR;
        // presumably `||` was intended.
        if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty |!appid.isEmpty || !prod.isEmpty) {
            ""
        } else {
            // Output line: cuid, os, channel, sv, "1", "1", time joined by \001, newline-terminated.
            decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n"
        }
    } else {
        ""
    }
} catch {
    // Catches every Throwable, which also covers JVM errors such as OutOfMemoryError.
    case _:Throwable => ""
}
}
// NOTE(review): `parseData` is not defined in this snippet; presumably `deserializePbData`
// is meant — confirm.
lines.map(parseData).print()

Текст ошибки:

2016-07-12T12:00:01.546+0800: 5096.643: [GC (Allocation Failure) 
Desired survivor size 442499072 bytes, new threshold 1 (max 15)
[PSYoungGen: 0K->0K(2356736K)] 5059009K->5059009K(7949312K), 0.0103342 secs] [Times: user=0.21 sys=0.00, real=0.01 secs] 
2016-07-12T12:00:01.556+0800: 5096.654: [Full GC (Allocation Failure) [PSYoungGen: 0K->0K(2356736K)] [ParOldGen:    5059009K->5057376K(5592576K)] 5059009K->5057376K(7949312K), [Metaspace: 44836K->44490K(1089536K)], 0.8769617 secs] [Times: user=17.88   sys=0.04, real=0.88 secs] 
2016-07-12T12:00:02.434+0800: 5097.531: Total time for which application threads were stopped: 1.2951974 seconds, Stopping threads  took: 0.0000662 seconds
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid24310.hprof ...
2016-07-12T12:00:30.960+0800: 5126.057: Total time for which application threads were stopped: 28.5260812 seconds, Stopping threads     took: 0.0000995 seconds
Heap dump file created [5211252802 bytes in 28.526 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 24310"...
2016-07-12T12:00:31.589+0800: 5126.686: Total time for which application threads were stopped: 0.6289627 seconds, Stopping threads  took: 0.0001258 seconds
2016-07-12T12:00:31.595+0800: 5126.692: Total time for which application threads were stopped: 0.0004822 seconds, Stopping threads  took: 0.0001493 seconds
2016-07-12 12:00:31.597 [Thread-5] ERROR [Logging.scala:95] - Uncaught exception in thread Thread[Thread-5,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_51]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_51]
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) ~[na:1.8.0_51]
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1202) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:858) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:157) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:128) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$3.onPushBlock(ReceiverSupervisorImpl.scala:109) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:296) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(    BlockGenerator.scala:268) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:109) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
2016-07-12 12:00:31.600 [SIGTERM handler] ERROR [SignalLogger.scala:57] - RECEIVED SIGNAL 15: SIGTERM
2016-07-12T12:00:31.611+0800: 5126.708: Total time for which application threads were stopped: 0.0005602 seconds, Stopping threads  took: 0.0001765 seconds
2016-07-12T12:00:31.617+0800: 5126.714: Total time for which application threads were stopped: 0.0004800 seconds, Stopping threads  took: 0.0001412 seconds
2016-07-12 12:00:32.483 [Bigpipe Receiver-SendThread(cq01-bigpipe-proxy01.cq01.baidu.com:2181)] WARN  [ClientCnxnSocket.java:139] -     Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:32.507+0800: 5127.604: Total time for which application threads were stopped: 0.0004604 seconds, Stopping threads  took: 0.0001198 seconds
2016-07-12T12:00:32.509+0800: 5127.606: Total time for which application threads were stopped: 0.0002919 seconds, Stopping threads  took: 0.0001800 seconds
2016-07-12T12:00:32.509+0800: 5127.607: Total time for which application threads were stopped: 0.0002692 seconds, Stopping threads  took: 0.0001612 seconds
2016-07-12 12:00:32.549 [Bigpipe Receiver-SendThread(tc-bigpipe-proxy03.tc.baidu.com:2181)] WARN  [ClientCnxnSocket.java:139] -     Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:34.220+0800: 5129.317: [GC (Allocation Failure) 
Desired survivor size 424148992 bytes, new threshold 2 (max 15)
[PSYoungGen: 1931776K->188775K(2363904K)] 6989152K->5246152K(7956480K), 0.2569385 secs] [Times: user=0.00 sys=5.19, real=0.26 secs] 
2016-07-12T12:00:34.477+0800: 5129.575: Total time for which application threads were stopped: 0.2575019 seconds, Stopping threads  took: 0.0000384 seconds
2016-07-12T12:00:35.478+0800: 5130.575: Total time for which application threads were stopped: 0.0002786 seconds, Stopping threads  took: 0.0000424 seconds
2016-07-12T12:00:37.600+0800: 5132.697: [GC (Allocation Failure) 
Desired survivor size 482344960 bytes, new threshold 3 (max 15)
[PSYoungGen: 2120551K->387013K(2268160K)] 7177928K->5444389K(7860736K), 0.5153031 secs] [Times: user=0.00 sys=9.89, real=0.52 secs] 
2016-07-12T12:00:38.116+0800: 5133.213: Total time for which application threads were stopped: 0.5157529 seconds, Stopping threads  took: 0.0000427 seconds
2016-07-12T12:00:40.116+0800: 5135.213: Total time for which application threads were stopped: 0.0003171 seconds, Stopping threads  took: 0.0001000 seconds
2016-07-12T12:00:40.419+0800: 5135.516: [GC (Allocation Failure) 
Desired survivor size 599785472 bytes, new threshold 2 (max 15)
[PSYoungGen: 2240965K->471033K(2324992K)] 7298341K->5633517K(7917568K), 0.3621433 secs] [Times: user=0.12 sys=7.11, real=0.36 secs] 
2016-07-12T12:00:40.781+0800: 5135.878: Total time for which application threads were stopped: 0.3626080 seconds, Stopping threads  took: 0.0000429 seconds
2016-07-12T12:00:41.781+0800: 5136.879: Total time for which application threads were stopped: 0.0003301 seconds, Stopping threads  took: 0.0000947 seconds
2016-07-12T12:00:43.108+0800: 5138.205: [GC (Allocation Failure) 
Desired survivor size 620756992 bytes, new threshold 3 (max 15)
[PSYoungGen: 2324985K->378481K(2054656K)] 7487469K->5831048K(7647232K), 0.2593685 secs] [Times: user=0.66 sys=4.96, real=0.26 secs] 
2016-07-12T12:00:43.368+0800: 5138.465: [Full GC (Ergonomics) [PSYoungGen: 378481K->0K(2054656K)] [ParOldGen:   5452566K->4713601K(5592576K)] 5831048K->4713601K(7647232K), [Metaspace: 44635K->44635K(1089536K)], 4.3137405 secs] [Times: user=9.78    sys=74.53, real=4.31 secs] 
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 4.5736603 seconds, Stopping threads  took: 0.0000449 seconds
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 0.0002430 seconds, Stopping threads  took: 0.0000856 seconds
2016-07-12T12:00:49.954+0800: 5145.052: [GC (Allocation Failure) 
Desired survivor size 597688320 bytes, new threshold 4 (max 15)
[PSYoungGen: 1583616K->161266K(2189824K)] 6297217K->4874867K(7782400K), 0.0388138 secs] [Times: user=0.00 sys=0.84, real=0.04 secs] 
2016-07-12T12:00:49.993+0800: 5145.091: Total time for which application threads were stopped: 0.0392926 seconds, Stopping threads  took: 0.0000449 seconds
2016-07-12T12:00:51.903+0800: 5147.000: [GC (Allocation Failure) 
Desired survivor size 596115456 bytes, new threshold 5 (max 15)
[PSYoungGen: 1744882K->324587K(2213888K)] 6458483K->5038189K(7806464K), 0.0334029 secs] [Times: user=0.69 sys=0.03, real=0.04 secs] 
2016-07-12T12:00:51.936+0800: 5147.034: Total time for which application threads were stopped: 0.0338707 seconds, Stopping threads  took: 0.0000404 seconds
2016-07-12T12:00:53.942+0800: 5149.039: [GC (Allocation Failure) 
Desired survivor size 654835712 bytes, new threshold 6 (max 15)
[PSYoungGen: 1954795K->490438K(2120704K)] 6668397K->5204039K(7713280K), 0.0441762 secs] [Times: user=0.95 sys=0.02, real=0.05 secs] 
2016-07-12T12:00:53.986+0800: 5149.083: Total time for which application threads were stopped: 0.0446174 seconds, Stopping threads  took: 0.0000456 seconds
2016-07-12T12:00:56.102+0800: 5151.199: [GC (Allocation Failure) 
Desired survivor size 763887616 bytes, new threshold 5 (max 15)
[PSYoungGen: 2120646K->639467K(1943552K)] 6834247K->5370280K(7536128K), 0.1124828 secs] [Times: user=1.07 sys=1.30, real=0.11 secs] 
2016-07-12T12:00:56.214+0800: 5151.312: Total time for which application threads were stopped: 0.1129348 seconds, Stopping threads  took: 0.0000396 seconds
2016-07-12T12:00:57.784+0800: 5152.881: [GC (Allocation Failure) 
Desired survivor size 895483904 bytes, new threshold 4 (max 15)
[PSYoungGen: 1943531K->745977K(2050048K)] 6674344K->5504073K(7642624K), 0.0971717 secs] [Times: user=1.20 sys=0.67, real=0.10 secs] 
2016-07-12T12:00:57.881+0800: 5152.979: Total time for which application threads were stopped: 0.0977363 seconds, Stopping threads  took: 0.0000941 seconds
2016-07-12T12:00:59.406+0800: 5154.504: [GC (Allocation Failure) 
Desired survivor size 935329792 bytes, new threshold 5 (max 15)
[PSYoungGen: 2050041K->599188K(1715200K)] 6808137K->5647517K(7307776K), 0.3651465 secs] [Times: user=0.98 sys=5.88, real=0.37 secs] 
2016-07-12T12:00:59.772+0800: 5154.869: Total time for which application threads were stopped: 0.3656089 seconds, Stopping threads  took: 0.0000479 seconds
2016-07-12T12:01:00.968+0800: 5156.066: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 4 (max 15)
[PSYoungGen: 1568404K->697830K(1667072K)] 6616733K->5746159K(7259648K), 0.0978955 secs] [Times: user=1.91 sys=0.04, real=0.09 secs] 
2016-07-12T12:01:01.066+0800: 5156.164: Total time for which application threads were stopped: 0.0983759 seconds, Stopping threads  took: 0.0000482 seconds
2016-07-12T12:01:02.189+0800: 5157.287: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 3 (max 15)
[PSYoungGen: 1667046K->465454K(1864192K)] 6715375K->5855655K(7456768K), 0.1261993 secs] [Times: user=2.41 sys=0.29, real=0.12 secs] 
2016-07-12T12:01:02.316+0800: 5157.413: [Full GC (Ergonomics) [PSYoungGen: 465454K->65236K(1864192K)] [ParOldGen:   5390200K->5592328K(5592576K)] 5855655K->5657564K(7456768K), [Metaspace: 44635K->44635K(1089536K)], 3.2729437 secs] [Times: user=12.34   sys=57.11, real=3.28 secs] 
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 3.3998619 seconds, Stopping threads  took: 0.0000521 seconds
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 0.0002330 seconds, Stopping threads  took: 0.0000949 seconds
2016-07-12T12:01:05.688+0800: 5160.785: Total time for which application threads were stopped: 0.0002935 seconds, Stopping threads  took: 0.0000514 seconds
Heap
 PSYoungGen      total 1864192K, used 146620K [0x0000000715580000, 0x00000007c0000000, 0x00000007c0000000)
  eden space 932352K, 8% used [0x0000000715580000,0x000000071a4fa138,0x000000074e400000)
  from space 931840K, 7% used [0x0000000787200000,0x000000078b1b5290,0x00000007c0000000)
  to   space 931840K, 0% used [0x000000074e400000,0x000000074e400000,0x0000000787200000)
 ParOldGen       total 5592576K, used 5592328K [0x00000005c0000000, 0x0000000715580000, 0x0000000715580000)
  object space 5592576K, 99% used [0x00000005c0000000,0x00000007155420a8,0x0000000715580000)
 Metaspace       used 44654K, capacity 44990K, committed 45864K, reserved 1089536K
  class space    used 6212K, capacity 6324K, committed 6440K, reserved 1048576K 

Новая ошибка: думаю, что проблему OOM вызывает ошибка выгрузки блока (upload). Как её исправить?

    2016-07-15 11:41:47.307 [shuffle-client-0] ERROR [TransportChannelHandler.java:128] - Connection to     nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 has been quiet for 120000 ms while there are outstanding requests. Assuming  connection is dead; please adjust spark.network.timeout if this is wrong.
2016-07-15 11:41:47.309 [shuffle-client-0] ERROR [TransportResponseHandler.java:122] - Still have 1 requests outstanding when   connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 is closed
2016-07-15 11:41:47.314 [shuffle-client-0] ERROR [Logging.scala:95] - Error while uploading block input-0-1468553896200
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
2016-07-15T11:41:47.316+0800: 2176.487: Total time for which application threads were stopped: 0.0002632 seconds, Stopping threads  took: 0.0000521 seconds
2016-07-15 11:41:47.316 [Thread-5] WARN  [Logging.scala:91] - Failed to replicate input-0-1468553896200 to BlockManagerId(2,    nmg01-taihang-d10207.nmg01.baidu.com, 30456), failure #0
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_51]
2016-07-15T11:41:48.316+0800: 2177.487: Total time for which application threads were stopped: 0.0003391 seconds, Stopping threads  took: 0.0000979 seconds
2016-07-15T11:41:51.312+0800: 2180.483: [GC (Allocation Failure) --[PSYoungGen: 2894863K->2894863K(3007488K)]   8299519K->9550273K(9998336K), 0.7462118 secs] [Times: user=9.78 sys=0.02, real=0.74 secs] 
2016-07-15T11:41:52.059+0800: 2181.230: [Full GC (Ergonomics) [PSYoungGen: 2894863K->0K(3007488K)] [ParOldGen:  6655410K->6895736K(6990848K)] 9550273K->6895736K(9998336K), [Metaspace: 44409K->44409K(1087488K)], 0.4061892 secs] [Times: user=7.50    sys=0.01, real=0.41 secs] 

1 ответ

В вашем коде, похоже, есть структурная ошибка. Просматривая код (и переформатируя его так, чтобы отступы отражали фактическую структуру опубликованного текста), я обнаружил, что последняя ветка else if:

} else if (os.matches("tizen(.*)")) {
    os = "tizen"

открывает блок, но не закрывает блок там, где он "должен". Вместо этого блок фактически заканчивается:

} catch {

Полный код в том виде, в каком он, по-видимому, был задуман (с исправленными отступами):

// Spark configuration; the application name is read from args(8).
val conf = new SparkConf()
    .setAppName(args(8))
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.speculation", "true")
val sc = new SparkContext(conf)
// The batch interval, in seconds, is read from args(7).
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
// One receiver-backed input stream per index in 1..args(3); the index is handed to the receiver.
val bigPipeStreams = for (i <- 1 to args(3).toInt) yield {
    val receiver = new MyBigpipeLogagentReceiver(
        args(0), args(1), args(2), i, args(4), args(5), args(6).toInt)
    ssc.networkStream(receiver)
}
// Union of all receiver streams.
val lines = ssc.union(bigPipeStreams)
/** Decodes one raw log record into a single output line.
  *
  * The record is treated as a JSON object. Everything from `,"time_str"` onwards is cut off
  * (and the object re-closed with `}`) before parsing. The `bin_body` field, with its first
  * and last character stripped, is read as a comma-separated list of signed byte values; the
  * first 8 bytes are skipped and the remainder is parsed as a `Lighttpd.lighttpd_log` message.
  *
  * A line is produced only when the message has a log id that is neither empty nor "-",
  * `resid` equals "01", `channel` is non-empty, and both `appid` and `prod` are empty.
  *
  * @param value raw record; may be null or empty
  * @return `cuid, os, channel, sv, "1", "1", time` joined by the U+0001 character and
  *         terminated by a newline, or "" when the record is null/empty, cannot be decoded,
  *         or is filtered out
  */
def deserializePbData(value: String) : String = {
    if (null == value || value.isEmpty) {
        ""
    } else {
        try {
            val timeStrIndex = value.indexOf(",\"time_str\"")
            val jsonText =
                if (timeStrIndex != -1) value.substring(0, timeStrIndex) + "}" else value
            val binBody = JSONObject.fromObject(jsonText).getString("bin_body")
            // Map the textual byte list straight to an Array[Byte]; no intermediate buffer.
            val pbData = binBody
                .substring(1, binBody.length - 1)
                .split(",")
                .map(text => java.lang.Byte.parseByte(text))
                .drop(8)
            // Parse the protobuf payload once and reuse the message for every read below.
            val log = Lighttpd.lighttpd_log.parseFrom(pbData)

            // Collect the URL key/value pairs of interest. A key that occurs more than once
            // gets its values concatenated, exactly as the previous `+=` accumulation did.
            val fields = log.getRequest().getUrl().getUrlFields().getAutokvList().asScala
                .foldLeft(Map.empty[String, String]) { (acc, kv) =>
                    kv.getKey() match {
                        case key @ ("cuid" | "os" | "channel" | "sv" | "resid" | "appid" | "prod") =>
                            acc.updated(key, acc.getOrElse(key, "") + kv.getValue())
                        case _ => acc
                    }
                }
            def field(name: String): String = fields.getOrElse(name, "")

            val channel = field("channel")
            val accepted = log.hasLogid() && {
                val logid = log.getLogid()
                !logid.isEmpty && logid.toString != "-" &&
                    field("resid") == "01" && channel.nonEmpty &&
                    field("appid").isEmpty && field("prod").isEmpty
            }

            if (accepted) {
                // Locale.ROOT keeps lower-casing independent of the JVM default locale
                // (e.g. a Turkish locale would turn "IPHONE" into a string with a dotless i).
                val rawOs = field("os").toLowerCase(java.util.Locale.ROOT)
                // Collapse to a fixed family name by prefix; "wp7" must be tested before "wp".
                // startsWith replaces the former `matches("prefix(.*)")` calls, which compiled
                // a regex per test; unlike `.`, it also accepts line terminators after the prefix.
                val os =
                    if (rawOs.startsWith("android")) "android"
                    else if (rawOs.startsWith("iphone")) "iphone"
                    else if (rawOs.startsWith("ipad")) "ipad"
                    else if (rawOs.startsWith("s60")) "symbian"
                    else if (rawOs.startsWith("wp7")) "wp7"
                    else if (rawOs.startsWith("wp")) "wp"
                    else if (rawOs.startsWith("tizen")) "tizen"
                    else rawOs
                val decodedCuid = URLDecoder.decode(field("cuid"), "UTF-8")
                val sv = field("sv")
                val time = log.getTime()
                val sep = "\u0001"
                s"$decodedCuid$sep$os$sep$channel$sep$sv${sep}1${sep}1$sep$time\n"
            } else {
                ""
            }
        } catch {
            // Malformed records are dropped on purpose. Only non-fatal exceptions are
            // swallowed, so OutOfMemoryError and other fatal JVM errors still propagate
            // instead of being silently turned into an empty line.
            case scala.util.control.NonFatal(_) => ""
        }
    }
}
// Map the stream through the decoder defined above; the previous call referenced
// `parseData`, which is not defined anywhere in this snippet.
lines.map(deserializePbData).print()

Функциональность вашего кода я не проверял. Это лишь синтаксическая/структурная проблема, которая сразу бросается в глаза даже при беглом просмотре опубликованного кода.

Другие вопросы по тегам