Сценарий Node.js (узел-сельдерей) вызов задачи сельдерея неправильно обрабатывает аргумент "себя"
Я создал скрипт задачи сельдерея следующим образом:
from celery import Task
from celery.contrib.methods import task
from celery.contrib.methods import task_method
from pipelines.addsub import settings
from pipelines.addsub.log import register_task_log
@register_task_log(__name__)
class AddTask(Task):
@task(filter=task_method, name='AddTask.get')
def get(self, x, y):
self.log.info("Calling task add(%d, %d)" % (x, y))
return x + y
Я определил следующие очереди и маршруты:
CELERY_QUEUES = {
'celery': {
'exchange': 'celery',
'binding_key': 'celery',
},
'addsub': {
'exchange': 'addsub',
'binding_key': 'addsub.operations',
},
}
CELERY_ROUTES = {
'AddTask.get': {
'queue': 'addsub',
'routing_key': 'addsub.operations',
},
}
Я начинаю сельдерея следующим образом:
celery -c 2 -A pipelines.celery.celery worker -Q addsub -E -l DEBUG --logfile=~/celery_workflows/addsubtasks/addsub.log
Я могу успешно запустить AddTask.get(1,3) из скорлупы сельдерея.
Затем я использовал модуль node-celery для запуска следующего скрипта node.js:
"use strict";
var celery = require('node-celery'),
client = celery.createClient({
CELERY_BROKER_URL: 'amqp://[user]:[password]@[hostname]:5672//prote.broker',
CELERY_RESULT_BACKEND: 'amqp',
CELERY_ROUTES: {'AddTask.get': {queue: 'addsub'}}
}),
get_addition = client.createTask('AddTask.get');
client.on('error', function (err) {
console.log(err);
});
client.on('connect', function () {
console.log('Connected ...');
get_addition.call([], {
x: 1,
y: 3
}); // sends a task to the addsub queue
});
Скрипт возвращает следующую ошибку:
2014-09-13 14:18:59,422: INFO/MainProcess] Received task: AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d]
[2014-09-13 14:18:59,422: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7fc407d5fde8> (args:(u'AddTask.get', u'261fb059-b88e-444b-b218-c3c24c94fc1d', [], {u'y': 3, u'x': 1}, {u'task': u'AddTask.get', u'group': None, u'is_eager': False, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': 'addsub', u'exchange': ''}, u'args': [], u'headers': {}, u'correlation_id': None, u'hostname': 'celery@pcs01', u'kwargs': {u'y': 3, u'x': 1}, u'reply_to': None, u'id': u'261fb059-b88e-444b-b218-c3c24c94fc1d'}) kwargs:{})
[2014-09-13 14:18:59,425: DEBUG/MainProcess] Task accepted: AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d] pid:6536
[2014-09-13 14:18:59,425: ERROR/MainProcess] Task AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d] raised unexpected: TypeError('get() takes exactly 3 arguments (2 given)',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 437, in __protected_call__
return self.run(*args, **kwargs)
TypeError: get() takes exactly 3 arguments (2 given)
Сценарий действительно передает корректные параметры x: & y: работнику сельдерея, но аргумент self не обрабатывается должным образом. Кто-нибудь понимает, почему это может происходить?
Я успешно протестировал указанный выше скрипт node.js с помощью скрипта задачи, который определяет набор функций вместо класса с функциями-членами:
from pipelines.celery.celery import app
from pipelines.addsub import settings
from celery.utils.log import get_task_logger
log = get_task_logger(__name__)
@app.task(name='add')
def add(x, y):
log.info("Calling task add(%d, %d)" % (x, y))
return x + y
@app.task(name='subtract')
def subtract(x, y):
log.info("Calling task subtract(%d, %d)" % (x, y))
return x - y
Я предполагаю, что модуль celery.contrib.methods не работает в случае, который я описал выше. У кого-нибудь есть понимание этой проблемы?