Pika + RabbitMQ: установка basic_qos в prefetch=1 по-прежнему, по-видимому, использует все сообщения в очереди
У меня есть рабочий клиент Python, который раскручивает 10 рабочих, каждый из которых подключается к очереди RabbitMQ. Немного так:
#!/usr/bin/python
worker_count=10
def mqworker(queue, configurer):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()
channel.queue_declare(queue=qname, durable=True)
channel.basic_consume(callback,queue=qname,no_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
def callback(ch, method, properties, body):
doSomeWork();
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == '__main__':
for i in range(worker_count):
worker = multiprocessing.Process(target=mqworker)
worker.start()
Проблема, с которой я столкнулся, заключается в том, что, несмотря на установку basic_qos на канале, первый рабочий, который запускает, принимает все сообщения из очереди, в то время как остальные сидят без дела. Я вижу это в интерфейсе rabbitmq, что даже когда я установил worker_count
равным 1 и выкидывающим 50 сообщений в очередь, все 50 попадают в "неподтвержденную" корзину, тогда как я ожидаю, что 1 станет неподтвержденным, а остальные 49 будут готовы.
Почему это не работает?
1 ответ
Я, кажется, решил это, двигаясь где basic_qos
называется.
Размещение сразу после channel = connection.channel()
кажется, чтобы изменить поведение к тому, что я ожидал.