В сельдерее, как получить статус задачи для всех задач для конкретного имени задачи?
В сельдерее я хочу получить статус задачи для всех задач для конкретного имени задачи. Для этого попробовал приведенный ниже код.
import celery.events.state
# Celery status instance.
stat = celery.events.state.State()
# task_by_type will return list of tasks.
query = stat.tasks_by_type("my_task_name")
# Print tasks.
print query
Теперь я получаю пустой список в этом коде.
2 ответа
celery.events.state.State()
это структура данных, используемая для отслеживания состояния работников и задач сельдерея При звонке State()
, вы получаете пустой объект состояния без данных.
Вы должны использовать app.events.Receiver
(Потоковая обработка) или celery.events.snapshot
(Пакетная обработка) для захвата состояния, содержащего задачи.
Образец кода:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
В сельдерее вы можете легко найти статус задачи, обращаясь к ней через идентификатор задачи, если вы хотите получить к ним доступ из другой функции.
Образец кода:-
@task(name='Sum_of_digits')
def ABC(x,y):
return x+y
Добавить эту задачу для обработки
res = ABC.delay(1, 2)
Теперь используйте задачу res для получения состояния, статуса и результатов (res.get())
print(f"id={res.id}, state={res.state}, status={res.status}")
Это не поддерживается изначально. В зависимости от серверной части (Mongo, Redis и т. Д.), Вы можете или не сможете проанализировать содержимое очереди и выяснить, что в ней находится. Даже если вы это сделаете, вы пропустите предметы в настоящее время в процессе.
Тем не менее, вы можете справиться с этим самостоятельно:
result = mytask.delay(...)
my_datastore.save("mytask", result.id)
...
for id in my_datastore.find(task="mytask"):
res = AsyncResult(id)
print res.state