EOFException от Kafka в Flume
Я пытаюсь настроить простой конвейер данных от производителя консоли Kafka до файловой системы Hadoop (HDFS). Я работаю на 64-битной виртуальной машине Ubuntu и создал отдельных пользователей как для Hadoop, так и для Kafka, как было предложено в руководствах, которым я следовал. Использование полученного ввода в Kafka с консольным потребителем работает, и HDFS, похоже, работает.
Теперь я хочу использовать Flume для передачи данных в HDFS. Я использую следующий файл конфигурации:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 2000
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = hdfs://flume/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
Теперь, когда я запускаю Flume со следующей командой
bin/flume-ng agent --conf ./conf -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n tier1
Я получаю одно и то же исключение в выводе консоли снова и снова:
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:467)] Completed connection to node 2147483647
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)] Connection with Ubuntu-Sandbox/127.0.1.1 disconnected
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:529)
at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Единственный способ остановить Flume - убить процесс Java.
Я думал, что это может иметь какое-то отношение к отдельным пользователям для Hadoop и Kafka, но даже при запуске всего с пользователем Kafka я получаю тот же результат. Я также не нашел ничего о методе EOFException в сети, что странно, учитывая, что я только что следовал руководству "Приступая к работе" и использовал довольно стандартные конфигурации для всего.
Может быть, это как-то связано с предыдущей строкой ("Ubuntu-Sandbox/127.0.1.1 отключена") и, следовательно, с конфигурацией моей виртуальной машины?
Любая помощь высоко ценится!
1 ответ
Рассматривали ли вы использование Kafka Connect (часть Apache Kafka) и разъем HDFS вместо этого? Это обычно заменяет Flume. Он прост в использовании, с такой же файловой конфигурацией, что и Flume.