Узнайте, существует ли задача сельдерея
Можно ли узнать, существует ли задача с определенным идентификатором задачи? Когда я пытаюсь получить статус, я всегда получаю ожидание.
>>> AsyncResult('...').status
'PENDING'
Я хочу знать, является ли данный идентификатор задачи реальным идентификатором задачи сельдерея, а не случайной строкой. Я хочу разные результаты в зависимости от того, существует ли правильная задача для определенного идентификатора.
Возможно, в прошлом была допустимая задача с тем же идентификатором, но результаты могли быть удалены из серверной части.
9 ответов
Celery не записывает состояние при отправке задачи, это частично является оптимизацией (см. http://docs.celeryproject.org/en/latest/userguide/tasks.html).
Если вам это действительно нужно, просто добавьте:
from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish
@after_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
# the task may not exist if sent using `send_task` which
# sends tasks by name, so fall back to the default result backend
# if that is the case.
task = current_app.tasks.get(sender)
backend = task.backend if task else current_app.backend
backend.store_result(body['id'], None, "SENT")
Затем вы можете проверить состояние PENDING, чтобы обнаружить, что задача не была отправлена:
>>> result.state != "PENDING"
AsyncResult.state возвращает PENDING в случае неизвестных идентификаторов задач.
ОЖИДАНИЕ
Задача ожидает выполнения или неизвестна. Предполагается, что любой неизвестный идентификатор задачи находится в состоянии ожидания.
http://docs.celeryproject.org/en/latest/userguide/tasks.html
Вы можете предоставить собственные идентификаторы задач, если вам нужно отличить неизвестные идентификаторы от существующих:
>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
...
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
...
Прямо сейчас я использую следующую схему:
- Получить идентификатор задачи.
- Установите для ключа memcache, например, "task_%s"% task.id сообщение "Запущено".
- Передать идентификатор задачи клиенту.
- Теперь из клиента я могу контролировать состояние задачи (установить из сообщений задачи в memcache).
- От задачи к готовности - установите для ключевого сообщения memcache "Готово".
- С клиента на задачу готова - запустите специальную задачу, которая удалит ключ из memcache и сделает необходимые действия по очистке.
Я нашел способ проверить, и у меня он работает:
def check_task_exists(task_id):
inspector = app.control.inspect()
active_tasks = inspector.active()
# Check active tasks
if active_tasks:
for worker, tasks in active_tasks.items():
for task in tasks:
if task['id'] == task_id:
return True
# Check scheduled tasks
scheduled_tasks = inspector.scheduled()
if scheduled_tasks:
for worker, tasks in scheduled_tasks.items():
if task_id in tasks:
return True
# Check reserved tasks
reserved_tasks = inspector.reserved()
if reserved_tasks:
for worker, tasks in reserved_tasks.items():
if task_id in tasks:
return True
# Task not found
return False
Итак, у меня есть эта идея:
import project.celery_tasks as tasks
def task_exist(task_id):
found = False
# tasks is my imported task module from celery
# it is located under /project/project, where the settings.py file is located
i = tasks.app.control.inspect()
s = i.scheduled()
for e in s:
if task_id in s[e]:
found = True
break
a = i.active()
if not found:
for e in a:
if task_id in a[e]:
found = True
break
r = i.reserved()
if not found:
for e in r:
if task_id in r[e]:
found = True
break
# if checking the status returns pending, yet we found it in any queues... it means it exists...
# if it returns pending, yet we didn't find it on any of the queues... it doesn't exist
return found
Согласно https://docs.celeryproject.org/en/stable/userguide/monitoring.html , различные типы проверок очередей: активные, запланированные, зарезервированные, отозванные, зарегистрированные, статистика, query_task,
так что выбирай как хочешь.
И может быть лучший способ проверить очереди на их задачи, но это должно работать для меня, на данный момент.
Вам нужно позвонить .get()
в объекте AsyncTask, который вы создаете для фактического получения результата из серверной части.
Для дальнейшего уточнения моего ответа.
Любая строка технически является допустимым идентификатором, нет способа проверить идентификатор задачи. Единственный способ выяснить, существует ли задача, - это спросить у бэкэнда, знает ли она об этом, и сделать это, вы должны использовать .get()
,
Это создает проблему .get()
блокирует, когда серверная часть не имеет никакой информации о заданном вами идентификаторе задачи, это сделано для того, чтобы вы могли запустить задачу и затем дождаться ее завершения.
В случае исходного вопроса я собираюсь предположить, что ОП хочет получить состояние ранее выполненной задачи. Для этого вы можете передать очень маленький тайм-аут и перехватить ошибки тайм-аута:
from celery.exceptions import TimeoutError
try:
# fetch the result from the backend
# your backend must be fast enough to return
# results within 100ms (0.1 seconds)
result = AsyncResult('blubb').get(timeout=0.1)
except TimeoutError:
result = None
if result:
print "Result exists; state=%s" % (result.state,)
else:
print "Result does not exist"
Само собой разумеется, что это работает, только если ваш бэкэнд хранит результаты, если нет, то нет способа узнать, действителен ли идентификатор задачи или нет, потому что ничего не хранит их запись.
Еще больше уточнений.
То, что вы хотите сделать, не может быть выполнено с помощью бэкэнда AMQP, потому что он не хранит результаты, он перенаправляет их.
Мое предложение было бы переключиться на базу данных базы данных, чтобы результаты находились в базе данных, которую вы можете запросить за пределами существующих модулей сельдерея. Если в базе данных результатов нет задач, можно предположить, что идентификатор недействителен.
возможно, использование Redis Direct является хорошим решением.
pool = redis.ConnectionPool(host=config.REDIS_HOST,
port=config.REDIS_PORT,
db=config.REDIS_DB,
password=config.REDIS_PASSWORD)
redis_client = Redis(connection_pool=pool)
def check_task_exist(id):
for one in redis_client.lrange('celery', 0, -1):
task_info = json.loads(one.decode())
if task_info['headers']['id'] == id:
return True
return False
Пожалуйста, поправьте меня, если я ошибаюсь.
if built_in_status_check(task_id) == 'pending'
if registry_exists(task_id) == true
print 'Pending'
else
print 'Task does not exist'
Пытаться
AsyncResult('blubb').state
это может сработать.
Это должно вернуть что-то другое.