Django - настроить запланированное задание?
Я работал над веб-приложением с использованием Django, и мне любопытно, есть ли способ запланировать периодическое выполнение задания.
По сути, я просто хочу пробежаться по базе данных и делать некоторые вычисления / обновления на регулярной основе автоматически, но я не могу найти какую-либо документацию по этому вопросу.
Кто-нибудь знает, как это настроить?
Чтобы уточнить: я знаю, я могу настроить cron
работа, чтобы сделать это, но мне любопытно, есть ли какая-то функция в Django, которая обеспечивает эту функцию. Я бы хотел, чтобы люди могли самостоятельно развернуть это приложение, не выполняя много настроек (желательно нулевых).
Я подумал о том, чтобы инициировать эти действия "задним числом", просто проверив, нужно ли было запускать задание с момента последней отправки запроса на сайт, но я надеюсь на что-то более чистое.
27 ответов
Одно решение, которое я использовал, состоит в том, чтобы сделать это:
1) Создайте пользовательскую команду управления, например
python manage.py my_cool_command
2) Использование cron
(в Linux) или at
(в Windows), чтобы запустить мою команду в нужное время.
Это простое решение, которое не требует установки тяжелого стека AMQP. Однако, есть и другие преимущества использования чего-то вроде сельдерея, упомянутые в других ответах. В частности, с Celery было бы неплохо не распространять логику вашего приложения в файлы crontab. Однако решение cron довольно хорошо работает для приложений малого и среднего размера, где вам не нужно много внешних зависимостей.
РЕДАКТИРОВАТЬ:
В более поздней версии Windows at
команда устарела для Windows 8, Server 2012 и выше. Ты можешь использовать schtasks.exe
для того же использования.
Celery - это распределенная очередь задач, построенная на AMQP (RabbitMQ). Он также обрабатывает периодические задачи в стиле крона (см. Периодические задачи). В зависимости от вашего приложения, это может стоить того.
Celery довольно легко настроить с помощью django ( docs), и периодические задачи фактически пропускают пропущенные задачи в случае простоя. У сельдерея также есть встроенные механизмы повтора, в случае неудачи.
Мы с открытым исходным кодом, что я думаю, является структурированным приложением. это решение Брайана тоже намекает. Буду рад любой / все отзывы!
https://github.com/tivix/django-cron
Он поставляется с одной командой управления:
./manage.py runcrons
Это делает работу. Каждый крон моделируется как класс (так что все его ОО), и каждый крон работает с разной частотой, и мы гарантируем, что один и тот же тип крона не работает параллельно (в случае, если самим кронам требуется больше времени, чем их частоте!)
Спасибо!
Если вы используете стандартную ОС POSIX, вы используете cron.
Если вы используете Windows, вы используете в.
Напишите команду управления Django
Выясните, на какой платформе они находятся.
Либо выполните соответствующую команду "AT" для своих пользователей, либо обновите crontab для своих пользователей.
Интересное новое подключаемое приложение Django: хронограф django
Вам нужно только добавить одну запись cron, которая действует как таймер, и у вас есть очень хороший интерфейс администратора Django для запуска скриптов.
Посмотрите на Cron Django Poor Man's, который является приложением Django, которое использует спам-ботов, роботов индексации поисковых систем и т. П. Для запуска запланированных задач примерно через равные промежутки времени.
Некоторое время назад у меня было точно такое же требование, и в итоге я решил его с помощью APScheduler ( Руководство пользователя)
Это упрощает планирование заданий и делает его независимым от выполнения кода на основе запросов. Ниже приведен простой пример, который я использовал в своем коде.
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
job = None
def tick():
print('One tick!')\
def start_job():
global job
job = scheduler.add_job(tick, 'interval', seconds=3600)
try:
scheduler.start()
except:
pass
Надеюсь, это кому-нибудь поможет!
Django APScheduler для заданий планировщика. Advanced Python Scheduler (APScheduler) - это библиотека Python, которая позволяет вам запланировать выполнение кода Python позже, либо только один раз, либо периодически. Вы можете добавлять новые вакансии или удалять старые на лету, как вам будет угодно.
примечание: я автор этой библиотеки
Установите APScheduler
pip install apscheduler
Просмотр функции файла для вызова
имя файла: scheduler_jobs.py
def FirstCronTest():
print("")
print("I am executed..!")
Настройка планировщика
создайте файл execute.py и добавьте приведенные ниже коды
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
Ваши написанные функции Здесь функции планировщика написаны в scheduler_jobs
import scheduler_jobs
scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
scheduler.start()
Свяжите файл для выполнения
Теперь добавьте строку ниже в конец файла URL.
import execute
- Вы можете проверить полный код, выполнив [Щелкните здесь] https://github.com/devchandansh/django-apscheduler
Предложение Брайана Нила о запуске команд управления через cron хорошо работает, но если вы ищете что-то более надежное (но не такое сложное, как Celery), я бы заглянул в такую библиотеку, как Kronos:
# app/cron.py
import kronos
@kronos.register('0 * * * *')
def task():
pass
RabbitMQ и Celery имеют больше возможностей и возможностей для обработки задач, чем Cron. Если сбой задачи не является проблемой, и вы думаете, что будете обрабатывать сломанные задачи в следующем вызове, то Cron достаточно.
Celery & AMQP позволит вам справиться с неработающей задачей, и другой работник снова выполнит ее (рабочие из Celery слушают следующую задачу), пока задача max_retries
атрибут достигнут. Вы даже можете вызывать задачи при сбое, например, регистрировать сбой или отправлять электронное письмо администратору после max_retries
Был достигнут.
И вы можете распространять серверы Celery и AMQP, когда вам нужно масштабировать ваше приложение.
Хотя Airflow не является частью Django, это более свежий проект (по состоянию на 2016 год), который полезен для управления задачами.
Airflow - это система автоматизации и планирования рабочих процессов, которую можно использовать для создания и управления конвейерами данных. Веб-интерфейс предоставляет разработчику ряд возможностей для управления и просмотра этих конвейеров.
Воздушный поток написан на Python и построен с использованием Flask.
Airflow был создан Maxime Beauchemin в Airbnb и открыт с весны 2015 года. Он присоединился к программе инкубации Apache Software Foundation зимой 2016 года. Вот страница проекта Git и некоторая дополнительная справочная информация.
Лично я использую cron, но части планирования заданий в django-extensions выглядят интересно.
Поместите следующее в начало вашего файла cron.py:
#!/usr/bin/python
import os, sys
sys.path.append('/path/to/') # the parent directory of the project
sys.path.append('/path/to/project') # these lines only needed if not on path
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproj.settings'
# imports and code below
Я просто подумал об этом довольно простом решении:
- Определите функцию представления do_work(req, param) так же, как и в любом другом представлении, с отображением URL-адреса, возвратом HttpResponse и т. Д.
- Установите задание cron с вашими настройками синхронизации (или с помощью AT или запланированных задач в Windows), которое запускает curl http://localhost/your/mapped/url?param=value.
Вы можете добавить параметры, но просто добавив параметры в URL.
Скажите мне, что вы, ребята, думаете.
[Обновление] Теперь я использую команду runjob из django-extensions вместо curl.
Мой cron выглядит примерно так:
@hourly python /path/to/project/manage.py runjobs hourly
... и так далее ежедневно, ежемесячно и т. д. Вы также можете настроить его для запуска определенной работы.
Я нахожу это более управляемым и более чистым. Не требует сопоставления URL-адреса с представлением. Просто определите свой класс работы и crontab, и все готово.
Вы обязательно должны проверить Django-Q! Он не требует дополнительной настройки и, возможно, имеет все необходимое для решения любых производственных проблем коммерческих проектов.
Он активно развивается и очень хорошо интегрируется с django, django ORM, mongo, redis. Вот моя конфигурация:
# django-q
# -------------------------------------------------------------------------
# See: http://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
# Match recommended settings from docs.
'name': 'DjangoORM',
'workers': 4,
'queue_limit': 50,
'bulk': 10,
'orm': 'default',
# Custom Settings
# ---------------
# Limit the amount of successful tasks saved to Django.
'save_limit': 10000,
# See https://github.com/Koed00/django-q/issues/110.
'catch_up': False,
# Number of seconds a worker can spend on a task before it's terminated.
'timeout': 60 * 5,
# Number of seconds a broker will wait for a cluster to finish a task before presenting it again. This needs to be
# longer than `timeout`, otherwise the same task will be processed multiple times.
'retry': 60 * 6,
# Whether to force all async() calls to be run with sync=True (making them synchronous).
'sync': False,
# Redirect worker exceptions directly to Sentry error reporter.
'error_reporter': {
'sentry': RAVEN_CONFIG,
},
}
После части кода я могу написать что угодно, как и мои views.py:)
#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management impor setup_environ
from store import settings
setup_environ(settings)
#######################################
от http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/
Более современным решением (по сравнению с Celery) является Django Q: https://django-q.readthedocs.io/en/latest/index.html
Он имеет отличную документацию и легко прогуливается. Поддержка Windows отсутствует, поскольку Windows не поддерживает разветвление процессов. Но это прекрасно работает, если вы создаете свою среду разработки, используя подсистему Windows для Linux.
Да, метод выше, так здорово. И я попробовал некоторые из них. Наконец-то я нашел такой метод:
from threading import Timer
def sync():
do something...
sync_timer = Timer(self.interval, sync, ())
sync_timer.start()
Так же, как рекурсивный.
Хорошо, я надеюсь, что этот метод может удовлетворить ваши требования.:)
У меня было что-то похожее с твоей проблемой сегодня.
Я не хотел, чтобы он обрабатывался сервером через cron (и в конце концов большинство библиотек были просто помощниками cron).
Итак, я создал модуль планирования и прикрепил его к init.
Это не лучший подход, но он помогает мне хранить весь код в одном месте, а его выполнение связано с основным приложением.
Я использую сельдерей для создания своих периодических заданий. Сначала вам нужно установить его следующим образом:
pip install django-celery
Не забудьте зарегистрироваться django-celery
в ваших настройках, а затем вы можете сделать что-то вроде этого:
from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
#your code
Я не уверен, что это будет полезно для всех, так как мне пришлось предоставлять другим пользователям системы для планирования заданий, не предоставляя им доступ к фактическому планировщику задач сервера (windows), я создал это приложение многократного использования.
Обратите внимание, что пользователи имеют доступ к одной общей папке на сервере, где они могут создавать необходимые файлы command/task/.bat. Эта задача может быть запланирована с помощью этого приложения.
Имя приложения - https://github.com/just10minutes/Django_Windows_Scheduler
Другой вариант - простой планировщик . Он работает в фоновом режиме, поэтому может быть легко интегрирован с любым приложением. См. Примеры здесь .
Пример для колбы:
if __name__ == "__main__":
try:
scheduled_jobs.run()
application.run(host='0.0.0.0', port=5000)
except Exception as e:
# log them
Другой вариант, похожий на ответ Брайана Нила, использовать RunScripts.
Тогда вам не нужно настраивать команды. Преимущество этого заключается в более гибкой или чистой структуре папок.
Этот файл должен реализовать функцию run(). Это то, что вызывается при запуске скрипта. Вы можете импортировать любые модели или другие части вашего проекта django для использования в этих скриптах.
А потом, просто
python manage.py runscript path.to.script
Простой способ - написать собственную команду оболочки, см. Документацию Django и выполнить ее, используя cronjob в linux. Однако я очень рекомендую использовать брокера сообщений, как RabbitMQ в сочетании с сельдереем. Может быть, вы можете взглянуть на этот учебник
Одной из альтернатив является использование Rocketry:
from rocketry import Rocketry
from rocketry.conds import daily, after_success
app = Rocketry()
@app.task(daily.at("10:00"))
def do_daily():
...
@app.task(after_success(do_daily))
def do_after_another():
...
if __name__ == "__main__":
app.run()
Он также поддерживает пользовательские условия:
from pathlib import Path
@app.cond()
def file_exists(file):
return Path(file).exists()
@app.task(daily & file_exists("myfile.csv"))
def do_custom():
...
И он также поддерживает Cron:
from rocketry.conds import cron
@app.task(cron('*/2 12-18 * Oct Fri'))
def do_cron():
...
Его можно довольно хорошо интегрировать с FastAPI, и я думаю, что его можно интегрировать с Django, а Rocketry — это, по сути, просто сложный цикл, который может порождать асинхронные задачи, потоки и процессы.
Отказ от ответственности: я автор.
Для простых докеризованных проектов я не мог найти подходящий ответ.
Поэтому я написал очень скромное решение без необходимости использования внешних библиотек или триггеров, которое работает само по себе. Не требуется внешний os-cron, он должен работать в любой среде.
Это работает путем добавления промежуточного программного обеспечения: middleware.py
import threading
def should_run(name, seconds_interval):
from application.models import CronJob
from django.utils.timezone import now
try:
c = CronJob.objects.get(name=name)
except CronJob.DoesNotExist:
CronJob(name=name, last_ran=now()).save()
return True
if (now() - c.last_ran).total_seconds() >= seconds_interval:
c.last_ran = now()
c.save()
return True
return False
class CronTask:
def __init__(self, name, seconds_interval, function):
self.name = name
self.seconds_interval = seconds_interval
self.function = function
def cron_worker(*_):
if not should_run("main", 60):
return
# customize this part:
from application.models import Event
tasks = [
CronTask("events", 60 * 30, Event.clean_stale_objects),
# ...
]
for task in tasks:
if should_run(task.name, task.seconds_interval):
task.function()
def cron_middleware(get_response):
def middleware(request):
response = get_response(request)
threading.Thread(target=cron_worker).start()
return response
return middleware
models/cron.py
:
from django.db import models
class CronJob(models.Model):
name = models.CharField(max_length=10, primary_key=True)
last_ran = models.DateTimeField()
settings.py
:
MIDDLEWARE = [
...
'application.middleware.cron_middleware',
...
]
Если вы хотите что-то более надежное, чем Celery, попробуйте TaskHawk, который построен на основе AWS SQS / SNS.