Проблема с балансировкой при чтении сообщений в Кафке
Я пытаюсь читать сообщения на тему Кафки, но не могу прочитать. Процесс завершается через некоторое время, без чтения каких-либо сообщений.
Вот ошибка перебалансировки, которую я получаю:
[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer: (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages
Я пытался бежать ConsumerOffsetChecker
и это ошибка, которую я получаю. Я понятия не имею, как решить эту проблему. Кто-нибудь, есть идеи?
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group topic_group
Group Topic Pid Offset logSize Lag Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
... 16 more
5 ответов
У меня недавно были похожие проблемы. Вы можете попытаться увеличить потребительские конфигурации rebalance.backoff.ms и zookeeper.session.timeout.ms примерно до 5-10 секунд.
Первый параметр говорит кафке подождать больше, прежде чем пытаться перебалансировать. Второй говорит Кафке, чтобы он был более терпелив, пытаясь подключиться к зоопарку.
Другие параметры конфигурации можно найти в официальной документации.
Это, вероятно, означает, что брокеры не создали эти узлы правильно, когда он подключился к Zookeeper. Путь / потребитель должен существовать, когда вы пытаетесь потреблять.
Вот несколько путей для отладки:
Вы создали какие-либо темы?
Если так:
- Сколько разделов в теме?
- Вы проверяли, что метаданные темы были правильно заполнены в зоопарке?
- Можем ли мы увидеть вашу потребительскую конфигурацию?
Если не:
- Затем вам нужно создать тему, используя скрипт
$KAFKA_DIR/bin/kafka-create-topic.sh
, Загляните внутрь скрипта для деталей использования. - После того, как вы создали тему, вам нужно создать потребителя с идентификатором группы, который ранее не использовался, иначе вы не начнете с нуля.
В kafka.tools.ConsumerOffsetChecker есть ошибка. Если конкретный узел Zookeeper, содержащий информацию о смещении, не завершается, инструмент завершает работу, выдавая исключение.
Например, предположим, что у вас есть группа потребителей "mygroup" и тема "topictest". Затем смещение для раздела 2 сохраняется в Znode: / consumer / mygroup / offsets / topictest / 2.
Если нет записи для раздела 2 темы themetest в Znode, то инструмент коррекции потребителя завершит работу при проверке смещения для раздела 2. В основном, он потерпит неудачу при проверке первого раздела "n", для которого Znode / consumer / mygroup / offsets. / topictest / n отсутствует в Zookeeper.
Возможно, ваши брокеры не подключены к сети и не могут подключиться к Zookeeper. Вы пробовали запустить скрипт консоли-потребителя, доступный в $KAFKA_ROOT_DIR/bin
Путь для проверки, если вы можете использовать из определенной темы.
Другая проблема может быть из-за конфликтов с банками. Если у вас есть одна и та же банка с разными версиями, хранящиеся в папке библиотеки. Эта проблема может возникнуть. jars, такие как scala-library,zkclient, zookeeper, kafka-client, не должны дублироваться с различными версиями.