Сообщения не доставляются с kafka python

Я пытаюсь настроить простое приложение Kafka с помощью kafka-python. Я пытался заставить некоторые примеры, которые я нашел в сети, работать, но, похоже, не смог этого сделать. У меня есть экземпляр kafka, работающий в контейнере Docker. Я протестировал инструменты оболочки, и экземпляр определенно работает. Я могу отправлять и получать сообщения. Я подозреваю, что время ожидания сообщений от производителя Вот две версии кода с одинаковым поведением:

import time
from kafka import SimpleProducer, KafkaClient
#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'test'
producer.send_messages(topic, b'this is a message')

И вторая версия:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['0.0.0.0:9092'], api_version=(0,10))
topic = "test"

producer.send(topic, b'test message')

1 ответ

Измените строку: producer.send(topic, b'test message')

Кому:producer.send(topic, b'test message').get(timeout=30) (или любое другое значение, которое вы считаете нужным)

Проблема в том, что производитель удаляется до отправки сообщения, поскольку этот метод является асинхронным. Вы можете убедиться в этом сами, если добавите:

import logging
logging.basicConfig(level=logging.INFO)

И посмотрите, что тайм-аут равен 0.

Это зависит от того, как вы запустили Docker, но я считаю, что ваша проблема связана с именем хоста, к которому вы пытаетесь подключиться. Вы должны указать на хост, установленный в ADVERTISED_HOST переменная окружения. Например, когда я запускаю kafka-docker как docker run --hostname kafka-1 -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST='kafka-1' --env ADVERTISED_PORT=9092 spotify/kafka Производю кафку вроде такой

from kafka import SimpleProducer, KafkaClient

kafka = KafkaClient('kafka-1:9092')
producer = SimpleProducer(kafka)
topic = 'test'
for i in range(100):
    producer.send_messages(topic, 'hullo-' + str(i))

дополнительно мне нужно было добавить 127.0.0.1 kafka-1 к моему /etc/hosts файл. После этого я смог использовать сообщения, созданные клиентом Python с bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test --from-beginning

Другие вопросы по тегам