Как динамически добавлять / удалять периодические задачи в Celery (celerybeat)
Если у меня есть функция, определенная следующим образом:
def add(x,y):
return x+y
Есть ли способ динамически добавить эту функцию в качестве периодической задачи сельдерея и запустить его во время выполнения? Я хотел бы иметь возможность сделать что-то вроде (псевдокод):
some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
Я также хотел бы динамически остановить или удалить эту задачу с помощью чего-то вроде (псевдокод):
celery.beat.remove_task(some_unique_task_id)
или же
celery.beat.stop(some_unique_task_id)
К вашему сведению, я не использую djcelery, который позволяет вам управлять периодическими задачами через администратора django.
9 ответов
Нет, извините, это невозможно с обычным сельдереем.
Но это легко расширяется, чтобы делать то, что вы хотите, например, планировщик django-celery - это просто чтение подкласса и запись расписания в базу данных (с некоторыми оптимизациями сверху).
Также вы можете использовать планировщик django-celery даже для не-Django проектов.
Что-то вроде этого:
Установите django + django-celery:
$ pip install -U Джанго Джанго-сельдерей
Добавьте следующие параметры в ваш файл celeryconfig:
DATABASES = { 'default': { 'NAME': 'celerybeat.db', 'ENGINE': 'django.db.backends.sqlite3', }, } INSTALLED_APPS = ('djcelery', )
Создайте таблицы базы данных:
$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
Запустите celerybeat с помощью планировщика базы данных:
$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ -S djcelery.schedulers.DatabaseScheduler
Также есть djcelerymon
Команда, которую можно использовать для проектов, не относящихся к Django, для запуска celerycam и веб-сервера Django Admin в одном и том же процессе, вы также можете использовать ее для редактирования ваших периодических задач в хорошем веб-интерфейсе:
$ djcelerymon
(Обратите внимание, что по какой-то причине djcelerymon не может быть остановлен с помощью Ctrl+C, вы должны использовать Ctrl+Z + kill %1)
На этот вопрос ответили в группах Google.
Я НЕ АВТОР, все заслуги перед Джин Марком
Вот правильное решение для этого. Подтвердил работу. В моем сценарии я создал подкласс Периодической задачи и создал из него модель, поскольку могу добавлять другие поля в модель по мере необходимости, а также добавлять метод "прекращения". Вы должны установить для свойства периодической задачи значение False и сохранить его перед удалением. Полное подклассирование не является обязательным, метод schedule_every - это тот, который действительно выполняет свою работу. Когда вы будете готовы завершить свою задачу (если вы не создали ее подкласса), вы можете просто использовать PeriodicTask.objects.filter (name =...) для поиска своей задачи, отключить ее, а затем удалить.
Надеюсь это поможет!
from djcelery.models import PeriodicTask, IntervalSchedule from datetime import datetime class TaskScheduler(models.Model): periodic_task = models.ForeignKey(PeriodicTask) @staticmethod def schedule_every(task_name, period, every, args=None, kwargs=None): """ schedules a task by name every "every" "period". So an example call would be: TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. """ permissible_periods = ['days', 'hours', 'minutes', 'seconds'] if period not in permissible_periods: raise Exception('Invalid period specified') # create the periodic task and the interval ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) if interval_schedules: # just check if interval schedules exist like that already and reuse em interval_schedule = interval_schedules[0] else: # create a brand new interval schedule interval_schedule = IntervalSchedule() interval_schedule.every = every # should check to make sure this is a positive int interval_schedule.period = period interval_schedule.save() ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) if args: ptask.args = args if kwargs: ptask.kwargs = kwargs ptask.save() return TaskScheduler.objects.create(periodic_task=ptask) def stop(self): """pauses the task""" ptask = self.periodic_task ptask.enabled = False ptask.save() def start(self): """starts the task""" ptask = self.periodic_task ptask.enabled = True ptask.save() def terminate(self): self.stop() ptask = self.periodic_task self.delete() ptask.delete()
Это наконец стало возможным благодаря исправлению, включенному в celery v4.1.0. Теперь вам просто нужно изменить записи расписания в базе данных базы данных, и celery-beat будет действовать в соответствии с новым расписанием.
Документы смутно описывают, как это работает. Планировщик по умолчанию для сельдерея, PersistentScheduler
, использует файл полки в качестве своей базы данных расписания. Любые изменения в beat_schedule
словарь в PersistentScheduler
Экземпляры синхронизируются с этой базой данных (по умолчанию каждые 3 минуты) и наоборот. Документы описывают, как добавлять новые записи в beat_schedule
с помощью app.add_periodic_task
, Чтобы изменить существующую запись, просто добавьте новую запись с тем же name
, Удалите запись как из словаря: del app.conf.beat_schedule['name']
,
Предположим, вы хотите отслеживать и изменять свой график ударов сельдерея с помощью внешнего приложения. Тогда у вас есть несколько вариантов:
- Вы можете
open
файл базы данных полки и читать его содержимое как словарь. Напишите обратно в этот файл для внесения изменений. - Вы можете запустить другой экземпляр приложения Celery и использовать его для изменения файла полки, как описано выше.
- Вы можете использовать пользовательский класс планировщика из django-celery-beat, чтобы сохранить расписание в базе данных, управляемой django, и получить доступ к записям там.
- Вы можете использовать планировщик из celerybeat-mongo, чтобы сохранить расписание в бэкенде MongoDB и получить доступ к записям в нем.
Существует библиотека под названием django-celery-beat, в которой представлены необходимые модели. Чтобы динамически загружать новые периодические задачи, нужно создать собственный планировщик.
from django_celery_beat.schedulers import DatabaseScheduler
class AutoUpdateScheduler(DatabaseScheduler):
def tick(self, *args, **kwargs):
if self.schedule_changed():
print('resetting heap')
self.sync()
self._heap = None
new_schedule = self.all_as_schedule()
if new_schedule:
to_add = new_schedule.keys() - self.schedule.keys()
to_remove = self.schedule.keys() - new_schedule.keys()
for key in to_add:
self.schedule[key] = new_schedule[key]
for key in to_remove:
del self.schedule[key]
super(AutoUpdateScheduler, self).tick(*args, **kwargs)
@property
def schedule(self):
if not self._initial_read and not self._schedule:
self._initial_read = True
self._schedule = self.all_as_schedule()
return self._schedule
Я искал такое же решение для Celery + Redis, которое можно было бы гибко добавлять / удалять. Посмотрите на этого, redbeat, того же парня из Heroku, даже они поставили Redis + Sentinel.
Надеюсь помогает:)
Ответ от @asksol - это то, что нужно, если в приложении Django.
Для приложений, отличных от django, вы можете использовать
celery-sqlalchemy-scheduler
который смоделирован как django-celery-beat для Django, поскольку он также использует базу данных вместо файла
celerybeat-schedule
.
- https://pypi.org/project/celery-sqlalchemy-scheduler/
- https://github.com/AngelLiang/celery-sqlalchemy-scheduler
Вот пример добавления новой задачи во время выполнения.
tasks.py
from celery import Celery
celery = Celery('tasks')
beat_dburi = 'sqlite:///schedule.db'
celery.conf.update(
{'beat_dburi': beat_dburi}
)
@celery.task
def my_task(arg1, arg2, be_careful):
print(f"{arg1} {arg2} be_careful {be_careful}")
Бревна (Продюсер)
$ celery --app=tasks beat --scheduler=celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
[2021-08-20 15:20:20,927: INFO/MainProcess] beat: Starting...
Журналы (Потребитель)
$ celery --app=tasks worker --queues=celery --loglevel=INFO
-------------- celery@ubuntu20 v5.1.2 (sun-harmonics)
[2021-08-20 15:20:02,287: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
Расписания базы данных
$ sqlite3 schedule.db
sqlite> .databases
main: /home/nponcian/Documents/Program/1/db/schedule.db
sqlite> .tables
celery_crontab_schedule celery_periodic_task_changed
celery_interval_schedule celery_solar_schedule
celery_periodic_task
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
Теперь, когда эти воркеры уже работают, давайте обновим расписания, добавив новую запланированную задачу. Обратите внимание, что это во время выполнения, без необходимости перезапускать воркеры.
$ python3
>>> # Setup the session.
>>> from celery_sqlalchemy_scheduler.models import PeriodicTask, IntervalSchedule
>>> from celery_sqlalchemy_scheduler.session import SessionManager
>>> from tasks import beat_dburi
>>> session_manager = SessionManager()
>>> engine, Session = session_manager.create_session(beat_dburi)
>>> session = Session()
>>>
>>> # Setup the schedule (executes every 10 seconds).
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=IntervalSchedule.SECONDS).first()
>>> if not schedule:
... schedule = IntervalSchedule(every=10, period=IntervalSchedule.SECONDS)
... session.add(schedule)
... session.commit()
...
>>>
>>> # Create the periodic task
>>> import json
>>> periodic_task = PeriodicTask(
... interval=schedule, # we created this above.
... name='My task', # simply describes this periodic task.
... task='tasks.my_task', # name of task.
... args=json.dumps(['arg1', 'arg2']),
... kwargs=json.dumps({
... 'be_careful': True,
... }),
... )
>>> session.add(periodic_task)
>>> session.commit()
Расписания базы данных (обновлено)
- Теперь мы видим, что вновь добавленное расписание отразилось в базе данных, которая постоянно считывается планировщиком ударов сельдерея. Таким образом, если будут какие-либо обновления со значениями args или kwargs, мы можем легко выполнить обновления SQL в базе данных, и это должно отражаться в реальном времени с запущенными рабочими (без необходимости перезапуска).
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
2|My task|tasks.my_task|1|||["arg1", "arg2"]|{"be_careful": true}||||||0||1||0|2021-08-20 07:26:49|
Бревна (Продюсер)
- Теперь новая задача ставится в очередь каждые 10 секунд.
[2021-08-20 15:26:51,768: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2021-08-20 15:26:51,768: INFO/MainProcess] Writing entries...
[2021-08-20 15:27:01,789: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:11,776: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:21,791: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
Журналы (Потребитель)
- Вновь добавленная задача корректно выполняется вовремя каждые 10 секунд.
[2021-08-20 15:27:01,797: INFO/MainProcess] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] received
[2021-08-20 15:27:01,798: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:01,799: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:01,799: INFO/ForkPoolWorker-4] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] succeeded in 0.000763321000704309s: None
[2021-08-20 15:27:11,783: INFO/MainProcess] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] received
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:11,787: INFO/ForkPoolWorker-4] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] succeeded in 0.0006725780003762338s: None
[2021-08-20 15:27:21,797: INFO/MainProcess] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] received
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:21,800: INFO/ForkPoolWorker-4] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] succeeded in 0.0006371149993356084s: None
Вы можете проверить этот flask-djcelery, который конфигурирует flask и djcelery, а также предоставляет доступный API для просмотра.
Некоторое время назад мне понадобилось динамически обновлять периодические задачи в Celery и Django , и я написал статью о своем подходе (код статьи ).
Я использовал пакет django-celery-beat . Он предоставляет модели баз данных для иIntervalSchedule
. МанипулируяPeriodicTask
объекты, вы можете добавлять/удалять/обновлять/приостанавливать периодические задачи в Celery.
Создать периодическую задачу
from django_celery_beat.models import IntervalSchedule, PeriodicTask
schedule, created = IntervalSchedule.objects.get_or_create(
every=instance.interval,
period=IntervalSchedule.SECONDS,
)
task = PeriodicTask.objects.create(
interval=schedule,
name=f"Monitor: {instance.endpoint}",
task="monitors.tasks.task_monitor",
kwargs=json.dumps(
{
"monitor_id": instance.id,
}
),
)
Удалить периодическую задачу
PeriodicTask.objects.get(pk=task_id).delete()
Интервал изменения в периодической задаче
task = PeriodicTask.objects.get(pk=your_id)
schedule, created = IntervalSchedule.objects.get_or_create(
every=new_interval,
period=IntervalSchedule.SECONDS,
)
task.interval = schedule
task.save()
Приостановить периодическую задачу
task = PeriodicTask.objects.get(pk=your_id)
task.enabled = false
task.save()
Бит сервис
Когда используешьdjango-celery-beat
вам нужно передать аргумент планировщика при запуске сервиса beat:
celery -A backend beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler --max-interval 10
Сельдерей может реализовать динамическую периодическую задачу с базами данных и сам вызов.
Но лучше APSchedule.
Потому что динамическая периодическая задача всегда означает длинный обратный отсчет или ету. Слишком большое количество этих периодических задач может занять много памяти, из-за чего потребуется много времени для перезапуска и выполнения задач без задержки.
tasks.py
import sqlite3
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
imports=['tasks'],
)
conn = sqlite3.connect('database.db', check_same_thread=False)
c = conn.cursor()
sql = '''
CREATE TABLE IF NOT EXISTS `tasks`
(
`id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
`name` TEXT,
`countdown` INTEGER
);
'''
c.execute(sql)
def create(name='job', countdown=5):
sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'
c.execute(sql, (name, countdown))
conn.commit()
return c.lastrowid
def read(id=None, verbose=False):
sql = 'SELECT * FROM `tasks` '
if id:
sql = 'SELECT * FROM `tasks` WHERE `id`={}'.format(id)
all_rows = c.execute(sql).fetchall()
if verbose:
print(all_rows)
return all_rows
def update(id, countdown):
sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'
c.execute(sql, (countdown, id))
conn.commit()
def delete(id, verbose=False):
sql = 'DELETE FROM `tasks` WHERE `id`=?'
affected_rows = c.execute(sql, (id,)).rowcount
if verbose:
print('deleted {} rows'.format(affected_rows))
conn.commit()
@app.task
def job(id):
id = read(id)
if id:
id, name, countdown = id[0]
else:
logger.info('stop')
return
logger.warning('id={}'.format(id))
logger.warning('name={}'.format(name))
logger.warning('countdown={}'.format(countdown))
job.apply_async(args=(id,), countdown=countdown)
main.py
from tasks import *
id = create(name='job', countdown=5)
job(id)
# job.apply_async((id,), countdown=5) # wait 5s
print(read())
input('enter to update')
update(id, countdown=1)
input('enter to delete')
delete(id, verbose=True)