Сценарий Node.js (узел-сельдерей) вызов задачи сельдерея неправильно обрабатывает аргумент "себя"

Я создал скрипт задачи сельдерея следующим образом:

from celery import Task
from celery.contrib.methods import task
from celery.contrib.methods import task_method
from pipelines.addsub import settings
from pipelines.addsub.log import register_task_log


@register_task_log(__name__)
class AddTask(Task):

    @task(filter=task_method, name='AddTask.get')
    def get(self, x, y):
        self.log.info("Calling task add(%d, %d)" % (x, y))
        return x + y

Я определил следующие очереди и маршруты:

CELERY_QUEUES = {
    'celery': {
        'exchange': 'celery',
        'binding_key': 'celery',
    },
    'addsub': {
        'exchange': 'addsub',
        'binding_key': 'addsub.operations',
    },
}

CELERY_ROUTES = {
    'AddTask.get': {
        'queue': 'addsub',
        'routing_key': 'addsub.operations',
    },
}

Я начинаю сельдерея следующим образом:

celery -c 2 -A pipelines.celery.celery worker -Q addsub -E -l DEBUG --logfile=~/celery_workflows/addsubtasks/addsub.log

Я могу успешно запустить AddTask.get(1,3) из скорлупы сельдерея.

Затем я использовал модуль node-celery для запуска следующего скрипта node.js:

"use strict";
var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://[user]:[password]@[hostname]:5672//prote.broker',
        CELERY_RESULT_BACKEND: 'amqp',
        CELERY_ROUTES: {'AddTask.get': {queue: 'addsub'}}
    }),
    get_addition = client.createTask('AddTask.get');

client.on('error', function (err) {
    console.log(err);
});

client.on('connect', function () {
    console.log('Connected ...');
    get_addition.call([], {
        x: 1,
        y: 3
    }); // sends a task to the addsub queue
});

Скрипт возвращает следующую ошибку:

2014-09-13 14:18:59,422: INFO/MainProcess] Received task: AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d]
[2014-09-13 14:18:59,422: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7fc407d5fde8> (args:(u'AddTask.get', u'261fb059-b88e-444b-b218-c3c24c94fc1d', [], {u'y': 3, u'x': 1}, {u'task': u'AddTask.get', u'group': None, u'is_eager': False, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': 'addsub', u'exchange': ''}, u'args': [], u'headers': {}, u'correlation_id': None, u'hostname': 'celery@pcs01', u'kwargs': {u'y': 3, u'x': 1}, u'reply_to': None, u'id': u'261fb059-b88e-444b-b218-c3c24c94fc1d'}) kwargs:{})
[2014-09-13 14:18:59,425: DEBUG/MainProcess] Task accepted: AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d] pid:6536
[2014-09-13 14:18:59,425: ERROR/MainProcess] Task AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d] raised unexpected: TypeError('get() takes exactly 3 arguments (2 given)',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
TypeError: get() takes exactly 3 arguments (2 given)

Сценарий действительно передает корректные параметры x: & y: работнику сельдерея, но аргумент self не обрабатывается должным образом. Кто-нибудь понимает, почему это может происходить?

Я успешно протестировал указанный выше скрипт node.js с помощью скрипта задачи, который определяет набор функций вместо класса с функциями-членами:

from pipelines.celery.celery import app
from pipelines.addsub import settings
from celery.utils.log import get_task_logger


log = get_task_logger(__name__)


@app.task(name='add')
def add(x, y):
    log.info("Calling task add(%d, %d)" % (x, y))
    return x + y


@app.task(name='subtract')
def subtract(x, y):
    log.info("Calling task subtract(%d, %d)" % (x, y))
    return x - y

Я предполагаю, что модуль celery.contrib.methods не работает в случае, который я описал выше. У кого-нибудь есть понимание этой проблемы?

0 ответов

Другие вопросы по тегам