Правильный конфиг, используя rabbitmq в качестве сельдерея
Я создаю приложение фляги с сельдереем, используя rabbitmq в качестве основы сельдерея.
мой конф для сельдерея
CELERY_BROKER_URL='amqp://localhost:5672',
CELERY_RESULT_BACKEND='amqp://',
CELERY_QUEUE_HA_POLICY='all',
CELERY_TASK_RESULT_EXPIRES=None
Затем объявление очереди произвело целую кучу ошибок
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id)
ошибка
PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg
'durable' for queue '1419349900' in vhost '/':
received 'true' but current is 'false'
ОК, я изменил это на channel.queue_declare(queue=new_task_id, durable=True)
опять ошибка
PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg
'auto_delete' for queue '1419350288' in vhost '/':
received 'true' but current is 'false'
ОК, я изменил это на channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)
На этот раз ошибка исчезла.
Но как я узнаю об этом до того, как получу эти ошибки? Я искал документ Celery по этой теме, подробную документацию, но не получил то, что мне нужно - он просто перечисляет все пункты conf, но не говорит мне, как его установить. Или это документ от rabbitmq, на который я должен ссылаться?
Спасибо!
редактировать
So, all Queues declared in your configuration file, or in any registered tasks.
Не могли бы вы объяснить немного больше об этом? И какая разница междуdeclare
а такжеcreate
?Вы сказали
Result queues will be created with 'durable', 'auto-delete' flags
Где я могу найти эту информацию? И как сельдерей знает, что очередьresult queue
?
1 ответ
По умолчанию Celery создает все отсутствующие очереди (см. Документацию CELERY_CREATE_MISSING_QUEUES. По умолчанию очереди задач будут создаваться с флагом 'durable'. Очереди результатов будут создаваться с флагами 'durable', 'auto-delete' и 'x-expires' если твой CELERY_TASK_RESULT_EXPIRES
Параметр не None (по умолчанию он установлен на 1 день).
Итак, все очереди объявлены в вашем конфигурационном файле или в любых зарегистрированных задачах. Более того, когда вы используете бэкэнд результата amqp, рабочий, если у вас нет CELERY_IGNORE_RESULT
При заданном параметре очередь результатов будет создана при инициализации тэша и будет названа task_id.
Поэтому, если вы попытаетесь переопределить эту очередь с конфликтующей конфигурацией, RabbitMQ откажется от нее. И поэтому вам не нужно создавать его.
редактировать
"Объявление" очереди, как указано в документации pika, позволяет проверить существование очереди в RabbitMQ и, если нет, создать ее. Если
CELERY_CREATE_MISSING_QUEUES
в конфигурации Celery установлено значение True, при инициализации любая очередь, указанная вCELERY_QUEUES
или жеCELERY_DEFAULT_QUEUE
параметр или любая пользовательская очередь, объявленная в параметрах зарегистрированных задач, например@task(name="custom", queue="my_custom_queue")
или даже в обычаеCELERY_ROUTING
определение, будет "объявлено" RabbitMQ, поэтому будет создано, если они не существуют.Документацию по параметризации очереди можно найти здесь, в параграфе "Использование временных очередей", но лучший способ увидеть это - использовать плагин управления RabbitMQ, позволяющий вам отслеживать в веб-интерфейсе объявленные очереди и их конфигурацию (вы можете увидеть D флаг для Durable и флаг AD для автоматического удаления). Наконец, сельдерей не "знает", является ли очередь очередью результатов, но при создании задача назначается уникальному идентификатору. Этот идентификатор будет использоваться в качестве имени очереди для любого результата. Это означает, что если производитель задачи ожидает результата, он будет прослушивать эту очередь всякий раз, когда она будет создана. И потребитель, после того как задача подтверждена, и до того, как задача действительно будет выполнена, и если задача не игнорирует результаты (через настройку
CELERY_IGNORE_RESULT
или пользовательские параметры задачи), проверит, существует ли очередь, названная в качестве идентификатора задачи, если нет, то создаст ее с конфигурацией результата по умолчанию (см. Конфигурацию бэкенда результата)