Django Celery Workflow Chain Пауза / Резюме

Есть ли способ приостановить / возобновить работающий рабочий процесс, созданный с помощью цепочек из сельдерея 3.0?

По сути, в нашей системе есть два разных типа задач: интерактивные и неинтерактивные. Неинтерактивные параметры, для которых у нас есть все параметры, но интерактивные требуют ввода данных пользователем. Обратите внимание, что для интерактивных задач мы можем запросить пользовательский ввод только после того, как все предыдущие такты в цепочке будут выполнены, поскольку их результаты будут влиять на интерактивные задачи (т.е. мы не можем запрашивать пользовательский ввод перед созданием фактической цепочки).

Любое предложение о том, как подойти к этому? Действительно в недоумении здесь..

Актуальные идеи:

  • Создайте два подкласса задачи (из задачи импорта сельдерея). Добавьте дополнительную переменную экземпляра (члена класса) в подкласс интерактивной задачи, для которой по умолчанию установлено значение false, и это означает, что некоторый пользовательский ввод все еще необходим. Каким-то образом есть доступ к экземпляру Задачи, и установить для него значение true извне работника сельдерея (хотя я довольно долго это искал, и не представляется возможным получить доступ к объектам Задачи непосредственно из другого модуля).
  • Разделите цепочку на несколько цепочек, разграниченных интерактивными заданиями. Пусть какой-то механизм за пределами сельдерея обнаружит, как только цепочка достигнет своего конца, и запустит интерактивный клиентский компонент интерактивной задачи. После того, как пользователь ввел все эти данные, получите данные и запустите новую цепочку, где интерактивная задача находится во главе новой цепочки.

1 ответ

Решение

Мы реализовали что-то вроде вашей второй идеи в нашем проекте, и она отлично работает. Вот суть реализации.

Добавить новое поле status к вашей модели и переопределить метод сохранения.

models.py:

class My_Model(models.Model):
    # some fields
    status = models.IntegerField(default=0)

    def save(self, *args, **kwargs):
        super(My_Model, self).save(*args, **kwargs)
        from .functions import custom_func
        custom_func(self.status)

tasks.py

@celery.task()
def non_interactive_task():
    #do something.

@celery.task()
def interactive_task():
    #do something.

functions.py

def custom_func(status):
    #Change status after non interactive task is completed.
    #Based on status, start interactive task.

Проходить status переменная в шаблон, которая полезна для отображения элемента пользовательского интерфейса для ввода информации пользователем. Когда пользователь введет необходимую информацию, измените статус. Это звонки custom_func который вызывает ваш interactive_task,

Другие вопросы по тегам