Сельдерей "Начало работы" не может получить результаты; всегда в ожидании
Я пытался следовать инструкциям " Первые шаги в сельдерее" с руководством " Сельдерей" и " Следующие шаги". Моя установка - Windows 7 64-битная, Anaconda Python 2.7 (32-битная), установленные 32-битные бинарные файлы Erlang, сервер RabbitMQ и сельдерей (с pip install celery
).
Следуя этому руководству, я создал папку proj с init.py, tasks.py и celery.py. Мой init.py пуст. Вот celery.py:
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])
#Optional configuration, see the application user guide
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
)
if __name__ == '__main__':
app.start()
И вот задачи.
from __future__ import absolute_import
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
Сначала я понимаю, что должен убедиться, что служба RabbitMQ запущена. Вкладка "Службы" диспетчера задач показывает, что RabbitMQ действительно работает. Чтобы запустить сервер сельдерея и загрузить мои задачи, я открываю cmd.exe, перейдите к родителю proj
(папка, которую я назвал celery_demo), и запустите это:
celery -A proj.celery worker -l debug
который дает этот вывод:
C:\Users\bnables\Documents\Python\celery_demo>celery -A proj.celery worker -l debug
[2014-08-25 17:00:09,308: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-08-25 17:00:09,313: DEBUG/MainProcess] | Worker: Building graph...
[2014-08-25 17:00:09,315: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoreloader, Autoscaler, StateDB, Beat, Con
sumer}
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Building graph...
[2014-08-25 17:00:09,332: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Gossip, Tasks, Control, Agent, event loop
}
-------------- celery@MSSLW40013047 v3.1.13 (Cipater)
---- **** -----
--- * *** * -- Windows-7-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x3290370
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. proj.tasks.add
. proj.tasks.mul
. proj.tasks.xsum
[2014-08-25 17:00:09,345: DEBUG/MainProcess] | Worker: Starting Pool
[2014-08-25 17:00:09,417: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,420: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-08-25 17:00:09,421: DEBUG/MainProcess] | Consumer: Starting Connection
[2014-08-25 17:00:09,457: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL. See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,460: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,460: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2014-08-25 17:00:09,461: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,461: DEBUG/MainProcess] | Consumer: Starting Events
[2014-08-25 17:00:09,516: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL. See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,519: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,520: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,522: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:09,523: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,523: DEBUG/MainProcess] | Consumer: Starting Heart
[2014-08-25 17:00:09,530: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,533: DEBUG/MainProcess] | Consumer: Starting Mingle
[2014-08-25 17:00:09,538: INFO/MainProcess] mingle: searching for neighbors
[2014-08-25 17:00:09,539: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,540: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,552: INFO/MainProcess] mingle: all alone
[2014-08-25 17:00:10,552: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,552: DEBUG/MainProcess] | Consumer: Starting Gossip
[2014-08-25 17:00:10,553: DEBUG/MainProcess] using channel_id: 2
[2014-08-25 17:00:10,555: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,559: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,559: DEBUG/MainProcess] | Consumer: Starting Tasks
[2014-08-25 17:00:10,566: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,566: DEBUG/MainProcess] | Consumer: Starting Control
[2014-08-25 17:00:10,568: DEBUG/MainProcess] using channel_id: 3
[2014-08-25 17:00:10,569: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,572: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,573: DEBUG/MainProcess] | Consumer: Starting event loop
[2014-08-25 17:00:10,575: WARNING/MainProcess] celery@MSSLW40013047 ready.
[2014-08-25 17:00:10,575: DEBUG/MainProcess] basic.qos: prefetch_count->32
-A
говорит сельдерею, где найти мой экземпляр приложения сельдерея. Используя только proj
тоже работает, но так как потом будет просто искать proj.celery
Быть более явным хорошо здесь. worker
это команда, данная сельдерею, которая заставляет некоторых рабочих выполнять задачи, загружаемые из proj.celery. в заключение -l debug
говорит сельдерею установить уровень журнала для отладки, чтобы я получал много информации. Это обычно будет -l info
,
Чтобы протестировать мой сервер задач, я открываю консоль IPython Qt и перехожу к celery_demo
папка (которая содержит proj
). Я тогда набираю from proj.tasks import add
, Просто вызывая add(1, 2)
возвращается 3
без использования сервера, как и ожидалось. Когда я вызываю add.delay, вот что происходит:
add.delay(2, 3)
Который возвращает:
<AsyncResult: 42123ff3-e94e-4673-808a-ec6c847679d8>
И в моем окне cmd.exe я получаю:
[2014-08-25 17:20:38,109: INFO/MainProcess] Received task: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8]
[2014-08-25 17:20:38,109: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x033CD6F0> (args:(u'proj.tasks.add', u'42123ff3-e94e-4673-
808a-ec6c847679d8', [2, 3], {}, {u'timelimit': [None, None], u'utc': True, u'is_eager': False, u'chord': None, u'group': None, u'args': [2, 3], u'retr
ies': 0, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'celery', u'exchange': u'celery'}, u'expires': None, u'hostname
': 'celery@MSSLW40013047', u'task': u'proj.tasks.add', u'callbacks': None, u'correlation_id': u'42123ff3-e94e-4673-808a-ec6c847679d8', u'errbacks': No
ne, u'reply_to': u'70ed001d-193c-319c-9447-8d77c231dc10', u'taskset': None, u'kwargs': {}, u'eta': None, u'id': u'42123ff3-e94e-4673-808a-ec6c847679d8
', u'headers': {}}) kwargs:{})
[2014-08-25 17:20:38,124: DEBUG/MainProcess] Task accepted: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] pid:4052
[2014-08-25 17:20:38,125: INFO/MainProcess] Task proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] succeeded in 0.0130000114441s: 5
Итак, как показывает последняя строка, вычисляется результат 5. Далее я хочу сохранить объект AsyncResult, проверить его состояние и получить значение результата:
result = add.delay(3, 4)
Однако result.state и result.get(timeout=1) не работают должным образом:
In: result.state
Out: 'Pending'
In: result.status
Out: 'Pending'
In: result.get(timeout=1)
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
<ipython-input-17-375f2d3530cb> in <module>()
----> 1 result.get(timeout=1)
C:\Anaconda32\lib\site-packages\celery\result.pyc in get(self, timeout, propagate, interval, no_ack, follow_parents)
167 interval=interval,
168 on_interval=on_interval,
--> 169 no_ack=no_ack,
170 )
171 finally:
C:\Anaconda32\lib\site-packages\celery\backends\amqp.pyc in wait_for(self, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPAGATE_STATES, **kwargs)
155 on_interval=on_interval)
156 except socket.timeout:
--> 157 raise TimeoutError('The operation timed out.')
158
159 if meta['status'] in PROPAGATE_STATES and propagate:
TimeoutError: The operation timed out.
Где ожидаемый результат result.state или result.status равен 'SUCCESSFUL', а результат result.get(timeout=1)
должно быть 5
,
Похоже, что хранилище результатов или передача сообщений работают неправильно. Учебник просто говорит, что установка backend
именованный параметр в вызове Celery()
или CELERY_RESULT_BACKEND
настройка конфига. В "начале" это имеет backend='amqp'
где "Следующие шаги" имеет backend='amqp://'
который также используется в примерах github.
Я бился головой об этом какое-то время, не зная, как поступить. Есть идеи, что попробовать дальше? Спасибо!
3 ответа
- W8 x64
- Python 2.7.3 (ActivePython)
- Эрланг 17,1 х64
- Сервер RabbitMQ 3.3.5
- Сельдерей 3.1.13
Случайно перестал работать тоже. Точно такая же проблема - навсегда в ожидании. Переустановка Erlang или RabbitMQ не помогла.
Я также тестировал Debian Linux 7 x86, и здесь он работает без проблем.
Также: https://github.com/celery/celery/issues/2146
Это, вероятно, проблема, связанная с Windows, и установка рабочего флага --pool=solo исправила его для меня.
Вы должны добавить track_started=True
, Ну, трудно знать этот вариант.
Значением по умолчанию является False, поэтому вы всегда получите PENDING, когда задача не будет завершена. Кроме того, вы должны неправильно настроить бэкэнд или брокера. Пожалуйста, проверьте их снова.
@app.task(bind=True, track_started=True)
def add(x, y):
return x + y
Методы.delay ставят вашу задачу в очередь. "result.state" показывает ожидающий, что означает, что ваша задача еще не выполнена. Там может быть много задач, поэтому его откладывают.
Проверьте, выполняются ли какие-либо задачи
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.scheduled()
>>> i.active()