Как использовать listen на basic.return в python-клиенте AMQP

Я хочу убедиться, что мое сообщение было доставлено в очередь.

Для этого я добавляю обязательный параметр в basic_publish. Что еще я должен сделать, чтобы получить basic.return сообщение, если мое сообщение не было успешно доставлено?

Я не могу использовать channel.wait() слушать за basic.return потому что, когда мое сообщение успешно доставлено wait() функция висит навсегда. (Тайм-аута нет) С другой стороны. Когда я не звоню channel.wait() channel.returned_messages останется пустым, даже если сообщение не доставлено.

я использую py-amqplib версия 0.6.

Любое решение приветствуется.

3 ответа

Решение

В настоящее время это невозможно, так как basic.return отправляется асинхронно, когда сообщение сбрасывается в брокере. Когда сообщение было успешно отправлено, с сервера не поступало никаких данных. Поэтому pyAMQP не может прослушивать такие сообщения.

Я прочитал несколько тем об этой проблеме. Возможные решения были:

  • используйте txAMQP, витую версию amqp, которая обрабатывает basic.return
  • используйте pyAMQP с ожиданием с таймаутом. (Я не уверен, что это возможно в настоящее время)
  • часто пингуйте сервер с синхронными командами, чтобы pyAMQP мог выбирать basic.return сообщения, когда они приходят.

Поскольку уровень поддержки pyAMQP и rabbitMQ в целом довольно низок, мы решили вообще не использовать amqp broker.

Вы не можете сделать это синхронно, так как это асинхронная система. Но вы можете решить эту проблему с помощью потоков.

Основная идея заключается в том, что вы запускаете поток, который выполняет ожидание на канале, всякий раз, когда он выходит из ожидания, он вызывает функцию call_back для любого возвращенного сообщения в возвращенной очереди сообщений. Затем вы можете обработать это сообщение, как захотите, в функции call_back.

def registerCallback(channel, call_back):
    """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange.
    """
    def wait():
        try:
            channel.wait()
        except Exception, e:
            print("Problem waiting on publish channel: %s" % str(e))

        while not channel.returned_messages.empty():
            returnedMessage = channel.returned_messages.get()
            processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage))
            processReturnedMessageThread.start()

        wait()

    waiting = Thread(target=wait) 
    waiting.start()

Вы пробовали единственную библиотеку Python AMQP, которая в комплекте? Он не так широко используется, потому что он не аккуратно упакован.

Шаг 1. скомпилируйте библиотеку C - вам может понадобиться sudo apt-get install autotools-dev autoconf automake libtool

mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install

Шаг 2. Установите библиотеку Python

pip install pylibrabbitmq
Другие вопросы по тегам