Кафка производят. Отправляет сообщение никогда
Я использую Kafka 2.12 и модуль kafka-python в качестве клиента Kafka. Я пытаюсь проверить простой производитель:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
Когда создается этот процесс, потребитель никогда не получает сообщение
Если я очищаю производителя и изменяю параметр linger_ms (делая его синхронизированным), сообщение отправляется и читается потребителем:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
producer.flush()
В предыдущих версиях Kafka был параметр queue.buffering.max.ms, указывающий, как долго производитель будет ждать отправки сообщений в очереди, но он отсутствует в последней версии (kafka-python 1.3.3). Как я могу указать это в более новых версиях Kafka, чтобы сохранить мой коммуникатор асинхронным?
Спасибо!
3 ответа
Как вы заметили, сообщения помещаются в очередь для асинхронной отправки, и нет гарантии, что они будут отправлены немедленно. Поэтому, если вы хотите, чтобы сообщение было отправлено брокеру, вам нужно явно вызвать producer.flush()
который будет блокироваться, пока сообщение не будет отправлено (хотя flush()
не гарантирует acks).
Я уверен, что парам queue.buffering.max.ms
был заменен linger_ms
: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
Таким образом, вы уже используете этот параметр в своем рабочем примере.
producer = KafkaProducer(bootstrap_servers='kafkaIp:kafkaPort')
producer.send("topic_name", b'Your string here')
producer.flush()
Используйте send и flush.
Мы хотели убедиться, что наши сообщения отправляются быстро, поэтому мы просто добавили отдельный поток, который запускал цикл while, который ничего не делал, кроме вызова
producer.flush(timeout = 0.1)
и спать* в течение 100 мс.
Мы не хотели сводить на нет все преимущества пакетной обработки с точки зрения пропускной способности, но мы также хотели гарантировать, что при низком объеме трафика сообщения обрабатываются с минимальной задержкой (мс, а не минуты).
* Мы используем
gevent
. Сон может не понадобиться, если вы используете обычные
threading
.