Как переопределить бэкэнд для задач с сельдереем

Мы используем Redis в качестве нашего бэкенда. Однако для одной задачи мы хотели бы переопределить это, чтобы использовать вместо него RabbitMQ.

Документация для Task.backend гласит:

Бэкэнд хранилища результатов, используемый для этой задачи. По умолчанию установлено значение CELERY_RESULT_BACKEND.

Итак, я предположил, что мы могли бы установить Task.backend в строку того же формата, принятого CELERY_RESULT_BACKEND,

Итак, я пытаюсь это:

celeryconfig.py

CELERY_RESULT_BACKEND = "redis://redis-host:7777"

tasks.py

@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
    ...

Однако работник терпит неудачу с:

[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
    store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'

2 ответа

Решение

Документация неверна. Task.backend на самом деле является экземпляром внутреннего класса из celery.backends, В этом случае, чтобы переопределить класс задачи, я должен был сделать это:

from celery.backends.amqp import AMQPBackend

@app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
def my_task(params):
    ...

Однако рабочие продолжают использовать класс по умолчанию и, похоже, не предлагают способ переопределить это.

Extend & update the original accepted answer, those who hit similar issue might be interested with List of Options available in Celery task decorator . See Task.backend section :

The result store backend to use for this task. An instance of one of the backend classes in celery.backends. Defaults to app.backend

Also, AMQP result backend has been removed since the version 5.0 , instead you could use celery.backends.rpc.RPCBackend .

For example :

      from celery.backends.rpc import RPCBackend as CeleryRpcBackend
_rpc_backend = CeleryRpcBackend(app=your_celery_app, \
        exchange=RPC_REPLY_EXCHANGE_DEFAULT_NAME, \
        exchange_type=RPC_EXCHANGE_DEFAULT_TYPE )

@your_celery_app.task(backend=_rpc_backend, ... OTHER_ARGS ... )
def your_task_function(**kwargs):
    DO_SOMETHING ...

Другие вопросы по тегам