Отдельные задачи для сельдерея и Django выполняются несколько раз

Я столкнулся с проблемой, когда я помещаю задачу в очередь, и она запускается несколько раз. Из журналов сельдерея я вижу, что тот же работник выполняет задачу...

[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner..
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete
[2014-06-06 15:13:32,836: INFO/Worker-2] update created
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent
[2014-06-06 15:13:33,656: INFO/Worker-2] update created
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success

Однако, когда я просматриваю фактические журналы приложения, он показывает 5-6 строк журнала для каждого шага (??).

Я использую Django 1.6 с RabbitMQ. Метод для помещения в очередь заключается в установке задержки на функцию.

Эта функция (добавляется декоратор задачи) (затем вызывается класс, который запускается.

Кто-нибудь есть идеи о том, как решить эту проблему?

Редактировать: По запросу здесь код,

views.py

На мой взгляд, я отправляю свои данные в очередь через...

from input.tasks import add_queue_project

add_queue_project.delay(data)

tasks.py

from celery.decorators import task

@task()
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")

    logger.info("starting project runner..")
    f = project_runner(data)
    f.main()

class project_runner():
    """ main project runner """

    def __init__(self,data):
        self.data = data
        self.logger = logging_setup(app="project")

    def self.main(self):
        .... Code

settings.py

THIRD_PARTY_APPS = (
    'south',  # Database migration helpers:
    'crispy_forms',  # Form layouts
    'rest_framework',
    'djcelery',
)

import djcelery
djcelery.setup_loader()

BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672 # default RabbitMQ listening port
BROKER_USER = "test"
BROKER_PASSWORD = "test"
BROKER_VHOST = "test"
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ
CELERY_RESULT_DBURI = ""

CELERY_IMPORTS = ("input.tasks", )

celeryd

Линия, в которой я бегу - это начать сельдерей,

python2.7 manage.py celeryd -l info

Спасибо,

1 ответ

У меня нет точного ответа для вас, но есть несколько вещей, на которые вы должны обратить внимание:

  • djcelery устарела, так что если вы используете новую версию celery может быть какой-то конфликт.

  • Если твой input приложение указано в INSTALLED_APPS сельдерей обнаружит это, так что вам не нужно добавлять его в CELERY_IMPORTS = ("input.tasks", ), что может быть причиной вашей проблемы, так как задачи могут быть загружены несколько раз

  • попробуйте дать название вашей задаче @task(name='input.tasks.add'), он будет знать, что это одна и та же задача, независимо от того, как вы ее импортируете.

Глядя на ваши настройки, вы видите, что вы используете старую версию сельдерея, или вы используете старую конфигурацию для новой версии сельдерея. В любом случае убедитесь, что у вас самая новая версия и попробуйте эту конфигурацию вместо того, что у вас есть:

BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

Теперь вам также придется по-разному настраивать сельдерей:

Избавляться от djcelery вещи полностью

Создайте proj/celery.py внутри вашего проекта Django:

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

В вашем proj/__init__.py:

from __future__ import absolute_import

from proj.celery import app as celery_app

Тогда если ваш input приложение является повторно используемым приложением и не является частью вашего проекта @shared_task вместо @task декоратор.

Затем запустите сельдерей:

celery -A proj worker -l info

Надеюсь, поможет.

Другие вопросы по тегам