Правильный конфиг, используя rabbitmq в качестве сельдерея

Я создаю приложение фляги с сельдереем, используя rabbitmq в качестве основы сельдерея.

мой конф для сельдерея

CELERY_BROKER_URL='amqp://localhost:5672',
CELERY_RESULT_BACKEND='amqp://',
CELERY_QUEUE_HA_POLICY='all',
CELERY_TASK_RESULT_EXPIRES=None

Затем объявление очереди произвело целую кучу ошибок

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id)

ошибка

PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg
'durable' for queue '1419349900' in vhost '/':
received 'true' but current is 'false'

ОК, я изменил это на channel.queue_declare(queue=new_task_id, durable=True)

опять ошибка

PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg
'auto_delete' for queue '1419350288' in vhost '/':
received 'true' but current is 'false'

ОК, я изменил это на channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)

На этот раз ошибка исчезла.

Но как я узнаю об этом до того, как получу эти ошибки? Я искал документ Celery по этой теме, подробную документацию, но не получил то, что мне нужно - он просто перечисляет все пункты conf, но не говорит мне, как его установить. Или это документ от rabbitmq, на который я должен ссылаться?

Спасибо!

редактировать

  1. So, all Queues declared in your configuration file, or in any registered tasks.Не могли бы вы объяснить немного больше об этом? И какая разница между declare а также create?

  2. Вы сказали Result queues will be created with 'durable', 'auto-delete' flagsГде я могу найти эту информацию? И как сельдерей знает, что очередь result queue?

1 ответ

Решение

По умолчанию Celery создает все отсутствующие очереди (см. Документацию CELERY_CREATE_MISSING_QUEUES. По умолчанию очереди задач будут создаваться с флагом 'durable'. Очереди результатов будут создаваться с флагами 'durable', 'auto-delete' и 'x-expires' если твой CELERY_TASK_RESULT_EXPIRES Параметр не None (по умолчанию он установлен на 1 день).

Итак, все очереди объявлены в вашем конфигурационном файле или в любых зарегистрированных задачах. Более того, когда вы используете бэкэнд результата amqp, рабочий, если у вас нет CELERY_IGNORE_RESULT При заданном параметре очередь результатов будет создана при инициализации тэша и будет названа task_id.

Поэтому, если вы попытаетесь переопределить эту очередь с конфликтующей конфигурацией, RabbitMQ откажется от нее. И поэтому вам не нужно создавать его.

редактировать

  1. "Объявление" очереди, как указано в документации pika, позволяет проверить существование очереди в RabbitMQ и, если нет, создать ее. Если CELERY_CREATE_MISSING_QUEUES в конфигурации Celery установлено значение True, при инициализации любая очередь, указанная в CELERY_QUEUES или же CELERY_DEFAULT_QUEUE параметр или любая пользовательская очередь, объявленная в параметрах зарегистрированных задач, например @task(name="custom", queue="my_custom_queue") или даже в обычае CELERY_ROUTING определение, будет "объявлено" RabbitMQ, поэтому будет создано, если они не существуют.

  2. Документацию по параметризации очереди можно найти здесь, в параграфе "Использование временных очередей", но лучший способ увидеть это - использовать плагин управления RabbitMQ, позволяющий вам отслеживать в веб-интерфейсе объявленные очереди и их конфигурацию (вы можете увидеть D флаг для Durable и флаг AD для автоматического удаления). Наконец, сельдерей не "знает", является ли очередь очередью результатов, но при создании задача назначается уникальному идентификатору. Этот идентификатор будет использоваться в качестве имени очереди для любого результата. Это означает, что если производитель задачи ожидает результата, он будет прослушивать эту очередь всякий раз, когда она будет создана. И потребитель, после того как задача подтверждена, и до того, как задача действительно будет выполнена, и если задача не игнорирует результаты (через настройку CELERY_IGNORE_RESULT или пользовательские параметры задачи), проверит, существует ли очередь, названная в качестве идентификатора задачи, если нет, то создаст ее с конфигурацией результата по умолчанию (см. Конфигурацию бэкенда результата)

Другие вопросы по тегам