Опрос сообщений RabbitMQ с использованием клиента Pika
Я хочу создать получателя / потребителя RabbitMQ на Python и не знаю, как проверять сообщения. Я пытаюсь сделать это в своем собственном цикле, не используя обратные вызовы в Пика.
Если я понимаю, в клиенте Java я могу использовать getBasic()
проверить, есть ли доступные сообщения без блокировки. Я не против блокировки при получении сообщений, но я не хочу блокировать, пока не появится сообщение.
Я не нахожу четких примеров и еще не разобрался в соответствующем вызове в pika.
3 ответа
Вы можете периодически проверять размер очереди, используя пример этого ответа. Get Queue Size in Pika (AMQP Python)
Цикл обработки очереди может быть выполнен итеративно с помощью process_data_events()
:
import pika
# A stubborn callback that still wants to be in the code.
def mq_callback(ch, method, properties, body):
print(" Received: %r" % body)
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
queue_state = channel.queue_declare(queue="test")
# Configure a callback.
channel.basic_consume(mq_callback, queue="test")
try:
# My own loop here:
while(True):
# Do other processing
# Process message queue events, returning as soon as possible.
# Issues mq_callback() when applicable.
connection.process_data_events(time_limit=0)
finally:
connection.close()
Если вы хотите сделать это синхронно, то вам нужно будет посмотреть на Пика BlockingConnection
BlockingConnection создает слой поверх методов асинхронного ядра Pika, который будет блокировать до тех пор, пока не будет получен ожидаемый ответ. Из-за асинхронного характера вызовов Basic.Deliver и Basic.Return из RabbitMQ в ваше приложение вам все равно необходимо реализовать асинхронные методы в стиле передачи с продолжением, если вы хотите получать сообщения из RabbitMQ с использованием basic_consume или если вы хотите получать уведомления об ошибке доставки при использовании basic_publish.
Больше информации и пример здесь