Как ждать сообщения в нескольких очередях, используя py-amqplib
Я использую py-amqplib для доступа к RabbitMQ в Python. Приложение время от времени получает запросы на прослушивание определенных тем MQ.
При первом получении такого запроса он создает соединение AMQP и канал и запускает новый поток для прослушивания сообщений:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener очень прост:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
После создания соединения он подписывается на интересующую тему, например так:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
В первый раз все это работает нормально. Однако при последующем запросе подписаться на другую тему не удается. При последующих запросах я повторно использую соединение AMQP и поток AMQPListener (так как я не хочу начинать новый поток для каждой темы), и когда я вызываю блок кода выше, вызов метода channel.queue_declare() никогда не возвращается. Я также попытался создать новый канал в этот момент, и вызов connection.channel() также никогда не возвращается.
Единственный способ заставить его работать - это создать новое соединение, канал и поток слушателя по теме (т. Е. Routing_key), но это на самом деле не идеально. Я подозреваю, что это метод wait(), который каким-то образом блокирует все соединение, но я не уверен, что с этим делать. Конечно, я должен иметь возможность получать сообщения с несколькими ключами маршрутизации (или даже по нескольким каналам), используя один поток слушателя?
Смежный вопрос: как мне остановить поток слушателей, когда эта тема больше не представляет интереса? Вызов channel.wait() блокируется навсегда, если нет сообщений. Единственный способ, которым я могу придумать, - это отправить в очередь фиктивное сообщение, которое его "отравит", т.е. быть интерпретированным слушателем как сигнал к остановке.
1 ответ
Если вам нужно более одного потребителя на канал, просто подключите другой с помощью basic_consume() и используйте channel.wait() после. Он будет прослушивать все очереди, присоединенные через basic_consume(). Убедитесь, что вы определяете различные потребительские теги для каждого basic_consume().
Используйте channel.basic_cancel(consumer_tag), если вы хотите отменить определенного потребителя в очереди (отмена прослушивания определенной темы).