NoBrokersAvailable: ошибка NoBrokersAvailable-Kafka

Я уже начал изучать Кафку. Пробуем основные операции на нем. Я остановился на вопросе о "Брокерах".

Моя Кафка работает, но когда я хочу создать раздел.

 from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
 consumer.assign([TopicPartition('foobar', 2)])
 msg = next(consumer)

traceback (последний вызов был последним): файл "", строка 1, в файле "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", строка 284, в init self._client = KafkaClient (metrics = self._metrics, ** self.config) Файл "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", строка 202, в init self.config ['api_version '] = self.check_version (timeout = check_timeout) Файл "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", строка 791, в check_version вызывает Errors.NoBrokersAvailable() kafka.errors.NoBrokersAvailable: NoBrokersAvailable

7 ответов

У меня также была такая же ошибка во время потоковой передачи кафки. Ниже пример разрешил мою ошибку. Нам нужно определить версию API в KafkaProducer.

KafkaProducer(bootstrap_servers=['localhost:9092'],
api_version=(0,11,5),
value_serializer=lambda x:
dumps(x).encode('utf-8'))

Вы не можете создавать разделы внутри потребителя. Разделы создаются при создании темы. Например, используя инструмент командной строки:

bin/kafka-topics.sh \
  --zookeeper localhost:2181 \
  --create --topic myNewTopic \
  --partitions 10 \
  --replication-factor 3

Это создает новую тему "myNewTopic" с 10 разделами (пронумерованными от 0 до 9) и коэффициентом репликации 3. (см. http://docs.confluent.io/3.0.0/kafka/post-deployment.html и https://kafka.apache.org/documentation.html)

Внутри вашего потребителя, если вы позвоните assign(), это означает, что вы хотите использовать соответствующий раздел, и этот раздел уже должен существовать.

Для меня проблемой было правило брандмауэра, поскольку я запускаю Kafka в Google Cloud.

Вчера это сработало для меня, а сегодня я целый час чесал в затылке, думая, почему это больше не работает.

Поскольку общедоступный IP-адрес моей локальной системы меняется каждый раз, когда я подключаюсь к другой локальной сети или Wi-Fi, мне пришлось разрешить общедоступный IP-адрес моей локальной системы в правилах брандмауэра. Я бы предложил использовать соединение с фиксированным общедоступным IP-адресом или проверять это всякий раз, когда вы переключаете / меняете свое соединение.

Эти небольшие изменения в конфигурациях требуют слишком много времени для их отладки и исправления. Чувствовал себя потраченным на это час.

Не знаю, актуален ли этот ответ, но недавно решил эту же проблему в брокере виртуальных машин VBox, недоступном из хост-системы Windows. Поскольку вы упомянули bootsrap_servers в KafkaConsumer, я предполагаю, что вы используете как минимум kafka 0.10.0.0

Пожалуйста, посмотрите на advertised.listeners свойство в файле server.properties и установите его в PLAINTEXT://localhost:9092 или же PLAINTEXT://<broker_ip>:9092

Но перед тем как установить, убедитесь, что ваш брокер доступен из среды, в которой работает ваш потребитель (выполнив ping localhost).

Также вам нужно перезапустить kafka-сервер и потребителя / производителя (что бы ни работало) и попробовать отправить / получить.

Например, если вы используете виртуальную машину, вы можете использовать адаптер только для хоста, чтобы сделать посредника доступным с хоста

ПРИМЕЧАНИЕ. Эта конфигурация работает для Kafka Server >= 0.10.XX, но не для 0.8.2.X. Не проверено на 0.9.0.X

Похоже, вы хотите начать использовать сообщения вместо создания разделов. Тем не менее - вы можете добраться до Кафки в порту 1234? 9092 - порт кафки по умолчанию, может быть, вы можете попробовать этот. Если вы нашли правильный порт, но ваше приложение по-прежнему выдает ошибки, вы можете попробовать использовать консольный потребитель для проверки ваших настроек:

bin/kafka-console-producer.sh --broker-list localhost:<yourportnumber> --topic foobar

Потребитель консоли является частью стандартного дистрибутива kafka. Может быть, это немного приблизит вас к источнику проблемы.

Используйте 127.0.0.1 вместо localhost или любого другого IP-адреса, соответствующего вашему варианту использования. У меня сработало изменение localhost:9092 на 127.0.0.1:9092.

      from kafka import KafkaConsumer
consumer = KafkaConsumer('topicname',bootstrap_servers=['127.0.0.1:9092'])
print(consumer.config)
print(consumer.bootstrap_connected())

NoBrokersAvailable может быть ответом на неправильную конфигурацию имени хоста в конфигурации kafka.

Другие вопросы по тегам