Сельдерей "Начало работы" не может получить результаты; всегда в ожидании

Я пытался следовать инструкциям " Первые шаги в сельдерее" с руководством " Сельдерей" и " Следующие шаги". Моя установка - Windows 7 64-битная, Anaconda Python 2.7 (32-битная), установленные 32-битные бинарные файлы Erlang, сервер RabbitMQ и сельдерей (с pip install celery).

Следуя этому руководству, я создал папку proj с init.py, tasks.py и celery.py. Мой init.py пуст. Вот celery.py:

from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

#Optional configuration, see the application user guide
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
)

if __name__ == '__main__':
    app.start()

И вот задачи.

from __future__ import absolute_import

from .celery import app

@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

Сначала я понимаю, что должен убедиться, что служба RabbitMQ запущена. Вкладка "Службы" диспетчера задач показывает, что RabbitMQ действительно работает. Чтобы запустить сервер сельдерея и загрузить мои задачи, я открываю cmd.exe, перейдите к родителю proj (папка, которую я назвал celery_demo), и запустите это:

celery -A proj.celery worker -l debug

который дает этот вывод:

C:\Users\bnables\Documents\Python\celery_demo>celery -A proj.celery worker -l debug
[2014-08-25 17:00:09,308: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-08-25 17:00:09,313: DEBUG/MainProcess] | Worker: Building graph...
[2014-08-25 17:00:09,315: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoreloader, Autoscaler, StateDB, Beat, Con
sumer}
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-08-25 17:00:09,322: DEBUG/MainProcess] | Consumer: Building graph...
[2014-08-25 17:00:09,332: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Gossip, Tasks, Control, Agent, event loop
}

 -------------- celery@MSSLW40013047 v3.1.13 (Cipater)
---- **** -----
--- * ***  * -- Windows-7-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x3290370
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2014-08-25 17:00:09,345: DEBUG/MainProcess] | Worker: Starting Pool
[2014-08-25 17:00:09,417: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,420: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-08-25 17:00:09,421: DEBUG/MainProcess] | Consumer: Starting Connection
[2014-08-25 17:00:09,457: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,460: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,460: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2014-08-25 17:00:09,461: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,461: DEBUG/MainProcess] | Consumer: Starting Events
[2014-08-25 17:00:09,516: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.r
abbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2014 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': Tru
e, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u
'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@MSSLW40013047.ndc.nasa.gov', u'platform': u'Erlang/OTP', u'ver
sion': u'3.3.5'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2014-08-25 17:00:09,519: DEBUG/MainProcess] Open OK!
[2014-08-25 17:00:09,520: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,522: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:09,523: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,523: DEBUG/MainProcess] | Consumer: Starting Heart
[2014-08-25 17:00:09,530: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:09,533: DEBUG/MainProcess] | Consumer: Starting Mingle
[2014-08-25 17:00:09,538: INFO/MainProcess] mingle: searching for neighbors
[2014-08-25 17:00:09,539: DEBUG/MainProcess] using channel_id: 1
[2014-08-25 17:00:09,540: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,552: INFO/MainProcess] mingle: all alone
[2014-08-25 17:00:10,552: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,552: DEBUG/MainProcess] | Consumer: Starting Gossip
[2014-08-25 17:00:10,553: DEBUG/MainProcess] using channel_id: 2
[2014-08-25 17:00:10,555: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,559: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,559: DEBUG/MainProcess] | Consumer: Starting Tasks
[2014-08-25 17:00:10,566: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,566: DEBUG/MainProcess] | Consumer: Starting Control
[2014-08-25 17:00:10,568: DEBUG/MainProcess] using channel_id: 3
[2014-08-25 17:00:10,569: DEBUG/MainProcess] Channel open
[2014-08-25 17:00:10,572: DEBUG/MainProcess] ^-- substep ok
[2014-08-25 17:00:10,573: DEBUG/MainProcess] | Consumer: Starting event loop
[2014-08-25 17:00:10,575: WARNING/MainProcess] celery@MSSLW40013047 ready.
[2014-08-25 17:00:10,575: DEBUG/MainProcess] basic.qos: prefetch_count->32

-A говорит сельдерею, где найти мой экземпляр приложения сельдерея. Используя только proj тоже работает, но так как потом будет просто искать proj.celery Быть более явным хорошо здесь. worker это команда, данная сельдерею, которая заставляет некоторых рабочих выполнять задачи, загружаемые из proj.celery. в заключение -l debug говорит сельдерею установить уровень журнала для отладки, чтобы я получал много информации. Это обычно будет -l info,

Чтобы протестировать мой сервер задач, я открываю консоль IPython Qt и перехожу к celery_demo папка (которая содержит proj). Я тогда набираю from proj.tasks import add, Просто вызывая add(1, 2) возвращается 3 без использования сервера, как и ожидалось. Когда я вызываю add.delay, вот что происходит:

add.delay(2, 3)

Который возвращает:

<AsyncResult: 42123ff3-e94e-4673-808a-ec6c847679d8>

И в моем окне cmd.exe я получаю:

[2014-08-25 17:20:38,109: INFO/MainProcess] Received task: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8]
[2014-08-25 17:20:38,109: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x033CD6F0> (args:(u'proj.tasks.add', u'42123ff3-e94e-4673-
808a-ec6c847679d8', [2, 3], {}, {u'timelimit': [None, None], u'utc': True, u'is_eager': False, u'chord': None, u'group': None, u'args': [2, 3], u'retr
ies': 0, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'celery', u'exchange': u'celery'}, u'expires': None, u'hostname
': 'celery@MSSLW40013047', u'task': u'proj.tasks.add', u'callbacks': None, u'correlation_id': u'42123ff3-e94e-4673-808a-ec6c847679d8', u'errbacks': No
ne, u'reply_to': u'70ed001d-193c-319c-9447-8d77c231dc10', u'taskset': None, u'kwargs': {}, u'eta': None, u'id': u'42123ff3-e94e-4673-808a-ec6c847679d8
', u'headers': {}}) kwargs:{})
[2014-08-25 17:20:38,124: DEBUG/MainProcess] Task accepted: proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] pid:4052
[2014-08-25 17:20:38,125: INFO/MainProcess] Task proj.tasks.add[42123ff3-e94e-4673-808a-ec6c847679d8] succeeded in 0.0130000114441s: 5

Итак, как показывает последняя строка, вычисляется результат 5. Далее я хочу сохранить объект AsyncResult, проверить его состояние и получить значение результата:

result = add.delay(3, 4)

Однако result.state и result.get(timeout=1) не работают должным образом:

In:  result.state
Out: 'Pending'
In:  result.status
Out: 'Pending'

In: result.get(timeout=1)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
<ipython-input-17-375f2d3530cb> in <module>()
----> 1 result.get(timeout=1)

C:\Anaconda32\lib\site-packages\celery\result.pyc in get(self, timeout, propagate, interval, no_ack, follow_parents)
    167                 interval=interval,
    168                 on_interval=on_interval,
--> 169                 no_ack=no_ack,
    170             )
    171         finally:

C:\Anaconda32\lib\site-packages\celery\backends\amqp.pyc in wait_for(self, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPAGATE_STATES, **kwargs)
    155                                     on_interval=on_interval)
    156             except socket.timeout:
--> 157                 raise TimeoutError('The operation timed out.')
    158 
    159         if meta['status'] in PROPAGATE_STATES and propagate:

TimeoutError: The operation timed out.

Где ожидаемый результат result.state или result.status равен 'SUCCESSFUL', а результат result.get(timeout=1) должно быть 5,

Похоже, что хранилище результатов или передача сообщений работают неправильно. Учебник просто говорит, что установка backend именованный параметр в вызове Celery() или CELERY_RESULT_BACKEND настройка конфига. В "начале" это имеет backend='amqp' где "Следующие шаги" имеет backend='amqp://' который также используется в примерах github.

Я бился головой об этом какое-то время, не зная, как поступить. Есть идеи, что попробовать дальше? Спасибо!

3 ответа

Решение
  • W8 x64
  • Python 2.7.3 (ActivePython)
  • Эрланг 17,1 х64
  • Сервер RabbitMQ 3.3.5
  • Сельдерей 3.1.13

Случайно перестал работать тоже. Точно такая же проблема - навсегда в ожидании. Переустановка Erlang или RabbitMQ не помогла.

Я также тестировал Debian Linux 7 x86, и здесь он работает без проблем.

Также: https://github.com/celery/celery/issues/2146

Это, вероятно, проблема, связанная с Windows, и установка рабочего флага --pool=solo исправила его для меня.

Вы должны добавить track_started=True, Ну, трудно знать этот вариант.

Значением по умолчанию является False, поэтому вы всегда получите PENDING, когда задача не будет завершена. Кроме того, вы должны неправильно настроить бэкэнд или брокера. Пожалуйста, проверьте их снова.

@app.task(bind=True, track_started=True)
def add(x, y):
    return x + y

Методы.delay ставят вашу задачу в очередь. "result.state" показывает ожидающий, что означает, что ваша задача еще не выполнена. Там может быть много задач, поэтому его откладывают.

Проверьте, выполняются ли какие-либо задачи

>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.scheduled()
>>> i.active()
Другие вопросы по тегам