Проверьте, если в задаче сельдерея

Как проверить, что функция выполняется в сельдерее?

def notification():
   # in_celery() returns True if called from celery_test(), 
   #                     False if called from not_celery_test()
   if in_celery():
      # Send mail directly without creation of additional celery subtask
      ...
   else:
      # Send mail with creation of celery task
      ...

@celery.task()
def celery_test():
    notification()

def not_celery_test():
    notification()

1 ответ

Решение

Вот один из способов сделать это с помощью celery.current_task, Вот код, который будет использоваться задачей:

def notification():
    from celery import current_task
    if not current_task:
        print "directly called"
    elif current_task.request.id is None:
        print "called synchronously"
    else:
        print "dispatched"

@app.task
def notify():
    notification()

Это код, который вы можете запустить, чтобы выполнить вышеизложенное:

        from core.tasks import notify, notification
        print "DIRECT"
        notification()
        print "NOT DISPATCHED"
        notify()
        print "DISPATCHED"
        notify.delay().get()

Мой код задачи в первом фрагменте был в модуле с именем core.tasks, И я поместил код в последнем фрагменте в пользовательскую команду управления Django. Это тестирует 3 случая:

  • призвание notification непосредственно.

  • призвание notification через задачу, выполненную синхронно. То есть эта задача не отправляется через сельдерея к работнику. Код задачи выполняется в том же процессе, который вызывает notify,

  • призвание notification через задачу, выполняемую работником. Код задачи выполняется в процессе, отличном от процесса, который его запустил.

Выход был:

NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called

Там нет линии от print в задаче на выходе после DISPATCHED потому что эта строка попадает в рабочий журнал:

[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched

Важное примечание: я изначально использовал if current_task is None в первом тесте, но это не сработало. Я проверил и перепроверил. Каким-то образом сельдерей устанавливает current_task к объекту, который выглядит как None (если вы используете repr на это вы получите None) но не None, Не уверен, что там происходит. С помощью if not current_task работает.

Кроме того, я тестировал приведенный выше код в приложении Django, но не использовал его в работе. Там могут быть ошибки, которые я не знаю.

Другие вопросы по тегам