Как ждать сообщения в нескольких очередях, используя py-amqplib

Я использую py-amqplib для доступа к RabbitMQ в Python. Приложение время от времени получает запросы на прослушивание определенных тем MQ.

При первом получении такого запроса он создает соединение AMQP и канал и запускает новый поток для прослушивания сообщений:

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
    channel = connection.channel()

    listener = AMQPListener(channel)
    listener.start()

AMQPListener очень прост:

class AMQPListener(threading.Thread):
    def __init__(self, channel):
        threading.Thread.__init__(self)
        self.__channel = channel

    def run(self):
        while True:
            self.__channel.wait()

После создания соединения он подписывается на интересующую тему, например так:

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
    self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

В первый раз все это работает нормально. Однако при последующем запросе подписаться на другую тему не удается. При последующих запросах я повторно использую соединение AMQP и поток AMQPListener (так как я не хочу начинать новый поток для каждой темы), и когда я вызываю блок кода выше, вызов метода channel.queue_declare() никогда не возвращается. Я также попытался создать новый канал в этот момент, и вызов connection.channel() также никогда не возвращается.

Единственный способ заставить его работать - это создать новое соединение, канал и поток слушателя по теме (т. Е. Routing_key), но это на самом деле не идеально. Я подозреваю, что это метод wait(), который каким-то образом блокирует все соединение, но я не уверен, что с этим делать. Конечно, я должен иметь возможность получать сообщения с несколькими ключами маршрутизации (или даже по нескольким каналам), используя один поток слушателя?

Смежный вопрос: как мне остановить поток слушателей, когда эта тема больше не представляет интереса? Вызов channel.wait() блокируется навсегда, если нет сообщений. Единственный способ, которым я могу придумать, - это отправить в очередь фиктивное сообщение, которое его "отравит", т.е. быть интерпретированным слушателем как сигнал к остановке.

1 ответ

Если вам нужно более одного потребителя на канал, просто подключите другой с помощью basic_consume() и используйте channel.wait() после. Он будет прослушивать все очереди, присоединенные через basic_consume(). Убедитесь, что вы определяете различные потребительские теги для каждого basic_consume().

Используйте channel.basic_cancel(consumer_tag), если вы хотите отменить определенного потребителя в очереди (отмена прослушивания определенной темы).

Другие вопросы по тегам