Обработка очередей SQS с помощью boto
У меня есть скрипт на Python, использующий библиотеку boto на экземпляре ec2, который является частью группы автоматического масштабирования. Сценарий обрабатывает сообщения из очереди SQS:
import boto
from boto.sqs.message import Message
conn = boto.connect_sqs()
q = conn.create_queue('queue-name')
while (qin.count() > 0):
m = q.get_messages()
#do something with the message
Имеет ли смысл использование оператора while? Обновляется ли count() в режиме реального времени как:
- другие экземпляры убирают сообщения из очереди (или я собираюсь удвоить)
- новые сообщения добавляются в очередь (или я буду по ним скучать?)
Как сделать так, чтобы этот скрипт постоянно прослушивал новые добавления в очереди, даже если очередь пуста?
В этом вопросе Обработка элементов в очереди SQS с помощью php-скрипта было отмечено, что "клиентская библиотека sqs ruby имеет метод" poll ", который непрерывно опрашивает очередь и при получении сообщения в очереди передает его в блок". Есть ли эквивалент в Python?
Также было предложено использовать SNS для уведомления сценариев о состоянии очереди сообщений, но я не вижу, как можно настроить адаптивную систему с SNS, поскольку метрические сигналы тревоги недостаточно детализированы.
3 ответа
Вы не должны полагаться на счет для очереди, поскольку он предназначен только для предоставления приблизительного количества и не гарантирует его точность.
Если вы хотите продолжать опросить вечно, просто сделайте это:
while 1:
messages = q.get_messages()
# do something with messages
time.sleep(N)
Я добавил вызов time.sleep, чтобы ввести задержку в цикл. Значение N должно быть не менее одной секунды и может быть значительно больше, в зависимости от того, насколько быстро вы ожидаете появления новых сообщений в вашей очереди. Если вы не поместите какую-то задержку в цикл, вы, вероятно, начнете ограничиваться службой.
Чтобы избежать многократного чтения сообщения, вы должны попытаться настроить время ожидания видимости в очереди на значение, превышающее время обработки сообщения, а затем убедитесь, что вы удалили сообщение после завершения обработки.
Пример:
# wait_time_seconds count only 1 request in x seconds (0 - 20)
# num_messages get x messages in same request (1 - 10)
while 1:
logger.info("... waiting messages ...")
messages = queue_in.get_messages(wait_time_seconds=20, num_messages=10)
for message in messages:
logger.info('message: %s' % (message,))
queue_in.delete_message(message)
- Когда вы извлекаете сообщение из SQS, оно становится невидимым и недоступным для других запросов очереди (правка - невидимость может быть установлена между 0 и 12 часами).
- Вы должны будете получать очередь снова каждый раз, когда добавляются новые сообщения, но это не должно быть проблемой - вот почему служба очередей существует в первую очередь.
Если вы хотите постоянно опрашивать очередь, попробуйте то, что называется " Длинный опрос" - у вас может быть непрерывный опрос в течение до 20 секунд, который возвращается при заполнении очереди.
Надеюсь, что это полезно, иначе покопайтесь в документации по boto sqs.