Ошибка рабочего Django / Celery / Kombu: получено и удалено неизвестное сообщение. Неправильный пункт назначения?

Кажется, что сообщения не попадают в очередь должным образом.

Я использую Django с Celery и Kombu, чтобы использовать собственную базу данных Django в качестве бэкенда. Все, что мне нужно, это очень простая настройка Pub / Sub. В конечном итоге он будет развернут в Heroku, поэтому я использую мастера для локального запуска. Вот соответствующий код и информация:

заморозка

Django==1.4.2
celery==3.0.15
django-celery==3.0.11
kombu==2.5.6

PROCFILE

web: source bin/activate; python manage.py run_gunicorn -b 0.0.0.0:$PORT -w 4; python manage.py syncdb
celeryd: python manage.py celeryd -E -B --loglevel=INFO

settings.py

# Celery configuration
import djcelery
CELERY_IMPORTS = ("api.tasks",)
BROKER_URL = "django://localhost//"
djcelery.setup_loader()

put_message

with Connection(settings.BROKER_URL) as conn:
  queue = conn.SimpleQueue('celery')
  queue.put(id)
  queue.close()

апи / tasks.py

@task()
def process_next_task():
  with Connection(settings.BROKER_URL) as conn:
    queue = conn.SimpleQueue('celery')
    message = queue.get(block=True, timeout=1)
    id = int(message.payload)
    try:
      Model.objects.get(id=id)
    except Model.DoesNotExist:
      message.reject()
    else:
      # Do stuff here
      message.ack()
    queue.close()

В терминале foreman start работает просто отлично и показывает это:

started with pid 31835
17:08:22 celeryd.1 | started with pid 31836
17:08:22 web.1     | /usr/local/foreman/bin/foreman-runner: line 41: exec: source: not found
17:08:22 web.1     | 2013-02-14 17:08:22 [31838] [INFO] Starting gunicorn 0.16.1
17:08:22 web.1     | 2013-02-14 17:08:22 [31838] [INFO] Listening at: http://0.0.0.0:5000 (31838)
17:08:22 web.1     | 2013-02-14 17:08:22 [31838] [INFO] Using worker: sync
17:08:22 web.1     | 2013-02-14 17:08:22 [31843] [INFO] Booting worker with pid: 31843
17:08:22 web.1     | 2013-02-14 17:08:22 [31844] [INFO] Booting worker with pid: 31844
17:08:22 web.1     | 2013-02-14 17:08:22 [31845] [INFO] Booting worker with pid: 31845
17:08:22 web.1     | 2013-02-14 17:08:22 [31846] [INFO] Booting worker with pid: 31846
17:08:22 celeryd.1 | [2013-02-14 17:08:22,858: INFO/Beat] Celerybeat: Starting...
17:08:22 celeryd.1 | [2013-02-14 17:08:22,870: WARNING/MainProcess] celery@myhost.local ready.
17:08:22 celeryd.1 | [2013-02-14 17:08:22,873: INFO/MainProcess] consumer: Connected to django://localhost//.
17:08:42 celeryd.1 | [2013-02-14 17:08:42,926: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
17:08:42 celeryd.1 | The full contents of the message body was: body: 25 (2b) {content_type:u'application/json' content_encoding:u'utf-8' delivery_info:{u'priority': 0, u'routing_key': u'celery', u'exchange': u'celery'}}

Эти последние две строки не отображаются сразу, но отображаются, когда мой API получает запрос POST, который запускает код в разделе put_message выше. Я экспериментировал с использованием полностью продвинутых классов Kombu Producer и Consumer с тем же результатом.

Пример SimpleQueue Комбу: http://kombu.readthedocs.org/en/latest/userguide/examples.html
Документы по сельдерею: http://docs.celeryproject.org/en/latest/index.html

Есть идеи?

РЕДАКТИРОВАНИЕ

Изменение на --loglevel=DEBUG в рамках procfile изменяет вывод терминала на следующее:

08:54:33 celeryd.1 | started with pid 555
08:54:33 web.1     | started with pid 554
08:54:33 web.1     | /usr/local/foreman/bin/foreman-runner: line 41: exec: source: not found
08:54:36 web.1     | 2013-02-15 08:54:36 [557] [INFO] Starting gunicorn 0.16.1
08:54:36 web.1     | 2013-02-15 08:54:36 [557] [INFO] Listening at: http://0.0.0.0:5000 (557)
08:54:36 web.1     | 2013-02-15 08:54:36 [557] [INFO] Using worker: sync
08:54:36 web.1     | 2013-02-15 08:54:36 [564] [INFO] Booting worker with pid: 564
08:54:36 web.1     | 2013-02-15 08:54:36 [565] [INFO] Booting worker with pid: 565
08:54:36 web.1     | 2013-02-15 08:54:36 [566] [INFO] Booting worker with pid: 566
08:54:36 web.1     | 2013-02-15 08:54:36 [567] [INFO] Booting worker with pid: 567
08:54:37 celeryd.1 | [2013-02-15 08:54:37,480: DEBUG/MainProcess] [Worker] Loading modules.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] Claiming components.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] Building boot step graph.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] New boot order: {ev, queues, beat, pool, mediator, autoreloader, timers, state-db, autoscaler, consumer}
08:54:37 celeryd.1 | [2013-02-15 08:54:37,489: DEBUG/MainProcess] Starting celery.beat._Process...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,490: DEBUG/MainProcess] celery.beat._Process OK!
08:54:37 celeryd.1 | [2013-02-15 08:54:37,491: DEBUG/MainProcess] Starting celery.concurrency.processes.TaskPool...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,491: INFO/Beat] Celerybeat: Starting...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,506: DEBUG/MainProcess] celery.concurrency.processes.TaskPool OK!
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] Starting celery.worker.mediator.Mediator...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] celery.worker.mediator.Mediator OK!
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] Starting celery.worker.consumer.BlockingConsumer...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,508: WARNING/MainProcess] celery@myhost.local ready.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,508: DEBUG/MainProcess] consumer: Re-establishing connection to the broker...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,510: INFO/MainProcess] consumer: Connected to django://localhost//.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,628: DEBUG/Beat] Current schedule:
08:54:37 celeryd.1 | <Entry: celery.backend_cleanup celery.backend_cleanup() {<crontab: * 4 * * * (m/h/d/dM/MY)>}
08:54:37 celeryd.1 | [2013-02-15 08:54:37,629: DEBUG/Beat] Celerybeat: Ticking with max interval->5.00 minutes
08:54:37 celeryd.1 | [2013-02-15 08:54:37,658: DEBUG/Beat] Celerybeat: Waking up in 5.00 minutes.
08:54:38 celeryd.1 | [2013-02-15 08:54:38,110: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->16
08:54:38 celeryd.1 | [2013-02-15 08:54:38,126: DEBUG/MainProcess] consumer: Ready to accept tasks!
08:55:08 celeryd.1 | [2013-02-15 08:55:08,184: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
08:55:08 celeryd.1 | The full contents of the message body was: body: 26 (2b) {content_type:u'application/json' content_encoding:u'utf-8' delivery_info:{u'priority': 0, u'routing_key': u'celery', u'exchange': u'celery'}}

5 ответов

Проблема была двоякой:

Формат сообщения был неверным. Это должен быть словарь в соответствии с документацией по адресу http://docs.celeryproject.org/en/latest/internals/protocol.html которую предоставил @asksol, и следуя примеру, приведенному в нижней части этой страницы.

Пример сообщения

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
 "task": "celery.task.PingTask",
 "args": [],
 "kwargs": {},
 "retries": 0,
 "eta": "2009-11-17T12:30:56.527191"}

put_message

with Connection(settings.BROKER_URL) as conn:
  queue = conn.SimpleQueue('celery')
  message = {
    'task': 'process-next-task',
    'id': str(uuid.uuid4()),
    'args': [id],
    "kwargs": {},
    "retries": 0,
    "eta": str(datetime.datetime.now())
  }
  queue.put(message)
  queue.close()

Процесс Procfile является потребителем, который выполняет задачу, поэтому нет необходимости настраивать потребителя в рамках задачи. Мне просто нужно было использовать параметры, которые я отправил, когда опубликовал сообщение.

апи / tasks.py

@task(serializer='json', name='process-next-task')
def process_next_task(id):
  try:
    Model.objects.get(id=int(id))
  except Model.DoesNotExist:
    pass
  else:
    # Do stuff here

Это не решение для этого вопроса,
но пометка для выпуска в использовании celery4.0.2

вывод как:

[2017-02-09 17:45:12,136: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: [[], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': None}] (77b)
{content_type:'application/json' content_encoding:'utf-8'
  delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'test2', 'delivery_tag': 1L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '3f6295b3-c85c-4188-b424-d186da7e2edb', 'N\xfd\x17=\x00\x00': 'gen23043@hy-ts-bf-01', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'celeryserver.tasks.test', '\xae\xbf': '3f6295b3-c85c-4188-b424-d186da7e2edb', '\x11s\x1f\xd8\x00\x00\x00\x00': '()', 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}

решение: https://github.com/celery/celery/issues/3675

# call this command many times, until it says it's not installed
pip uninstall librabbitmq

Спасибо за https://github.com/ask

Очевидно проблема librabbitmq связана с новым протоколом по умолчанию в сельдерее 4.x. Вы можете переключиться на предыдущую версию протокола, нажав CELERY_TASK_PROTOCOL = 1 в ваших настройках, если вы используете Django или настройки app.conf.task_protocol = 1 в celeryconf.py

Тогда вы сможете поставить задачу в очередь с другой задачей.

мой: [сельдерей 3.1.25; Джанго =1.11]

ADD celery exchange in settings.py

CELERY_QUEUES = {
   "celery":       {"exchange": "celery",
                    "routing_key": "celery"}
}

ИЛИ использовать этим

# I declare queue 
ch = settings.CELERY_APP.connection().channel()
ex = Exchange("implicit", channel=ch)
q = Queue(name="implicit", routing_key="implicit", channel=ch, exchange=ex)
q.declare()   # <-- here
producer = ch.Producer(routing_key=q.routing_key, exchange=q.exchange)
# publish
producer.publish("text")

или U может использовать вторую версию
из комбу документов

from kombu import Exchange, Queue
task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')

producer.publish(
    {'hello': 'world'},
    retry=True,
    exchange=task_queue.exchange,
    routing_key=task_queue.routing_key,
    declare=[task_queue], # <-- declares exchange, queue and binds.
)

Для будущих желающих,

В теле задачи необходимо присвоить значения «task» и «id». Так как исходный код потребителей написан так:

http://www.pythondoc.com/celery-3.1.11/_modules/celery/worker/consumer.html

         def on_task_received(body, message):
        try:
            name = body['task']
        except (KeyError, TypeError):
            return on_unknown_message(body, message)

        try:
            strategies[name](message, body,
                             message.ack_log_error,
                             message.reject_log_error,
                             callbacks)
        except KeyError as exc:
            on_unknown_task(body, message, exc)
        except InvalidTaskError as exc:
            on_invalid_task(body, message, exc)
Другие вопросы по тегам