Как использовать listen на basic.return в python-клиенте AMQP
Я хочу убедиться, что мое сообщение было доставлено в очередь.
Для этого я добавляю обязательный параметр в basic_publish. Что еще я должен сделать, чтобы получить basic.return
сообщение, если мое сообщение не было успешно доставлено?
Я не могу использовать channel.wait()
слушать за basic.return
потому что, когда мое сообщение успешно доставлено wait()
функция висит навсегда. (Тайм-аута нет) С другой стороны. Когда я не звоню channel.wait()
channel.returned_messages
останется пустым, даже если сообщение не доставлено.
я использую py-amqplib
версия 0.6.
Любое решение приветствуется.
3 ответа
В настоящее время это невозможно, так как basic.return
отправляется асинхронно, когда сообщение сбрасывается в брокере. Когда сообщение было успешно отправлено, с сервера не поступало никаких данных. Поэтому pyAMQP не может прослушивать такие сообщения.
Я прочитал несколько тем об этой проблеме. Возможные решения были:
- используйте txAMQP, витую версию amqp, которая обрабатывает basic.return
- используйте pyAMQP с ожиданием с таймаутом. (Я не уверен, что это возможно в настоящее время)
- часто пингуйте сервер с синхронными командами, чтобы pyAMQP мог выбирать
basic.return
сообщения, когда они приходят.
Поскольку уровень поддержки pyAMQP и rabbitMQ в целом довольно низок, мы решили вообще не использовать amqp broker.
Вы не можете сделать это синхронно, так как это асинхронная система. Но вы можете решить эту проблему с помощью потоков.
Основная идея заключается в том, что вы запускаете поток, который выполняет ожидание на канале, всякий раз, когда он выходит из ожидания, он вызывает функцию call_back для любого возвращенного сообщения в возвращенной очереди сообщений. Затем вы можете обработать это сообщение, как захотите, в функции call_back.
def registerCallback(channel, call_back): """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange. """ def wait(): try: channel.wait() except Exception, e: print("Problem waiting on publish channel: %s" % str(e)) while not channel.returned_messages.empty(): returnedMessage = channel.returned_messages.get() processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage)) processReturnedMessageThread.start() wait() waiting = Thread(target=wait) waiting.start()
Вы пробовали единственную библиотеку Python AMQP, которая в комплекте? Он не так широко используется, потому что он не аккуратно упакован.
Шаг 1. скомпилируйте библиотеку C - вам может понадобиться sudo apt-get install autotools-dev autoconf automake libtool
mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install
Шаг 2. Установите библиотеку Python
pip install pylibrabbitmq