Сельдерей add_periodic_task блокирует работу Django в среде uwsgi

Я написал модуль, который динамически добавляет периодические задачи сельдерея на основе списка словарей в настройках проекта (импортируется через django.conf.settings). Я делаю это с помощью функции add_tasks который планирует функцию для вызова с определенным uuid что указано в настройках:

def add_tasks(celery):
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

Как предложено здесь, я использую on_after_configure.connect сигнал для вызова функции в моем celery.py:

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
    from add_tasks_module import add_tasks
    add_tasks(celery)

Эта установка отлично работает для обоих celery beat а также celery worker но нарушает мои настройки, где я использую uwsgi чтобы служить моей заявке Django. Uwsgi работает ровно до тех пор, пока код представления не отправит задачу с помощью сельдерея .delay() метод. В этот момент кажется, что сельдерей инициализируется в uwsgi но блоки навсегда в приведенном выше коде. Если я запускаю это вручную из командной строки, а затем прерываю, когда он блокируется, я получаю следующую (сокращенную) трассировку стека:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:
Traceback (most recent call last):

  (SHORTENED HERE. Just contained the trace from the console through my call to this function)

  File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
    my_task.s(new_task['uuid']),
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
    self.finalize(auto=True)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
    with self._finalize_mutex:

Кажется, есть проблема с приобретением мьютекса.

В настоящее время я использую обходной путь, чтобы определить, sys.argv[0] содержит uwsgi а потом не добавлять периодические задачи, так как только beat нужны задачи, но я хотел бы понять, что здесь происходит, чтобы решить проблему более постоянно.

Может ли эта проблема иметь какое-то отношение к использованию многопоточного или многопроцессорного использования uwsgi, где один поток / процесс содержит мьютекс, в котором нуждается другой?

Буду признателен за любые советы, которые могут помочь мне решить проблему. Спасибо.

Я использую: Django 1.11.7 и Celery 4.1.0

Редактировать 1

Я создал минимальную настройку для этой проблемы:

celery.py:

import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        60,
        my_task.s(),
        name='Testtask'
    )

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

tasks.py:

from celery import shared_task
@shared_task()
def my_task():
    print('ran')

Убедитесь, что CELERY_TASK_ALWAYS_EAGER=False и что у вас есть рабочая очередь сообщений.

Бежать:

./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'

Подождите около 10 секунд, прежде чем прерывать, чтобы увидеть вышеуказанную ошибку.

2 ответа

Решение

Итак, я обнаружил, что @shared_task Декоратор создает проблему. Я могу обойти проблему, когда объявляю задачу прямо в функции, вызываемой сигналом следующим образом:

def add_tasks(celery):
    @celery.task
    def my_task(uuid):
        print(uuid)

    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

Это решение на самом деле работает для меня, но у меня есть еще одна проблема с этим: я использую этот код в подключаемом приложении, поэтому я не могу получить прямой доступ к приложению celery вне обработчика сигнала, но хотел бы также иметь возможность вызова my_task функция из другого кода. Определив его внутри функции, он недоступен за пределами функции, поэтому я не могу импортировать его где-либо еще.

Вероятно, я могу обойти это, определив функцию задания вне функции сигнала, и использовать ее с различными декораторами здесь и в tasks.py, Мне интересно, если, кроме @shared_task декоратор, который я могу использовать в tasks.py это не создает проблемы.

Текущее лучшее решение может быть:

task_app.__init__.py:

def my_task(uuid):
    # do stuff
    print(uuid)

def add_tasks(celery):
    celery_my_task = celery.task(my_task)
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            celery_my_task(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

task_app.tasks.py:

from celery import shared_task
from task_app import my_task
shared_my_task = shared_task(my_task)

myapp.celery.py:

import os
from celery import Celery
from django.conf import settings


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from task_app import add_tasks
    add_tasks(sender)


app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Не могли бы вы попробовать этот сигнал @app.on_after_finalize.connect:

какой-то быстрый фрагмент из рабочего проекта celery==4.1.0, Django==2.0, django-celery-beat==1.1.0 а также django-celery-results==1.0.1

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """ setup of periodic task :py:func:shopify_data_fetcher.celery.fetch_shopify
    based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE
    """
    for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items():
        sender.add_periodic_task(
            task_config['schedule'],
            fetch_shopify.s(**task_config['kwargs']['resource_name']),
            name=task_name
        )

кусок CELERY_BEAT_SCHEDULE:

CELERY_BEAT_SCHEDULE = {
    'fetch_shopify_orders': {
        'task': 'shopify.tasks.fetch_shopify',
        'schedule': crontab(hour="*/3", minute=0),
        'kwargs': {
            'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS
        }
    }
}
Другие вопросы по тегам