В сельдерее, как получить статус задачи для всех задач для конкретного имени задачи?

В сельдерее я хочу получить статус задачи для всех задач для конкретного имени задачи. Для этого попробовал приведенный ниже код.

import celery.events.state

# Celery status instance.
stat = celery.events.state.State()

# task_by_type will return list of tasks.
query = stat.tasks_by_type("my_task_name")

# Print tasks.
print query

Теперь я получаю пустой список в этом коде.

2 ответа

celery.events.state.State() это структура данных, используемая для отслеживания состояния работников и задач сельдерея При звонке State(), вы получаете пустой объект состояния без данных.

Вы должны использовать app.events.Receiver(Потоковая обработка) или celery.events.snapshot(Пакетная обработка) для захвата состояния, содержащего задачи.

Образец кода:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

В сельдерее вы можете легко найти статус задачи, обращаясь к ней через идентификатор задачи, если вы хотите получить к ним доступ из другой функции.

Образец кода:-

@task(name='Sum_of_digits')
def ABC(x,y):
   return x+y

Добавить эту задачу для обработки

 res = ABC.delay(1, 2)

Теперь используйте задачу res для получения состояния, статуса и результатов (res.get())

 print(f"id={res.id}, state={res.state}, status={res.status}")

Это не поддерживается изначально. В зависимости от серверной части (Mongo, Redis и т. Д.), Вы можете или не сможете проанализировать содержимое очереди и выяснить, что в ней находится. Даже если вы это сделаете, вы пропустите предметы в настоящее время в процессе.

Тем не менее, вы можете справиться с этим самостоятельно:

result = mytask.delay(...)
my_datastore.save("mytask", result.id)
...
for id in my_datastore.find(task="mytask"):
    res = AsyncResult(id)
    print res.state
Другие вопросы по тегам