В сельдерее, как получить задание в очереди?

Я использую Celery с Redis в качестве посредника и вижу, что очередь на самом деле представляет собой список redis с сериализованной задачей в качестве элементов.

У меня вопрос, если у меня есть объект AsyncResult в результате вызова <task>.delay(), есть ли способ определить позицию элемента в очереди?

ОБНОВИТЬ:

Я наконец смог получить позицию, используя:

from celery.task.control import inspect
i = inspect()
i.reserved()

но это немного медленно, так как он должен общаться со всеми рабочими.

1 ответ

inspect.reserved()/scheduled() Вы упомянули, что это может работать, но не всегда точно, так как в нем учитываются только те задачи, которые рабочие предварительно выбрали.

Celery не допускает внеполосных операций в очереди, таких как удаление сообщений из очереди или их переупорядочение, поскольку они не будут масштабироваться в распределенной системе. Сообщения, возможно, еще не достигли очереди, что может привести к условиям гонки, и на практике это не последовательная очередь с транзакционными операциями, а поток сообщений, происходящих из нескольких мест. То есть, Celery API основан на строгой семантике передачи сообщений.

Можно получить доступ к очереди напрямую у некоторых брокеров, поддерживаемых Celery (например, Redis или Database), но это не является частью общедоступного API, и вам не рекомендуется это делать, но, конечно, если вы не планируете поддерживать При масштабных операциях вы должны делать все, что вам удобнее, и отказаться от моего совета.

Если это просто для того, чтобы дать пользователю некоторое представление о том, когда его работа будет завершена, то я уверен, что вы могли бы придумать алгоритм, позволяющий предсказать, когда задача будет выполнена, если у вас была только длина очереди и время. при котором каждая задача была вставлена.

Первый просто redis.len("celery")и последний вы можете добавить себя, слушая task_sent сигнал:

from celery.signals import task_sent

@task_sent.connect
def record_insertion_time(id, **kwargs):
   redis.zadd("celery.insertion_times", id)

Используя отсортированный набор здесь: http://redis.io/commands/zadd

Для чистого решения по передаче сообщений вы можете использовать выделенный монитор, который использует поток событий Celery и прогнозирует, когда задачи завершатся. http://docs.celeryproject.org/en/latest/userguide/monitoring.html

(только что заметил, что в отправленной задаче отсутствует поле отметки времени в документации, но отметка времени отправляется вместе с этим событием, поэтому я исправлю это).

События также содержат поле "часы", которое представляет собой логические часы (см. http://en.wikipedia.org/wiki/Lamport_timestamps), это можно использовать для определения порядка событий в распределенной системе без зависимости от системы. время на каждой машине должно быть синхронизировано (чего невозможно достичь).

Другие вопросы по тегам