Обработка очередей SQS с помощью boto

У меня есть скрипт на Python, использующий библиотеку boto на экземпляре ec2, который является частью группы автоматического масштабирования. Сценарий обрабатывает сообщения из очереди SQS:

import boto
from boto.sqs.message import Message

conn = boto.connect_sqs()
q = conn.create_queue('queue-name')

while (qin.count() > 0):
    m = q.get_messages()
    #do something with the message

Имеет ли смысл использование оператора while? Обновляется ли count() в режиме реального времени как:

  1. другие экземпляры убирают сообщения из очереди (или я собираюсь удвоить)
  2. новые сообщения добавляются в очередь (или я буду по ним скучать?)

Как сделать так, чтобы этот скрипт постоянно прослушивал новые добавления в очереди, даже если очередь пуста?

В этом вопросе Обработка элементов в очереди SQS с помощью php-скрипта было отмечено, что "клиентская библиотека sqs ruby ​​имеет метод" poll ", который непрерывно опрашивает очередь и при получении сообщения в очереди передает его в блок". Есть ли эквивалент в Python?

Также было предложено использовать SNS для уведомления сценариев о состоянии очереди сообщений, но я не вижу, как можно настроить адаптивную систему с SNS, поскольку метрические сигналы тревоги недостаточно детализированы.

3 ответа

Решение

Вы не должны полагаться на счет для очереди, поскольку он предназначен только для предоставления приблизительного количества и не гарантирует его точность.

Если вы хотите продолжать опросить вечно, просто сделайте это:

while 1:
    messages = q.get_messages()
    # do something with messages
    time.sleep(N)

Я добавил вызов time.sleep, чтобы ввести задержку в цикл. Значение N должно быть не менее одной секунды и может быть значительно больше, в зависимости от того, насколько быстро вы ожидаете появления новых сообщений в вашей очереди. Если вы не поместите какую-то задержку в цикл, вы, вероятно, начнете ограничиваться службой.

Чтобы избежать многократного чтения сообщения, вы должны попытаться настроить время ожидания видимости в очереди на значение, превышающее время обработки сообщения, а затем убедитесь, что вы удалили сообщение после завершения обработки.

Пример:

# wait_time_seconds count only 1 request in x seconds (0 - 20)
# num_messages get x messages in same request (1 - 10)
while 1:
    logger.info("... waiting messages ...")
    messages = queue_in.get_messages(wait_time_seconds=20, num_messages=10)
    for message in messages:
        logger.info('message: %s' % (message,))
        queue_in.delete_message(message)
  1. Когда вы извлекаете сообщение из SQS, оно становится невидимым и недоступным для других запросов очереди (правка - невидимость может быть установлена ​​между 0 и 12 часами).
  2. Вы должны будете получать очередь снова каждый раз, когда добавляются новые сообщения, но это не должно быть проблемой - вот почему служба очередей существует в первую очередь.

Если вы хотите постоянно опрашивать очередь, попробуйте то, что называется " Длинный опрос" - у вас может быть непрерывный опрос в течение до 20 секунд, который возвращается при заполнении очереди.

Надеюсь, что это полезно, иначе покопайтесь в документации по boto sqs.

Другие вопросы по тегам