org.apache.kafka.common.network.InvalidReceiveException: недействительное получение (размер = 30662099 больше, чем 30662028)
Я пытаюсь перенести данные из каналов Flume в кластер Kafka с помощью приемника Kafka, и я могу видеть связанные данные в связанной теме, но одновременно я наблюдаю нижеприведенную трассировку исключений в журналах Kafka слишком часто,
[2017-03-21 16:47:56,250] WARN Unexpected error from /10.X.X.X; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 30662099 larger than 30662028)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:745)
Первоначальный анализ привел меня к моим журналам Flume и обнаружил следы исключений в нем,
21 Mar 2017 16:25:32,560 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:229)
... 3 more
из первой трассировки стека кажется, что Flume пытается выдвинуть данные размером 30662099 байт, но предел приема сообщений MSG брокера Kafka ограничен 30662028 байт.
я сохранил одинаковый размер отправляемых и получаемых сообщений для производителя (Flume) и брокера (Kafka), т. е. 30662028, меня беспокоит, если мой Flume отправляет только 30662028 байт, то что это за дополнительные байты, которые накапливаются с сообщением моих производителей и формируют последнее сообщение размером 30662099, которое вызывает удаление этого сообщения.
Любая помощь будет действительно заметна!