Задача трансляции сельдерея не работает

Я пытался выполнить трансляцию, но только один из моих работников получал ее за каждый звонок. Не могли бы вы помочь мне? (Я использую rabbitmq и node-celery)

default_exchange = Exchange('celery', type='direct')
celery.conf.update(
    CELERY_RESULT_BACKEND = "amqp",
    CELERY_RESULT_SERIALIZER='json',
    CELERY_QUEUES = (
        Queue('celery', default_exchange, routing_key='celery'),
        Broadcast('broadcast_tasks'),
    ),
    CELERY_ROUTES = (
        {'my_tasks.sample_broadcast_task': {
            'queue': 'broadcast_tasks',
            }},
        {'my_tasks.sample_normal_task': {
            'queue': 'celery',
            'exchange': 'celery',
            'exchange_type': 'direct',
            'routing_key': 'celery',
            }}
    ),
    )

Я также протестировал следующую конфигурацию, но не работал.

celery.conf.update(
    CELERY_RESULT_BACKEND = "amqp",
    CELERY_RESULT_SERIALIZER='json',
    CELERY_QUEUES=(
        Queue('celery', Exchange('celery'), routing_key='celery'),
        Broadcast('broadcast'),
    ),
    )
@celery.task(ignore_result=True, queue='broadcast',
             options=dict(queue='broadcast'))
def sample_broadcast_task():
    print "test"

РЕДАКТИРОВАТЬ

после изменения способа запуска работника, добавив -Q трансляцию, теперь я сталкиваюсь с этой ошибкой:

PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'broadcast' in vhost '/': received 'direct' but current is 'fanout'

3 ответа

Попробовав много-много-много вещей, я наконец-то нашел решение. Это работа для меня. (сельдерей 3.1.24 (Cipater) и Python 2.7.12)

РАБОТНИК - tasks.py:

from celery import Celery
import celery_config
from kombu.common import Broadcast, Queue, Exchange

app = Celery()
app.config_from_object(sysadmin_celery_config)

@app.task
def print_prout(x):
    print x
    return x

РАБОЧИЙ - celery_config.py:

# coding=utf-8
from kombu.common import Broadcast, Queue, Exchange


BROKER_URL = 'amqp://login:pass@172.17.0.1//'
CELERY_RESULT_BACKEND = 'redis://:login@172.17.0.1'


CELERY_TIMEZONE = 'Europe/Paris'
CELERY_ENABLE_UTC = True

CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_DISABLE_RATE_LIMITS = True
CELERY_ALWAYS_EAGER = False

CELERY_QUEUES = (Broadcast('broadcast_tasks'), )

Работник поссорился с:

celery -A celery_worker.tasks worker --loglevel=info --concurrency=1 -n worker_name_1

На клиенте (еще один докер-контейнер для меня).

from celery import Celery
from celery_worker import tasks

result = tasks.print_prout.apply_async(['prout'], queue='broadcast_tasks')

print result.get()

Следующий шаг для меня - как получить и отобразить результаты, возвращенные всеми работниками. "Print result.get()", кажется, возвращает только результат последнего работника. Это не кажется очевидным ( передают ли Celery результаты всех рабочих)

Согласно вашему описанию:

Я пытался выполнить трансляцию, но только один из моих работников получал ее за каждый звонок.

Вы можете использовать прямой обмен типа.

Попробуй это

   from celery import Celery
   from kombu.common import Broadcast

   BROKER_URL = 'amqp://guest:guest@localhost:5672//'


   class CeleryConf:
   # List of modules to import when celery starts.
   CELERY_ACCEPT_CONTENT = ['json']
   CELERY_IMPORTS = ('main.tasks')
   CELERY_QUEUES = (Broadcast('q1'),)
   CELERY_ROUTES = {
    'tasks.sampletask': {'queue': 'q1'}
   }


   celeryapp = Celery('celeryapp', broker=BROKER_URL)
   celeryapp.config_from_object(CeleryConf())


   @celeryapp.task
   def sampletask(form):
      print form

Чтобы отправить сообщение, сделайте

    d= sampletask.apply_async(['4c5b678350fc643'],serializer="json", queue='q1')
Другие вопросы по тегам