Получить размер очереди в Pika (AMQP Python)

Простой вопрос, но Google или открытый исходный код Pika не помогли. Есть ли способ запросить текущий размер очереди (счетчик элементов) в Пика?

6 ответов

Решение

Есть два способа получить размер очереди в протоколе AMQP. Вы можете использовать Queue.Declare или Basic.Get.

Если вы используете сообщения по мере их поступления с использованием Basic.Consume, вы не сможете получить эту информацию, если не отключите (тайм-аут) и не объявите заново очередь, или не получите одно сообщение, но не подтвердите его. В более новых версиях AMQP вы можете активно запрашивать сообщение.

Что касается Pika, я не знаю специфики, но клиенты Python для AMQP были занудой на моей стороне. Зачастую вам необходимо создать monkeypatch классы, чтобы получить необходимую информацию, или позволить потребителю очереди установить таймаут, чтобы вы могли делать другие вещи через определенные промежутки времени, например, записывать статистику или узнавать, сколько сообщений находится в очереди.

Другой способ обойти это - сдаться и использовать класс Pipe для запуска sudo rabbitmqctl list_queues -p my_vhost, Затем проанализируйте вывод, чтобы найти размер всех очередей. Если вы сделаете это, вам нужно будет настроить /etc/sudoers не спрашивать обычный пароль sudo.

Я молюсь, чтобы кто-то еще с большим опытом Pika ответил на это, указав, как вы можете делать все то, что я упомянул, и в этом случае я скачаю Pika и пну шины. Но если этого не произойдет, и у вас возникнут проблемы с запатентованием кода Pika, посмотрите на haigha, Я обнаружил, что их код гораздо проще, чем другие клиентские библиотеки Python AMQP, потому что они ближе к протоколу AMQP.

Я знаю, что этот вопрос немного устарел, но вот пример того, как сделать это с помощью pika.

Что касается AMQP и RabbitMQ, если вы уже объявили очередь, вы можете повторно объявить очередь с включенным пассивным флагом и сохранить все остальные параметры очереди идентичными. Ответ на эту декларацию Declare-Ok будет включать в себя количество сообщений в очереди.

Вот пример с Пика 0.9.5:

import pika

def on_callback(msg):
    print msg

params = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        credentials=pika.credentials.PlainCredentials('guest', 'guest'),
    )

# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(parameters=params)

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False
    )

# ...

# Re-declare the queue with passive flag
res = channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False,
        passive=True
    )
print 'Messages in queue %d' % res.method.message_count

Это напечатает следующее:

<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
Messages in queue 0

Вы получаете количество сообщений от message_count член.

Вот как вы можете получить длину очереди, используя pika(учитывая, что вы используете по умолчанию имя пользователя и пароль на локальном хосте), замените q_name на имя вашей очереди.

import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count

Вы пробовали PyRabbit? Оно имеет get_queue_depth() метод, который звучит как то, что вы ищете.

Просто публикую это на случай, если кто-то еще наткнется на это обсуждение. Ответ с наибольшим количеством голосов, т.е.:

      # Re-declare the queue with passive flag
res = channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False,
        passive=True
    )

был очень полезен для меня, но с серьезной оговоркой. Согласно документации pika, флаг используется только для проверки существования очереди. Таким образом, можно предположить, что вы можете использовать функцию queue_declare с флагом, чтобы проверить, существует ли очередь в ситуациях, когда есть шанс, что очередь никогда не была объявлена. Из моего тестирования, если вы вызываете эту функцию с флагом, а очередь не существует, API не только выдает исключение; это также приведет к тому, что брокер отключит ваш канал, поэтому, даже если вы изящно поймаете исключение, вы потеряете соединение с брокером. Я проверил это с двумя разными скриптами Python на простом ванильном контейнере RabbitMQ, работающем в миникубе. Я запускал этот тест много раз, и каждый раз получаю одно и то же поведение.

Мой тестовый код:

      import logging
import pika

logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)


def on_callback(msg):
    logger.info(f"Callback msg: {msg}")


queue_name = "testy"

credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials)
)

logger.info("Connection established")

channel = connection.channel()

logger.info("Channel created")

channel.exchange_declare(exchange="svc-exchange", exchange_type="direct", durable=True)

response = channel.queue_declare(
    queue=queue_name, durable=True, exclusive=False, auto_delete=False, passive=True
)

logger.info(f"queue_declare response: {response}")

channel.queue_delete(queue=queue_name)

connection.close()

Выход:

      INFO:__main__:Connection established
INFO:__main__:Channel created
WARNING:pika.channel:Received remote Channel.Close (404): "NOT_FOUND - no queue 'testy' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x1047e2700> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
Traceback (most recent call last):
  File "check_queue_len.py", line 29, in <module>
    response = channel.queue_declare(
  File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2521, in queue_declare
    self._flush_output(declare_ok_result.is_ready)
  File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1354, in _flush_output
    raise self._closing_reason  # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'testy' in vhost '/'")

Когда я установилна Ложь:

      scripts % python check_queue_len.py
INFO:__main__:Connection established
INFO:__main__:Channel created
INFO:__main__:queue_declare response: <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=testy'])>"])>

Пожалуйста, дайте мне знать, если я что-то здесь упустил.

Я опаздываю на вечеринку, но это пример получения количества очередей с использованием pyrabbit или pyrabbit2 из AWS AmazonMQ с HTTPS, который также должен работать на RabbitMQ:

      from pyrabbit2.api import Client

cl = Client('b-xxxxxx.mq.ap-southeast-1.amazonaws.com', 'user', 'password', scheme='https')
if not cl.is_alive():
    raise Exception("Failed to connect to rabbitmq")

for i in cl.get_all_vhosts():
    print(i['name'])

queues = [q['name'] for q in cl.get_queues('/')]
print(queues)    

itemCount = cl.get_queue_depth('/', 'event.stream.my-api')
print(itemCount)
Другие вопросы по тегам