Pika + RabbitMQ: установка basic_qos в prefetch=1 по-прежнему, по-видимому, использует все сообщения в очереди

У меня есть рабочий клиент Python, который раскручивает 10 рабочих, каждый из которых подключается к очереди RabbitMQ. Немного так:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()

Проблема, с которой я столкнулся, заключается в том, что, несмотря на установку basic_qos на канале, первый рабочий, который запускает, принимает все сообщения из очереди, в то время как остальные сидят без дела. Я вижу это в интерфейсе rabbitmq, что даже когда я установил worker_count равным 1 и выкидывающим 50 сообщений в очередь, все 50 попадают в "неподтвержденную" корзину, тогда как я ожидаю, что 1 станет неподтвержденным, а остальные 49 будут готовы.

Почему это не работает?

1 ответ

Решение

Я, кажется, решил это, двигаясь где basic_qos называется.

Размещение сразу после channel = connection.channel() кажется, чтобы изменить поведение к тому, что я ожидал.

Другие вопросы по тегам