Приемник Spark Streaming: Out of Memory (OOM)
У меня проблема: ресивер (receiver) перезапускается снова и снова.
Я использую Spark 1.6.1 и Spark Streaming для получения потоковых данных, а затем с помощью map десериализую данные protobuf (pb).
Я протестировал два случая:
- Только получаю данные и сразу вывожу их (print): приложение работает стабильно.
- Получаю данные и десериализую их: возникает проблема. Она появляется нерегулярно. Объем входящих данных — около 500 МБ/мин. Память исполнителя (executor memory) установлена в 8 ГБ. Похоже, что-то выделяет чрезвычайно много памяти, но я не знаю, почему.
Мой код:
// Driver setup as originally posted: configures Spark (Kryo serializer,
// graceful shutdown, backpressure, speculation), creates a StreamingContext
// with a batch interval of args(7) seconds, and unions args(3) custom
// receiver streams into `lines`.
// NOTE(review): this snippet does not compile as posted -- the `tizen` branch
// in deserializePbData is never closed, and the last line calls `parseData`,
// which is not defined in this snippet.
val conf = new SparkConf().setAppName(args(8))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.backpressure.enabled","true")
conf.set("spark.speculation","true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
// One receiver-based input stream per index i in 1..args(3).
val bigPipeStreams = (1 to args(3).toInt).map{
i => ssc.networkStream(
new MyBigpipeLogagentReceiver(args(0),args(1),args(2),i,args(4),args(5),args(6).toInt)
)
}
val lines = ssc.union(bigPipeStreams)
// Decodes one raw JSON record into a "\001"-separated output line, or returns ""
// for null/empty input, for filtered-out records, and whenever anything throws.
def deserializePbData(value: String) : String = {
if (null == value || value.isEmpty) {
return ""
}
// Values collected from the protobuf's autokv list; repeated keys are concatenated.
var cuid = ""
var os = ""
var channel = ""
var sv = ""
var resid = ""
var appid = ""
var prod = ""
try { //if exception, the record is useless data: just drop it
// Cut the JSON text at the ,"time_str" field (if present) and re-close the object.
val timeStrIndex = value.indexOf(",\"time_str\"")
var strAfterTruncation = ""
if (-1 != timeStrIndex) {
strAfterTruncation = value.substring(0,timeStrIndex) + "}"
} else {
strAfterTruncation = value
}
val jsonData = JSONObject.fromObject(strAfterTruncation)
//val jsonData = value.getAsJsonArray()
val binBody = jsonData.getString("bin_body")
// Strips the first and last characters of bin_body (presumably "[" and "]" -- TODO confirm),
// parses each comma-separated element as a signed byte, and drops the first 8 bytes.
// NOTE(review): `b + x` on a mutable ArrayBuffer appears to clone the buffer for every
// element in Scala 2.x (unlike `b += x`), making this fold quadratic in allocation -- verify.
val pbData = binBody.substring(1,binBody.length()-1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b,a) => b +java.lang.Byte.parseByte(a)).drop(8).toArray
// NOTE(review): the same pbData is parsed with parseFrom here and again on three later
// lines (four parses of identical bytes); parsing once and reusing the result would do.
Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a =>
a.getKey() match {
case "cuid" => cuid += a.getValue()
case "os" => os += a.getValue()
case "channel" => channel += a.getValue()
case "sv" => sv += a.getValue()
case "resid" => resid += a.getValue()
case "appid" => appid += a.getValue()
case "prod" => prod += a.getValue()
case _ => null
}
)
val decodeCuid = URLDecoder.decode(cuid, "UTF-8")
os = os.toLowerCase()
// Normalize the lower-cased os value to a platform name by prefix regex;
// values that match none of the prefixes are kept as-is.
if (os.matches("android(.*)")) {
os = "android"
} else if (os.matches("iphone(.*)")) {
os = "iphone"
} else if (os.matches("ipad(.*)")) {
os = "ipad"
} else if (os.matches("s60(.*)")) {
os = "symbian"
} else if (os.matches("wp7(.*)")) {
os = "wp7"
} else if (os.matches("wp(.*)")) {
os = "wp"
} else if (os.matches("tizen(.*)")) {
os = "tizen"
// NOTE(review): the `{` opened by this `else if` is never closed. As posted, the
// logid/time handling below sits inside the tizen branch, and the `}` before `catch`
// closes this branch instead of the `try`, so the braces do not balance.
val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid()
val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime()
if (ifHasLogid) {
val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid()
// Keeps the record only if logid is non-empty and not "-", resid == "01",
// channel is non-empty, and both appid and prod are empty.
// NOTE(review): `|` (non-short-circuit) is used between the channel and appid checks;
// `||` was presumably intended (the result is the same for these Booleans).
if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty |!appid.isEmpty || !prod.isEmpty) {
""
} else {
decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n"
}
} else {
""
}
} catch {
// Catches every Throwable -- including Errors such as OutOfMemoryError -- and maps it to "".
case _:Throwable => ""
}
}
// NOTE(review): `parseData` is not defined in this snippet; presumably
// `deserializePbData` was meant.
lines.map(parseData).print()
Текст ошибки:
2016-07-12T12:00:01.546+0800: 5096.643: [GC (Allocation Failure)
Desired survivor size 442499072 bytes, new threshold 1 (max 15)
[PSYoungGen: 0K->0K(2356736K)] 5059009K->5059009K(7949312K), 0.0103342 secs] [Times: user=0.21 sys=0.00, real=0.01 secs]
2016-07-12T12:00:01.556+0800: 5096.654: [Full GC (Allocation Failure) [PSYoungGen: 0K->0K(2356736K)] [ParOldGen: 5059009K->5057376K(5592576K)] 5059009K->5057376K(7949312K), [Metaspace: 44836K->44490K(1089536K)], 0.8769617 secs] [Times: user=17.88 sys=0.04, real=0.88 secs]
2016-07-12T12:00:02.434+0800: 5097.531: Total time for which application threads were stopped: 1.2951974 seconds, Stopping threads took: 0.0000662 seconds
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid24310.hprof ...
2016-07-12T12:00:30.960+0800: 5126.057: Total time for which application threads were stopped: 28.5260812 seconds, Stopping threads took: 0.0000995 seconds
Heap dump file created [5211252802 bytes in 28.526 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 24310"...
2016-07-12T12:00:31.589+0800: 5126.686: Total time for which application threads were stopped: 0.6289627 seconds, Stopping threads took: 0.0001258 seconds
2016-07-12T12:00:31.595+0800: 5126.692: Total time for which application threads were stopped: 0.0004822 seconds, Stopping threads took: 0.0001493 seconds
2016-07-12 12:00:31.597 [Thread-5] ERROR [Logging.scala:95] - Uncaught exception in thread Thread[Thread-5,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_51]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_51]
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_51]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_51]
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_51]
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) ~[na:1.8.0_51]
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at com.esotericsoftware.kryo.io.Output.require(Output.java:135) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1202) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:858) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:157) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:128) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$3.onPushBlock(ReceiverSupervisorImpl.scala:109) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:296) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks( BlockGenerator.scala:268) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:109) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
2016-07-12 12:00:31.600 [SIGTERM handler] ERROR [SignalLogger.scala:57] - RECEIVED SIGNAL 15: SIGTERM
2016-07-12T12:00:31.611+0800: 5126.708: Total time for which application threads were stopped: 0.0005602 seconds, Stopping threads took: 0.0001765 seconds
2016-07-12T12:00:31.617+0800: 5126.714: Total time for which application threads were stopped: 0.0004800 seconds, Stopping threads took: 0.0001412 seconds
2016-07-12 12:00:32.483 [Bigpipe Receiver-SendThread(cq01-bigpipe-proxy01.cq01.baidu.com:2181)] WARN [ClientCnxnSocket.java:139] - Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:32.507+0800: 5127.604: Total time for which application threads were stopped: 0.0004604 seconds, Stopping threads took: 0.0001198 seconds
2016-07-12T12:00:32.509+0800: 5127.606: Total time for which application threads were stopped: 0.0002919 seconds, Stopping threads took: 0.0001800 seconds
2016-07-12T12:00:32.509+0800: 5127.607: Total time for which application threads were stopped: 0.0002692 seconds, Stopping threads took: 0.0001612 seconds
2016-07-12 12:00:32.549 [Bigpipe Receiver-SendThread(tc-bigpipe-proxy03.tc.baidu.com:2181)] WARN [ClientCnxnSocket.java:139] - Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:34.220+0800: 5129.317: [GC (Allocation Failure)
Desired survivor size 424148992 bytes, new threshold 2 (max 15)
[PSYoungGen: 1931776K->188775K(2363904K)] 6989152K->5246152K(7956480K), 0.2569385 secs] [Times: user=0.00 sys=5.19, real=0.26 secs]
2016-07-12T12:00:34.477+0800: 5129.575: Total time for which application threads were stopped: 0.2575019 seconds, Stopping threads took: 0.0000384 seconds
2016-07-12T12:00:35.478+0800: 5130.575: Total time for which application threads were stopped: 0.0002786 seconds, Stopping threads took: 0.0000424 seconds
2016-07-12T12:00:37.600+0800: 5132.697: [GC (Allocation Failure)
Desired survivor size 482344960 bytes, new threshold 3 (max 15)
[PSYoungGen: 2120551K->387013K(2268160K)] 7177928K->5444389K(7860736K), 0.5153031 secs] [Times: user=0.00 sys=9.89, real=0.52 secs]
2016-07-12T12:00:38.116+0800: 5133.213: Total time for which application threads were stopped: 0.5157529 seconds, Stopping threads took: 0.0000427 seconds
2016-07-12T12:00:40.116+0800: 5135.213: Total time for which application threads were stopped: 0.0003171 seconds, Stopping threads took: 0.0001000 seconds
2016-07-12T12:00:40.419+0800: 5135.516: [GC (Allocation Failure)
Desired survivor size 599785472 bytes, new threshold 2 (max 15)
[PSYoungGen: 2240965K->471033K(2324992K)] 7298341K->5633517K(7917568K), 0.3621433 secs] [Times: user=0.12 sys=7.11, real=0.36 secs]
2016-07-12T12:00:40.781+0800: 5135.878: Total time for which application threads were stopped: 0.3626080 seconds, Stopping threads took: 0.0000429 seconds
2016-07-12T12:00:41.781+0800: 5136.879: Total time for which application threads were stopped: 0.0003301 seconds, Stopping threads took: 0.0000947 seconds
2016-07-12T12:00:43.108+0800: 5138.205: [GC (Allocation Failure)
Desired survivor size 620756992 bytes, new threshold 3 (max 15)
[PSYoungGen: 2324985K->378481K(2054656K)] 7487469K->5831048K(7647232K), 0.2593685 secs] [Times: user=0.66 sys=4.96, real=0.26 secs]
2016-07-12T12:00:43.368+0800: 5138.465: [Full GC (Ergonomics) [PSYoungGen: 378481K->0K(2054656K)] [ParOldGen: 5452566K->4713601K(5592576K)] 5831048K->4713601K(7647232K), [Metaspace: 44635K->44635K(1089536K)], 4.3137405 secs] [Times: user=9.78 sys=74.53, real=4.31 secs]
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 4.5736603 seconds, Stopping threads took: 0.0000449 seconds
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 0.0002430 seconds, Stopping threads took: 0.0000856 seconds
2016-07-12T12:00:49.954+0800: 5145.052: [GC (Allocation Failure)
Desired survivor size 597688320 bytes, new threshold 4 (max 15)
[PSYoungGen: 1583616K->161266K(2189824K)] 6297217K->4874867K(7782400K), 0.0388138 secs] [Times: user=0.00 sys=0.84, real=0.04 secs]
2016-07-12T12:00:49.993+0800: 5145.091: Total time for which application threads were stopped: 0.0392926 seconds, Stopping threads took: 0.0000449 seconds
2016-07-12T12:00:51.903+0800: 5147.000: [GC (Allocation Failure)
Desired survivor size 596115456 bytes, new threshold 5 (max 15)
[PSYoungGen: 1744882K->324587K(2213888K)] 6458483K->5038189K(7806464K), 0.0334029 secs] [Times: user=0.69 sys=0.03, real=0.04 secs]
2016-07-12T12:00:51.936+0800: 5147.034: Total time for which application threads were stopped: 0.0338707 seconds, Stopping threads took: 0.0000404 seconds
2016-07-12T12:00:53.942+0800: 5149.039: [GC (Allocation Failure)
Desired survivor size 654835712 bytes, new threshold 6 (max 15)
[PSYoungGen: 1954795K->490438K(2120704K)] 6668397K->5204039K(7713280K), 0.0441762 secs] [Times: user=0.95 sys=0.02, real=0.05 secs]
2016-07-12T12:00:53.986+0800: 5149.083: Total time for which application threads were stopped: 0.0446174 seconds, Stopping threads took: 0.0000456 seconds
2016-07-12T12:00:56.102+0800: 5151.199: [GC (Allocation Failure)
Desired survivor size 763887616 bytes, new threshold 5 (max 15)
[PSYoungGen: 2120646K->639467K(1943552K)] 6834247K->5370280K(7536128K), 0.1124828 secs] [Times: user=1.07 sys=1.30, real=0.11 secs]
2016-07-12T12:00:56.214+0800: 5151.312: Total time for which application threads were stopped: 0.1129348 seconds, Stopping threads took: 0.0000396 seconds
2016-07-12T12:00:57.784+0800: 5152.881: [GC (Allocation Failure)
Desired survivor size 895483904 bytes, new threshold 4 (max 15)
[PSYoungGen: 1943531K->745977K(2050048K)] 6674344K->5504073K(7642624K), 0.0971717 secs] [Times: user=1.20 sys=0.67, real=0.10 secs]
2016-07-12T12:00:57.881+0800: 5152.979: Total time for which application threads were stopped: 0.0977363 seconds, Stopping threads took: 0.0000941 seconds
2016-07-12T12:00:59.406+0800: 5154.504: [GC (Allocation Failure)
Desired survivor size 935329792 bytes, new threshold 5 (max 15)
[PSYoungGen: 2050041K->599188K(1715200K)] 6808137K->5647517K(7307776K), 0.3651465 secs] [Times: user=0.98 sys=5.88, real=0.37 secs]
2016-07-12T12:00:59.772+0800: 5154.869: Total time for which application threads were stopped: 0.3656089 seconds, Stopping threads took: 0.0000479 seconds
2016-07-12T12:01:00.968+0800: 5156.066: [GC (Allocation Failure)
Desired survivor size 954204160 bytes, new threshold 4 (max 15)
[PSYoungGen: 1568404K->697830K(1667072K)] 6616733K->5746159K(7259648K), 0.0978955 secs] [Times: user=1.91 sys=0.04, real=0.09 secs]
2016-07-12T12:01:01.066+0800: 5156.164: Total time for which application threads were stopped: 0.0983759 seconds, Stopping threads took: 0.0000482 seconds
2016-07-12T12:01:02.189+0800: 5157.287: [GC (Allocation Failure)
Desired survivor size 954204160 bytes, new threshold 3 (max 15)
[PSYoungGen: 1667046K->465454K(1864192K)] 6715375K->5855655K(7456768K), 0.1261993 secs] [Times: user=2.41 sys=0.29, real=0.12 secs]
2016-07-12T12:01:02.316+0800: 5157.413: [Full GC (Ergonomics) [PSYoungGen: 465454K->65236K(1864192K)] [ParOldGen: 5390200K->5592328K(5592576K)] 5855655K->5657564K(7456768K), [Metaspace: 44635K->44635K(1089536K)], 3.2729437 secs] [Times: user=12.34 sys=57.11, real=3.28 secs]
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 3.3998619 seconds, Stopping threads took: 0.0000521 seconds
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 0.0002330 seconds, Stopping threads took: 0.0000949 seconds
2016-07-12T12:01:05.688+0800: 5160.785: Total time for which application threads were stopped: 0.0002935 seconds, Stopping threads took: 0.0000514 seconds
Heap
PSYoungGen total 1864192K, used 146620K [0x0000000715580000, 0x00000007c0000000, 0x00000007c0000000)
eden space 932352K, 8% used [0x0000000715580000,0x000000071a4fa138,0x000000074e400000)
from space 931840K, 7% used [0x0000000787200000,0x000000078b1b5290,0x00000007c0000000)
to space 931840K, 0% used [0x000000074e400000,0x000000074e400000,0x0000000787200000)
ParOldGen total 5592576K, used 5592328K [0x00000005c0000000, 0x0000000715580000, 0x0000000715580000)
object space 5592576K, 99% used [0x00000005c0000000,0x00000007155420a8,0x0000000715580000)
Metaspace used 44654K, capacity 44990K, committed 45864K, reserved 1089536K
class space used 6212K, capacity 6324K, committed 6440K, reserved 1048576K
Новая ошибка: я думаю, что проблему OOM вызывает ошибка загрузки (upload) блока. Как исправить эту ошибку загрузки?
2016-07-15 11:41:47.307 [shuffle-client-0] ERROR [TransportChannelHandler.java:128] - Connection to nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
2016-07-15 11:41:47.309 [shuffle-client-0] ERROR [TransportResponseHandler.java:122] - Still have 1 requests outstanding when connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 is closed
2016-07-15 11:41:47.314 [shuffle-client-0] ERROR [Logging.scala:95] - Error while uploading block input-0-1468553896200
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
2016-07-15T11:41:47.316+0800: 2176.487: Total time for which application threads were stopped: 0.0002632 seconds, Stopping threads took: 0.0000521 seconds
2016-07-15 11:41:47.316 [Thread-5] WARN [Logging.scala:91] - Failed to replicate input-0-1468553896200 to BlockManagerId(2, nmg01-taihang-d10207.nmg01.baidu.com, 30456), failure #0
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_51]
2016-07-15T11:41:48.316+0800: 2177.487: Total time for which application threads were stopped: 0.0003391 seconds, Stopping threads took: 0.0000979 seconds
2016-07-15T11:41:51.312+0800: 2180.483: [GC (Allocation Failure) --[PSYoungGen: 2894863K->2894863K(3007488K)] 8299519K->9550273K(9998336K), 0.7462118 secs] [Times: user=9.78 sys=0.02, real=0.74 secs]
2016-07-15T11:41:52.059+0800: 2181.230: [Full GC (Ergonomics) [PSYoungGen: 2894863K->0K(3007488K)] [ParOldGen: 6655410K->6895736K(6990848K)] 9550273K->6895736K(9998336K), [Metaspace: 44409K->44409K(1087488K)], 0.4061892 secs] [Times: user=7.50 sys=0.01, real=0.41 secs]
1 ответ
Похоже, в структуре вашего кода есть ошибка. Переформатируя ваш код (чтобы отступы отражали структуру в том виде, в каком он опубликован), я обнаружил, что ваш последний оператор else if:
} else if (os.matches("tizen(.*)")) {
os = "tizen"
открывает блок, но не закрывает его там, где «должен». Вместо этого блок фактически заканчивается на строке:
} catch {
Полный код в том виде, в каком он, по-видимому, задумывался (переформатированный):
// Driver setup: configure Spark, create the StreamingContext and union one
// receiver-based input stream per index in 1..args(3) into `lines`.
val conf = new SparkConf().setAppName(args(8))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.speculation", "true")
val sc = new SparkContext(conf)
// Batch interval, in seconds, comes from args(7).
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
val bigPipeStreams = (1 to args(3).toInt).map { i =>
  ssc.networkStream(
    new MyBigpipeLogagentReceiver(args(0), args(1), args(2), i, args(4), args(5), args(6).toInt)
  )
}
val lines = ssc.union(bigPipeStreams)

/**
 * Decodes one received record into an output line whose fields are separated
 * by the U+0001 control character and which ends with a newline.
 *
 * The record is read as JSON (truncated before its "time_str" field, if any);
 * its "bin_body" string holds comma-separated signed byte values wrapped in one
 * leading and one trailing character (presumably brackets -- TODO confirm).
 * The first 8 bytes are skipped and the rest is parsed as a lighttpd_log protobuf.
 *
 * @param value raw record text; null or empty yields ""
 * @return the formatted line, or "" when the record is filtered out or cannot
 *         be parsed. Fatal JVM errors (e.g. OutOfMemoryError) are NOT swallowed.
 */
def deserializePbData(value: String): String = {
  if (null == value || value.isEmpty) {
    ""
  } else {
    try {
      // Keep only the part before the "time_str" field and re-close the JSON object.
      val timeStrIndex = value.indexOf(",\"time_str\"")
      val strAfterTruncation =
        if (-1 != timeStrIndex) value.substring(0, timeStrIndex) + "}" else value
      val jsonData = JSONObject.fromObject(strAfterTruncation)
      val binBody = jsonData.getString("bin_body")
      // Map straight into an Array: folding with ArrayBuffer's non-mutating `+`
      // copies the whole buffer for every element (quadratic allocation).
      val pbData: Array[Byte] = binBody
        .substring(1, binBody.length() - 1)
        .split(",")
        .map(s => java.lang.Byte.parseByte(s))
        .drop(8)
      // Parse the protobuf once and reuse it for every field below.
      val record = Lighttpd.lighttpd_log.parseFrom(pbData)
      val autoKv = record.getRequest().getUrl().getUrlFields().getAutokvList().asScala
      // Value for `key`: repeated keys are concatenated in list order; absent keys give "".
      def field(key: String): String =
        autoKv.iterator.filter(_.getKey() == key).map(_.getValue()).mkString

      val decodeCuid = URLDecoder.decode(field("cuid"), "UTF-8")
      val channel = field("channel")
      val sv = field("sv")
      val resid = field("resid")
      val appid = field("appid")
      val prod = field("prod")
      // Normalize the lower-cased os value by prefix; unmatched values are kept as-is.
      val rawOs = field("os").toLowerCase()
      val os =
        if (rawOs.matches("android(.*)")) "android"
        else if (rawOs.matches("iphone(.*)")) "iphone"
        else if (rawOs.matches("ipad(.*)")) "ipad"
        else if (rawOs.matches("s60(.*)")) "symbian"
        else if (rawOs.matches("wp7(.*)")) "wp7"
        else if (rawOs.matches("wp(.*)")) "wp"
        else if (rawOs.matches("tizen(.*)")) "tizen"
        else rawOs

      if (!record.hasLogid()) {
        ""
      } else {
        val logid = record.getLogid()
        // Keep only records with a real logid, resid "01", a channel, and no appid/prod.
        val rejected = logid.isEmpty || logid.toString().equals("-") || resid != "01" ||
          channel.isEmpty || !appid.isEmpty || !prod.isEmpty
        if (rejected) {
          ""
        } else {
          // "\u0001" is the same separator as the octal "\001"; the two "1" columns are constants.
          Seq(decodeCuid, os, channel, sv, "1", "1", record.getTime().toString)
            .mkString("\u0001") + "\n"
        }
      }
    } catch {
      // Malformed records are useless data: drop them. NonFatal lets OutOfMemoryError
      // and other fatal errors propagate instead of silently turning them into "".
      case scala.util.control.NonFatal(_) => ""
    }
  }
}

// Decode every received record (the posted version referenced an undefined `parseData`).
lines.map(deserializePbData).print()
Я не проверял, правильно ли работает ваш код. Речь идет только о проблеме синтаксиса/структуры, которая особенно заметна даже при беглом просмотре опубликованного вами кода.