Настройка периодических задач в Celery (celerybeat) динамически с помощью add_periodic_task

Я использую Celery 4.0.1 с Django 1.10 и у меня проблемы с планированием задач (запуск задачи работает нормально). Вот конфигурация сельдерея:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)

Тогда в tasks.py у меня есть:

@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)

В views.py я хочу запланировать эту задачу:

def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))

Затем я выполняю команды:

sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app

Но задача никогда не запланирована. Я ничего не вижу в журналах. Задача работает, потому что если, на мой взгляд, я делаю:

def my_view(request, id):
    my_task.delay(id)

Задача выполнена.

If in my configuration file if I schedule the task manually, like this it works:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}

I just can't schedule the task dynamically. Любая идея?

1 ответ

Решение

На самом деле вы не можете не определять периодическую задачу на уровне просмотра, потому что параметр расписания ударов будет загружен первым и не может быть перенесен во время выполнения:

add_periodic_task() Функция добавит запись в настройку beat_schedule за кулисами, и эту же настройку также можно использовать для настройки периодических задач вручную:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}

что означает, что если вы хотите использовать add_periodic_task() это должно быть завернуто в on_after_configure обработчик на уровне приложения сельдерея и любые изменения во время выполнения не вступят в силу:

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))

Как уже упоминалось в документе, регулярный сельдерея просто отслеживает выполнение задачи:

Планировщик по умолчанию celery.beat.PersistentScheduler, который просто отслеживает последние времена выполнения в локальном файле базы данных полки.

Чтобы иметь возможность динамически управлять периодическими задачами и перепланировать ритм сельдерея во время выполнения:

Также имеется расширение django-celery-beat, которое хранит расписание в базе данных Django и предоставляет удобный интерфейс администратора для управления периодическими задачами во время выполнения.

Задачи будут сохранены в базе данных django, и планировщик может быть обновлен в модели задач на уровне БД. Всякий раз, когда вы обновляете периодическое задание, счетчик в этой таблице заданий будет увеличиваться и сообщать службе ритма сельдерея перезагрузить расписание из базы данных.

Возможное решение для вас может быть следующим:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))

views.py

def update_task_view(request, id)
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
    task.args=json.dumps([id])
    task.save()


РЕДАКТИРОВАТЬ: (13/01/2018)


В последнем выпуске 4.1.0 эта тема была рассмотрена в этом билете № 3958 и была объединена

Другие вопросы по тегам