Pika basic_publish зависает при публикации в нескольких очередях

Мне нужно настроить несколько очередей на обмен. Я хотел бы создать одно соединение, затем объявить несколько очередей (это работает), а затем опубликовать сообщения в нескольких очередях (это не работает).

Для этого я настроил некоторый тестовый код, но каждый раз он зависает на второй публикации. Я думаю, что это не нравится публикация в нескольких очередях без закрытия соединения, так как этот код работает, когда я публикую в одной очереди (даже несколько сообщений в одной очереди).

Что-то, что я должен добавить, чтобы сделать эту работу? Я действительно хотел бы не закрывать связь между публикациями. Кроме того, когда мои клиенты активны, они ничего не видят, когда я отправляю функции basic_publish() при отправке в несколько очередей. Я вижу, что сообщения появляются почти мгновенно, когда я публикую в одной очереди.

#!/usr/bin/env python
import pika


queue_names = ['1a', '2b', '3c', '4d']


# Variables to hold our connection and channel
connection = None
channel = None


# Called when our connection to RabbitMQ is closed
def on_closed(frame):
    global connection
    # connection.ioloop is blocking, this will stop and exit the app
    connection.ioloop.stop()



def on_connected(connection):
    """
    Called when we have connected to RabbitMQ
    This creates a channel on the connection
    """
    global channel #TODO: Test removing this global call

    connection.add_on_close_callback(on_closed)

    # Create a channel on our connection passing the on_channel_open callback
    connection.channel(on_channel_open)



def on_channel_open(channel_):
    """
    Called when channel opened
    Declare a queue on the channel
    """
    global channel

    # Our usable channel has been passed to us, assign it for future use
    channel = channel_


    # Declare a set of queues on this channel
    for queue_name in reversed(queue_names):
        channel.queue_declare(queue=queue_name, durable=True,
                              exclusive=False, auto_delete=False,
                              callback=on_queue_declared)
        #print "done making hash"

def on_queue_declared(frame):
    """
    Called when a queue is declared
    """
    global channel

    print "Sending 'Hello World!' on ", frame.method.queue

    # Send a message
    channel.basic_publish(exchange='',
                          routing_key=frame.method.queue,
                          body='Hello World!')


# Create our connection parameters and connect to RabbitMQ
connection = pika.SelectConnection(pika.ConnectionParameters('localhost'), \
                                   on_connected)

# Start our IO/Event loop
try:
    connection.ioloop.start()
except KeyboardInterrupt:
    print "interrupt"
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    #connection.ioloop.start()

1 ответ

Решение

Мое решение этого состояло в том, чтобы иметь переменную, отслеживающую, были ли все мои очереди объявлены или нет.

В on_queue_declared() я бы проверял эту переменную и, если все мои очереди были объявлены, я начинал публиковать сообщения. Я считаю, что попытка опубликовать сообщения, прежде чем вернуть все Queue.DeclareOks, была причиной проблем.

Другие вопросы по тегам