Как проверить статус задания в сельдерее?

Как проверить, выполняется ли задание в сельдерее (в частности, я использую celery-django)?

Я прочитал документацию, и я погуглил, но я не вижу такой вызов:

my_example_task.state() == RUNNING

Мой вариант использования заключается в том, что у меня есть внешний (Java) сервис для транскодирования. Когда я отправляю документ для перекодирования, я хочу проверить, выполняется ли задача, которая запускает этот сервис, и, если нет, (пере) запустить его.

Я использую текущие стабильные версии - 2.4, я считаю.

10 ответов

Решение

Каждый Task объект имеет .request свойство, которое содержит его AsyncRequest объект. Соответственно, следующая строка дает состояние задачи task:

task.AsyncResult(task.request.id).state

Верните task_id (который дается из.delay()) и спросите экземпляр сельдерея о состоянии:

x = method.delay(1,2)
print x.task_id

При запросе получите новый AsyncResult, используя этот task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

Создание AsyncResult Объект из идентификатора задачи - это рекомендуемый в FAQ способ получения статуса задачи, когда у вас есть только идентификатор задачи.

Тем не менее, начиная с Celery 3.x, существуют значительные предостережения, которые могут укусить людей, если они не обращают на них внимания. Это действительно зависит от конкретного сценария использования.

По умолчанию Celery не записывает "запущенное" состояние.

Чтобы Celery мог записать, что задание выполняется, вы должны установить task_track_started в True, Вот простая задача, которая проверяет это:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

когда task_track_started является False, который по умолчанию, состояние шоу PENDING даже если задача началась. Если вы установите task_track_started в True тогда государство будет STARTED,

Штат PENDING означает "я не знаю".

AsyncResult с государством PENDING не означает ничего больше, чем то, что Сельдерей не знает статуса задачи. Это может быть из-за любого количества причин.

Для одной вещи, AsyncResult может быть создан с неверными идентификаторами задач. Такие "задачи" будут считаться ожидающими рассмотрения сельдереем:

>>> task.AsyncResult("invalid").status
'PENDING'

Итак, никто не собирается кормить явно недействительными идентификаторами AsyncResult, Справедливо, но это также имеет эффект, что AsyncResult также рассмотрит задачу, которая успешно выполнена, но что Celery забыл как PENDING, Опять же, в некоторых сценариях использования это может быть проблемой. Часть проблемы зависит от того, как настроен Celery для сохранения результатов задач, потому что это зависит от наличия "надгробий" в бэкенде результатов. ("Надгробия" - это термин, используемый в документации Celery для фрагментов данных, которые записывают, как завершилась задача.) Использование AsyncResult не будет работать вообще, если task_ignore_result является True, Еще более неприятная проблема заключается в том, что Celery по умолчанию выбрасывает надгробия. result_expires по умолчанию установлено значение 24 часа. Поэтому, если вы запускаете задачу и записываете идентификатор в долговременное хранилище, а также через 24 часа, вы создаете AsyncResult при этом статус будет PENDING,

Все "реальные задачи" начинаются в PENDING государство. Итак, получение PENDING задание может означать, что задание было запрошено, но никогда не продвигалось дальше, чем это (по какой-либо причине). Или это может означать, что задание выполнено, но Сельдери забыл свое состояние

Ой! AsyncResult не будет работать для меня. Что еще я могу сделать?

Я предпочитаю отслеживать цели, а не сами задачи. Я храню некоторую информацию о задачах, но она действительно вторична для отслеживания целей. Цели хранятся в хранилище независимо от сельдерея. Когда запрос должен выполнить вычисление, зависит от достигнутой цели, он проверяет, была ли цель уже достигнута, если да, то он использует эту кэшированную цель, в противном случае он запускает задачу, которая повлияет на цель, и отправляет клиент, который сделал HTTP-запрос ответом, который указывает, что он должен ждать результата.


Приведенные выше имена переменных и гиперссылки относятся к Celery 4.x. В 3.x соответствующие переменные и гиперссылки: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES,

Старый вопрос, но я недавно столкнулся с этой проблемой.

Если вы пытаетесь получить task_id, вы можете сделать это следующим образом:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Теперь вы точно знаете, что такое task_id, и теперь можете использовать его для получения AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4

Вы также можете создавать собственные состояния и обновлять их значение при выполнении задачи. Этот пример из документов:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html

Просто используйте этот API из FAQ по сельдерею

result = app.AsyncResult(task_id)

Это отлично работает.

Ответ 2020 года:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"

Пытаться:

task.AsyncResult(task.request.id).state

это обеспечит статус Задачи Сельдерея. Если задача Celery уже находится в состоянии FAILURE, она выдаст исключение:

raised unexpected: KeyError('exc_type',)

Я нашел полезную информацию в

Инспектор-гид по рабочим проектам сельдерея

В моем случае я проверяю, работает ли Celery.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Вы можете играть с осмотреть, чтобы получить ваши потребности.

  • Сначала , в приложении для сельдерея :

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • а затем перейдите в файл задачи , импортируйте приложение из вашего модуля приложения celery.

vi задачи /task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())

Для простых задач мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/ для мониторинга.

и для сложных задач, скажем, задача, которая имеет дело со многими другими модулями. Мы рекомендуем вручную записывать ход выполнения и сообщения в конкретном блоке задач.

Помимо вышесказанного Программный подход Использование цветочной задачи можно легко увидеть.

Мониторинг в реальном времени с помощью Celery Events. Flower - это веб-инструмент для мониторинга и администрирования кластеров сельдерея.

  1. Ход выполнения и история
  2. Возможность показать детали задачи (аргументы, время запуска, время выполнения и многое другое)
  3. Графики и статистика

Официальный документ: инструмент мониторинга цветов - сельдерея

Монтаж:

$ pip install flower

Использование:

http://localhost:5555
Другие вопросы по тегам