Как обнаружить обмен не существует с py-amqp

Я хочу иметь возможность определить, существует ли обмен при отправке сообщения в AMQP.

Рассмотрим следующий пример.

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print "publish message."
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
    sleep(1)

Этот скрипт продолжает публикацию на бирже, но не выдает никаких ошибок, если биржа не существует. Когда обмен существует, приходят сообщения.

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print "publish message."
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
    outgoing.wait()
    sleep(1)

Когда я добавляю outgoing.wait(), вызывается amqp.exceptions.NotFound, чего я и хочу. Проблема, однако, в том, что если в этом случае обмен существует, сообщение приходит, но outgoing.wait() блокирует мой цикл. (Я мог бы запустить outgoing.wait() в отдельном потоке, но я этого не хочу.)

Как с этим бороться?

Любые советы советы указатели приветствуются

Спасибо,

сойка

2 ответа

Если вы хотите узнать, существует ли обмен, используйте метод exchange_declare и установите для пассивного флага значение True. Установка для пассивного флага значения True предотвратит попытку сервера создать обмен и вместо этого выдаст ошибку, если обмен не существует.

import amqp
from amqp.exceptions import NotFound

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
try:
    outgoing.exchange_declare("fubar", "", passive=True)
except NotFound:
    print "Exchange 'fubar' does not exist!"

Если вы действительно заинтересованы в том, чтобы обмен существовал до его публикации, просто объявите его, прежде чем переходить в цикл отправки. Если обмен уже существует, ничего не произойдет. Если обмен не существует, он будет создан.

import amqp

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
outgoing.exchange_declare("fubar", "direct")

Вот ссылка на объявление метода для exchange_declare в используемой вами библиотеке amqp:

https://github.com/celery/py-amqp/blob/master/amqp/channel.py

К сожалению, вам нужен блокирующий вызов для проверки исключений из basic_publish(). Однако вы можете выполнить блокирующий вызов один раз перед входом в асинхронный цикл:

# send a test message synchronously to see if the exchange exists
test_message = amqp.Message('exchange_test')
outgoing.basic_publish(test_message,exchange="non-existing",routing_key="fubar")
try:    
    outgoing.wait()
except amqp.exceptions.NotFound:
    # could not find the exchange, so do something about it
    exit()

while True:
    # fairly certain the exchange exists now, run the async loop
    print "publish message."
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
    sleep(1)
Другие вопросы по тегам