Проблема с балансировкой при чтении сообщений в Кафке

Я пытаюсь читать сообщения на тему Кафки, но не могу прочитать. Процесс завершается через некоторое время, без чтения каких-либо сообщений.

Вот ошибка перебалансировки, которую я получаю:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

Я пытался бежать ConsumerOffsetCheckerи это ошибка, которую я получаю. Я понятия не имею, как решить эту проблему. Кто-нибудь, есть идеи?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 16 more

5 ответов

Решение

У меня недавно были похожие проблемы. Вы можете попытаться увеличить потребительские конфигурации rebalance.backoff.ms и zookeeper.session.timeout.ms примерно до 5-10 секунд.

Первый параметр говорит кафке подождать больше, прежде чем пытаться перебалансировать. Второй говорит Кафке, чтобы он был более терпелив, пытаясь подключиться к зоопарку.

Другие параметры конфигурации можно найти в официальной документации.

Это, вероятно, означает, что брокеры не создали эти узлы правильно, когда он подключился к Zookeeper. Путь / потребитель должен существовать, когда вы пытаетесь потреблять.

Вот несколько путей для отладки:

Вы создали какие-либо темы?

Если так:

  1. Сколько разделов в теме?
  2. Вы проверяли, что метаданные темы были правильно заполнены в зоопарке?
  3. Можем ли мы увидеть вашу потребительскую конфигурацию?

Если не:

  1. Затем вам нужно создать тему, используя скрипт $KAFKA_DIR/bin/kafka-create-topic.sh, Загляните внутрь скрипта для деталей использования.
  2. После того, как вы создали тему, вам нужно создать потребителя с идентификатором группы, который ранее не использовался, иначе вы не начнете с нуля.

В kafka.tools.ConsumerOffsetChecker есть ошибка. Если конкретный узел Zookeeper, содержащий информацию о смещении, не завершается, инструмент завершает работу, выдавая исключение.

Например, предположим, что у вас есть группа потребителей "mygroup" и тема "topictest". Затем смещение для раздела 2 сохраняется в Znode: / consumer / mygroup / offsets / topictest / 2.

Если нет записи для раздела 2 темы themetest в Znode, то инструмент коррекции потребителя завершит работу при проверке смещения для раздела 2. В основном, он потерпит неудачу при проверке первого раздела "n", для которого Znode / consumer / mygroup / offsets. / topictest / n отсутствует в Zookeeper.

Возможно, ваши брокеры не подключены к сети и не могут подключиться к Zookeeper. Вы пробовали запустить скрипт консоли-потребителя, доступный в $KAFKA_ROOT_DIR/bin Путь для проверки, если вы можете использовать из определенной темы.

Другая проблема может быть из-за конфликтов с банками. Если у вас есть одна и та же банка с разными версиями, хранящиеся в папке библиотеки. Эта проблема может возникнуть. jars, такие как scala-library,zkclient, zookeeper, kafka-client, не должны дублироваться с различными версиями.

Другие вопросы по тегам