Сообщения не доставляются с kafka python
Я пытаюсь настроить простое приложение Kafka с помощью kafka-python. Я пытался заставить некоторые примеры, которые я нашел в сети, работать, но, похоже, не смог этого сделать. У меня есть экземпляр kafka, работающий в контейнере Docker. Я протестировал инструменты оболочки, и экземпляр определенно работает. Я могу отправлять и получать сообщения. Я подозреваю, что время ожидания сообщений от производителя Вот две версии кода с одинаковым поведением:
import time
from kafka import SimpleProducer, KafkaClient
# connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'test'
producer.send_messages(topic, b'this is a message')
И вторая версия:
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['0.0.0.0:9092'], api_version=(0,10))
topic = "test"
producer.send(topic, b'test message')
1 ответ
Измените строку:
producer.send(topic, b'test message')
Кому:producer.send(topic, b'test message').get(timeout=30)
(или любое другое значение, которое вы считаете нужным)
Проблема в том, что производитель удаляется до отправки сообщения, поскольку этот метод является асинхронным. Вы можете убедиться в этом сами, если добавите:
import logging
logging.basicConfig(level=logging.INFO)
И посмотрите, что тайм-аут равен 0.
Это зависит от того, как вы запустили Docker, но я считаю, что ваша проблема связана с именем хоста, к которому вы пытаетесь подключиться. Вы должны указать на хост, установленный в ADVERTISED_HOST
переменная окружения. Например, когда я запускаю kafka-docker как docker run --hostname kafka-1 -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST='kafka-1' --env ADVERTISED_PORT=9092 spotify/kafka
Производю кафку вроде такой
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient('kafka-1:9092')
producer = SimpleProducer(kafka)
topic = 'test'
for i in range(100):
producer.send_messages(topic, 'hullo-' + str(i))
дополнительно мне нужно было добавить 127.0.0.1 kafka-1
к моему /etc/hosts
файл. После этого я смог использовать сообщения, созданные клиентом Python с bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test --from-beginning