Как проверить статус задания в сельдерее?
Как проверить, выполняется ли задание в сельдерее (в частности, я использую celery-django)?
Я прочитал документацию, и я погуглил, но я не вижу такой вызов:
my_example_task.state() == RUNNING
Мой вариант использования заключается в том, что у меня есть внешний (Java) сервис для транскодирования. Когда я отправляю документ для перекодирования, я хочу проверить, выполняется ли задача, которая запускает этот сервис, и, если нет, (пере) запустить его.
Я использую текущие стабильные версии - 2.4, я считаю.
10 ответов
Каждый Task
объект имеет .request
свойство, которое содержит его AsyncRequest
объект. Соответственно, следующая строка дает состояние задачи task
:
task.AsyncResult(task.request.id).state
Верните task_id (который дается из.delay()) и спросите экземпляр сельдерея о состоянии:
x = method.delay(1,2)
print x.task_id
При запросе получите новый AsyncResult, используя этот task_id:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Создание AsyncResult
Объект из идентификатора задачи - это рекомендуемый в FAQ способ получения статуса задачи, когда у вас есть только идентификатор задачи.
Тем не менее, начиная с Celery 3.x, существуют значительные предостережения, которые могут укусить людей, если они не обращают на них внимания. Это действительно зависит от конкретного сценария использования.
По умолчанию Celery не записывает "запущенное" состояние.
Чтобы Celery мог записать, что задание выполняется, вы должны установить task_track_started
в True
, Вот простая задача, которая проверяет это:
@app.task(bind=True)
def test(self):
print self.AsyncResult(self.request.id).state
когда task_track_started
является False
, который по умолчанию, состояние шоу PENDING
даже если задача началась. Если вы установите task_track_started
в True
тогда государство будет STARTED
,
Штат PENDING
означает "я не знаю".
AsyncResult
с государством PENDING
не означает ничего больше, чем то, что Сельдерей не знает статуса задачи. Это может быть из-за любого количества причин.
Для одной вещи, AsyncResult
может быть создан с неверными идентификаторами задач. Такие "задачи" будут считаться ожидающими рассмотрения сельдереем:
>>> task.AsyncResult("invalid").status
'PENDING'
Итак, никто не собирается кормить явно недействительными идентификаторами AsyncResult
, Справедливо, но это также имеет эффект, что AsyncResult
также рассмотрит задачу, которая успешно выполнена, но что Celery забыл как PENDING
, Опять же, в некоторых сценариях использования это может быть проблемой. Часть проблемы зависит от того, как настроен Celery для сохранения результатов задач, потому что это зависит от наличия "надгробий" в бэкенде результатов. ("Надгробия" - это термин, используемый в документации Celery для фрагментов данных, которые записывают, как завершилась задача.) Использование AsyncResult
не будет работать вообще, если task_ignore_result
является True
, Еще более неприятная проблема заключается в том, что Celery по умолчанию выбрасывает надгробия. result_expires
по умолчанию установлено значение 24 часа. Поэтому, если вы запускаете задачу и записываете идентификатор в долговременное хранилище, а также через 24 часа, вы создаете AsyncResult
при этом статус будет PENDING
,
Все "реальные задачи" начинаются в PENDING
государство. Итак, получение PENDING
задание может означать, что задание было запрошено, но никогда не продвигалось дальше, чем это (по какой-либо причине). Или это может означать, что задание выполнено, но Сельдери забыл свое состояние
Ой! AsyncResult
не будет работать для меня. Что еще я могу сделать?
Я предпочитаю отслеживать цели, а не сами задачи. Я храню некоторую информацию о задачах, но она действительно вторична для отслеживания целей. Цели хранятся в хранилище независимо от сельдерея. Когда запрос должен выполнить вычисление, зависит от достигнутой цели, он проверяет, была ли цель уже достигнута, если да, то он использует эту кэшированную цель, в противном случае он запускает задачу, которая повлияет на цель, и отправляет клиент, который сделал HTTP-запрос ответом, который указывает, что он должен ждать результата.
Приведенные выше имена переменных и гиперссылки относятся к Celery 4.x. В 3.x соответствующие переменные и гиперссылки: CELERY_TRACK_STARTED
, CELERY_IGNORE_RESULT
, CELERY_TASK_RESULT_EXPIRES
,
Старый вопрос, но я недавно столкнулся с этой проблемой.
Если вы пытаетесь получить task_id, вы можете сделать это следующим образом:
import celery
from celery_app import add
from celery import uuid
task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)
Теперь вы точно знаете, что такое task_id, и теперь можете использовать его для получения AsyncResult:
# grab the AsyncResult
result = celery.result.AsyncResult(task_id)
# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf
# print the AsyncResult's status
print result.status
SUCCESS
# print the result returned
print result.result
4
Вы также можете создавать собственные состояния и обновлять их значение при выполнении задачи. Этот пример из документов:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html
Просто используйте этот API из FAQ по сельдерею
result = app.AsyncResult(task_id)
Это отлично работает.
Ответ 2020 года:
#### tasks.py
@celery.task()
def mytask(arg1):
print(arg1)
#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
state = process.state
return f"Thanks for your patience, your job {process.task_id} \
is being processed. Status {state}"
Пытаться:
task.AsyncResult(task.request.id).state
это обеспечит статус Задачи Сельдерея. Если задача Celery уже находится в состоянии FAILURE, она выдаст исключение:
raised unexpected: KeyError('exc_type',)
Я нашел полезную информацию в
Инспектор-гид по рабочим проектам сельдерея
В моем случае я проверяю, работает ли Celery.
inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
state = 'FAILURE'
else:
state = str(task.state)
Вы можете играть с осмотреть, чтобы получить ваши потребности.
- Сначала , в приложении для сельдерея :
vi my_celery_apps / app1.py
app = Celery(worker_name)
- а затем перейдите в файл задачи , импортируйте приложение из вашего модуля приложения celery.
vi задачи /task1.py
from my_celery_apps.app1 import app
app.AsyncResult(taskid)
try:
if task.state.lower() != "success":
return
except:
""" do something """
res = method.delay()
print(f"id={res.id}, state={res.state}, status={res.status} ")
print(res.get())
Для простых задач мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/ для мониторинга.
и для сложных задач, скажем, задача, которая имеет дело со многими другими модулями. Мы рекомендуем вручную записывать ход выполнения и сообщения в конкретном блоке задач.
Помимо вышесказанного Программный подход Использование цветочной задачи можно легко увидеть.
Мониторинг в реальном времени с помощью Celery Events. Flower - это веб-инструмент для мониторинга и администрирования кластеров сельдерея.
- Ход выполнения и история
- Возможность показать детали задачи (аргументы, время запуска, время выполнения и многое другое)
- Графики и статистика
Официальный документ: инструмент мониторинга цветов - сельдерея
Монтаж:
$ pip install flower
Использование:
http://localhost:5555