Сельдерей add_periodic_task блокирует работу Django в среде uwsgi
Я написал модуль, который динамически добавляет периодические задачи сельдерея на основе списка словарей в настройках проекта (импортируется через django.conf.settings
). Я делаю это с помощью функции add_tasks
который планирует функцию для вызова с определенным uuid
что указано в настройках:
def add_tasks(celery):
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
Как предложено здесь, я использую on_after_configure.connect
сигнал для вызова функции в моем celery.py
:
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
from add_tasks_module import add_tasks
add_tasks(celery)
Эта установка отлично работает для обоих celery beat
а также celery worker
но нарушает мои настройки, где я использую uwsgi
чтобы служить моей заявке Django. Uwsgi
работает ровно до тех пор, пока код представления не отправит задачу с помощью сельдерея .delay()
метод. В этот момент кажется, что сельдерей инициализируется в uwsgi
но блоки навсегда в приведенном выше коде. Если я запускаю это вручную из командной строки, а затем прерываю, когда он блокируется, я получаю следующую (сокращенную) трассировку стека:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'data'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
(SHORTENED HERE. Just contained the trace from the console through my call to this function)
File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
my_task.s(new_task['uuid']),
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
return loc(*self.__args, **self.__kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
return app.tasks[
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
self.finalize(auto=True)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
with self._finalize_mutex:
Кажется, есть проблема с приобретением мьютекса.
В настоящее время я использую обходной путь, чтобы определить, sys.argv[0]
содержит uwsgi
а потом не добавлять периодические задачи, так как только beat
нужны задачи, но я хотел бы понять, что здесь происходит, чтобы решить проблему более постоянно.
Может ли эта проблема иметь какое-то отношение к использованию многопоточного или многопроцессорного использования uwsgi, где один поток / процесс содержит мьютекс, в котором нуждается другой?
Буду признателен за любые советы, которые могут помочь мне решить проблему. Спасибо.
Я использую: Django 1.11.7 и Celery 4.1.0
Редактировать 1
Я создал минимальную настройку для этой проблемы:
celery.py:
import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
60,
my_task.s(),
name='Testtask'
)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
tasks.py:
from celery import shared_task
@shared_task()
def my_task():
print('ran')
Убедитесь, что CELERY_TASK_ALWAYS_EAGER=False и что у вас есть рабочая очередь сообщений.
Бежать:
./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'
Подождите около 10 секунд, прежде чем прерывать, чтобы увидеть вышеуказанную ошибку.
2 ответа
Итак, я обнаружил, что @shared_task
Декоратор создает проблему. Я могу обойти проблему, когда объявляю задачу прямо в функции, вызываемой сигналом следующим образом:
def add_tasks(celery):
@celery.task
def my_task(uuid):
print(uuid)
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
Это решение на самом деле работает для меня, но у меня есть еще одна проблема с этим: я использую этот код в подключаемом приложении, поэтому я не могу получить прямой доступ к приложению celery вне обработчика сигнала, но хотел бы также иметь возможность вызова my_task
функция из другого кода. Определив его внутри функции, он недоступен за пределами функции, поэтому я не могу импортировать его где-либо еще.
Вероятно, я могу обойти это, определив функцию задания вне функции сигнала, и использовать ее с различными декораторами здесь и в tasks.py
, Мне интересно, если, кроме @shared_task
декоратор, который я могу использовать в tasks.py
это не создает проблемы.
Текущее лучшее решение может быть:
task_app.__init__.py:
def my_task(uuid):
# do stuff
print(uuid)
def add_tasks(celery):
celery_my_task = celery.task(my_task)
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
celery_my_task(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
task_app.tasks.py:
from celery import shared_task
from task_app import my_task
shared_my_task = shared_task(my_task)
myapp.celery.py:
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
from task_app import add_tasks
add_tasks(sender)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Не могли бы вы попробовать этот сигнал @app.on_after_finalize.connect
:
какой-то быстрый фрагмент из рабочего проекта celery==4.1.0
, Django==2.0
, django-celery-beat==1.1.0
а также django-celery-results==1.0.1
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
""" setup of periodic task :py:func:shopify_data_fetcher.celery.fetch_shopify
based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE
"""
for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items():
sender.add_periodic_task(
task_config['schedule'],
fetch_shopify.s(**task_config['kwargs']['resource_name']),
name=task_name
)
кусок CELERY_BEAT_SCHEDULE
:
CELERY_BEAT_SCHEDULE = {
'fetch_shopify_orders': {
'task': 'shopify.tasks.fetch_shopify',
'schedule': crontab(hour="*/3", minute=0),
'kwargs': {
'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS
}
}
}