Получить результат из 'task_id' в Celery из неизвестного задания

Как вытащить результат задачи, если я не знаю ранее, какая задача была выполнена? Вот настройка: Учитывая следующий источник ('tasks.py'):

from celery import Celery

app = Celery('tasks', backend="db+mysql://u:p@localhost/db", broker = 'amqp://guest:guest@localhost:5672//')

@app.task
def add(x,y):
   return x + y


@app.task
def mul(x,y):
   return x * y

с RabbitMQ 3.3.2, работающим локально:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server

              RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

с Celery 3.1.12, работающим локально:

 -------------- celery@Marcs-MacBook-Pro.local v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.2.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x105dea3d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

Затем я могу импортировать метод и получить результат с помощью 'task_id':

from tasks import add, mul
from celery.result import AsyncResult

result = add.delay(2,2)
task_id = result.task_id
result.get() # 4

result = AsyncResult(id=task_id)
result.get() # 4

result = add.AsyncResult(id=task_id)
result.get() # 4

# and the same for the 'mul' task. Just imagine I put it here

В следующем примере я разделил эти шаги между процессами. В одном процессе я получаю 'task_id' следующим образом:

from tasks import add

result = add.delay(5,5)
task_id = result.task_id

И в другом процессе, если я использую тот же 'task_id' (скопированный и вставленный в другой REPL или в другой HTTP-запрос), например так:

from celery.result import AsyncResult

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:p@localhost/db")
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta'
result.state # AttributeError: 'str' object has no attribute 'get_task_meta'
result.status # AttributeError: 'str' object has no attribute 'get_task_meta'

И в другом процессе, если я делаю:

from task import add # in this instance I know that an add task was performed

result = add.AsyncResult(id="copied_task_id")
result.status # "SUCCESSFUL"
result.state # "SUCCESSFUL"
result.get() # 10

Я хотел бы иметь возможность получить результат, не зная заранее, какая задача генерирует результат. В моей реальной среде я планирую вернуть этот task_id клиенту и позволить ему запрашивать статус своей работы через HTTP-запрос.

1 ответ

Итак, я долго искал решение, и теперь, когда я наконец-то официально опубликовал это и просмотрел документацию, я нашел этот драгоценный камень:

класс celery.result.AsyncResult (идентификатор, серверная часть = нет, имя задачи = нет, приложение = нет, родитель = нет)

Запросить состояние задачи.

Параметры:

id - см. id.

backend - см. backend.

исключение TimeoutError

Ошибка возникла для тайм-аутов.

AsyncResult.app = Нет

Таким образом, вместо предоставления параметра backend я предоставил аргумент "app" вместо этого:

from celery.result import AsyncResult
from task import app

# Assuming add.delay(10,10) was called in another process
# and that I'm using a 'task_id' I retrieved from that process

result = AsyncResult(id='copied_task_id', app=app)
result.state # 'SUCCESSFUL'
result.get() # 20

Это, наверное, очевидно для многих. Это было не для меня. Пока все, что я могу сказать, это то, что это решение "просто работает", но я бы чувствовал себя более комфортно, если бы знал, что это санкционированный способ сделать это. Если вам известен раздел в документации, который проясняет это, пожалуйста, опубликуйте его в комментариях или в качестве ответа, и я выберу его в качестве ответа, если смогу.

В случае, если это кому-то поможет, оказывается, что backendпараметр не ожидает строку, а объект Backend: как переопределить backend для задач сельдерея

Для меня сработало:

from celery.backends.rpc import RPCBackend
from myapp.workers.main import app as worker

@worker.task(backend=RPCBackend(app=worker))
def status_check():
    return "OK"
Другие вопросы по тегам