Django Celery Workflow Chain Пауза / Резюме
Есть ли способ приостановить / возобновить работающий рабочий процесс, созданный с помощью цепочек из сельдерея 3.0?
По сути, в нашей системе есть два разных типа задач: интерактивные и неинтерактивные. Неинтерактивные параметры, для которых у нас есть все параметры, но интерактивные требуют ввода данных пользователем. Обратите внимание, что для интерактивных задач мы можем запросить пользовательский ввод только после того, как все предыдущие такты в цепочке будут выполнены, поскольку их результаты будут влиять на интерактивные задачи (т.е. мы не можем запрашивать пользовательский ввод перед созданием фактической цепочки).
Любое предложение о том, как подойти к этому? Действительно в недоумении здесь..
Актуальные идеи:
- Создайте два подкласса задачи (из задачи импорта сельдерея). Добавьте дополнительную переменную экземпляра (члена класса) в подкласс интерактивной задачи, для которой по умолчанию установлено значение false, и это означает, что некоторый пользовательский ввод все еще необходим. Каким-то образом есть доступ к экземпляру Задачи, и установить для него значение true извне работника сельдерея (хотя я довольно долго это искал, и не представляется возможным получить доступ к объектам Задачи непосредственно из другого модуля).
- Разделите цепочку на несколько цепочек, разграниченных интерактивными заданиями. Пусть какой-то механизм за пределами сельдерея обнаружит, как только цепочка достигнет своего конца, и запустит интерактивный клиентский компонент интерактивной задачи. После того, как пользователь ввел все эти данные, получите данные и запустите новую цепочку, где интерактивная задача находится во главе новой цепочки.
1 ответ
Мы реализовали что-то вроде вашей второй идеи в нашем проекте, и она отлично работает. Вот суть реализации.
Добавить новое поле status
к вашей модели и переопределить метод сохранения.
models.py:
class My_Model(models.Model):
# some fields
status = models.IntegerField(default=0)
def save(self, *args, **kwargs):
super(My_Model, self).save(*args, **kwargs)
from .functions import custom_func
custom_func(self.status)
tasks.py
@celery.task()
def non_interactive_task():
#do something.
@celery.task()
def interactive_task():
#do something.
functions.py
def custom_func(status):
#Change status after non interactive task is completed.
#Based on status, start interactive task.
Проходить status
переменная в шаблон, которая полезна для отображения элемента пользовательского интерфейса для ввода информации пользователем. Когда пользователь введет необходимую информацию, измените статус. Это звонки custom_func
который вызывает ваш interactive_task
,