При использовании Pika BlockingConnection, нужно ли basic_ack() поместить в функцию обратного вызова

Скажем, я установил соединение с RabbitMQ следующим образом:

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue=getting_from_this_queue)
channel.basic_consume(
    callback, queue=getting_from_this_queue, no_ack=False)
channel.basic_qos( prefetch_count = 3 )

чтобы добиться лучшего параллелизма, я попытался поместить каждое задание во внутреннюю очередь и создал цикл while для асинхронной отправки работника для каждого задания, полученного из этой внутренней очереди:

from Queue import Queue
from multiprocessing.dummy import Pool as ThreadPool

task_queue = Queue(10)
pool = Pool(20)

def worker(ch, method, job):
    # ...some heavy lifting...
     if job_gets_done:         # some abstraction
        print "job success"
        ch.basic_ack(delivery_tag=method.delivery_tag)   # PROBLEM : this seems not working
     else:
        print "job failed"

def callback(ch, method, properties, job):
     task_queue.put((ch,method,dn))     # put job in internal queue, block if full.

@threaded
def async_process_jobs():              # loop to get job and start thread worker.
    while True:
         params = task_queue.get()
         pool.apply_async( worker, params )   # param = (ch,method, job)


async_process_jobs()
channel.start_consuming()

проблема заключается в том, что когда задания обрабатываются, те, кто не выполняет их, отправляют подтверждение должным образом (даже если поток выполнения действительно проходит через него, то есть выдает "успех задания"). размер очереди на rabbitmq остается прежним, почему?

В некотором официальном уроке basic_ack() был помещен в callback(), а мой нет. Может ли это быть источником проблемы?


Подробное поведение (возможно, не важно): предположим, у меня в очереди 10000 заданий. Вначале около 2000 сообщений перешли в состояние Unacked, а затем все они возвращаются в состояние Ready, даже если мои работники все еще обрабатывают и печатают " работа удалась "(acking).

2 ответа

Решение

Из FAQ Пика:

Пика не имеет никакого понятия о потоке в коде. Если вы хотите использовать Pika с потоками, убедитесь, что у вас есть соединение Pika для каждого потока, созданного в этом потоке. Не безопасно разделять одно соединение Pika между потоками.

Я сталкиваюсь с подобной проблемой, я заметил, что: если работа выполнена быстро, то ack работает, но если работа стоит больше времени, то ack не работает, даже он отсылает.

Другие вопросы по тегам