Как обнаружить обмен не существует с py-amqp
Я хочу иметь возможность определить, существует ли обмен при отправке сообщения в AMQP.
Рассмотрим следующий пример.
#!/usr/bin/python
import amqp
from time import sleep
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")
while True:
print "publish message."
outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
sleep(1)
Этот скрипт продолжает публикацию на бирже, но не выдает никаких ошибок, если биржа не существует. Когда обмен существует, приходят сообщения.
#!/usr/bin/python
import amqp
from time import sleep
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")
while True:
print "publish message."
outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
outgoing.wait()
sleep(1)
Когда я добавляю outgoing.wait(), вызывается amqp.exceptions.NotFound, чего я и хочу. Проблема, однако, в том, что если в этом случае обмен существует, сообщение приходит, но outgoing.wait() блокирует мой цикл. (Я мог бы запустить outgoing.wait() в отдельном потоке, но я этого не хочу.)
Как с этим бороться?
Любые советы советы указатели приветствуются
Спасибо,
сойка
2 ответа
Если вы хотите узнать, существует ли обмен, используйте метод exchange_declare и установите для пассивного флага значение True. Установка для пассивного флага значения True предотвратит попытку сервера создать обмен и вместо этого выдаст ошибку, если обмен не существует.
import amqp
from amqp.exceptions import NotFound
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
virtual_host="/")
outgoing = conn.channel()
try:
outgoing.exchange_declare("fubar", "", passive=True)
except NotFound:
print "Exchange 'fubar' does not exist!"
Если вы действительно заинтересованы в том, чтобы обмен существовал до его публикации, просто объявите его, прежде чем переходить в цикл отправки. Если обмен уже существует, ничего не произойдет. Если обмен не существует, он будет создан.
import amqp
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
virtual_host="/")
outgoing = conn.channel()
outgoing.exchange_declare("fubar", "direct")
Вот ссылка на объявление метода для exchange_declare в используемой вами библиотеке amqp:
https://github.com/celery/py-amqp/blob/master/amqp/channel.py
К сожалению, вам нужен блокирующий вызов для проверки исключений из basic_publish(). Однако вы можете выполнить блокирующий вызов один раз перед входом в асинхронный цикл:
# send a test message synchronously to see if the exchange exists
test_message = amqp.Message('exchange_test')
outgoing.basic_publish(test_message,exchange="non-existing",routing_key="fubar")
try:
outgoing.wait()
except amqp.exceptions.NotFound:
# could not find the exchange, so do something about it
exit()
while True:
# fairly certain the exchange exists now, run the async loop
print "publish message."
outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
sleep(1)