Задача трансляции сельдерея не работает
Я пытался выполнить трансляцию, но только один из моих работников получал ее за каждый звонок. Не могли бы вы помочь мне? (Я использую rabbitmq и node-celery)
default_exchange = Exchange('celery', type='direct')
celery.conf.update(
CELERY_RESULT_BACKEND = "amqp",
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES = (
Queue('celery', default_exchange, routing_key='celery'),
Broadcast('broadcast_tasks'),
),
CELERY_ROUTES = (
{'my_tasks.sample_broadcast_task': {
'queue': 'broadcast_tasks',
}},
{'my_tasks.sample_normal_task': {
'queue': 'celery',
'exchange': 'celery',
'exchange_type': 'direct',
'routing_key': 'celery',
}}
),
)
Я также протестировал следующую конфигурацию, но не работал.
celery.conf.update(
CELERY_RESULT_BACKEND = "amqp",
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES=(
Queue('celery', Exchange('celery'), routing_key='celery'),
Broadcast('broadcast'),
),
)
@celery.task(ignore_result=True, queue='broadcast',
options=dict(queue='broadcast'))
def sample_broadcast_task():
print "test"
РЕДАКТИРОВАТЬ
после изменения способа запуска работника, добавив -Q трансляцию, теперь я сталкиваюсь с этой ошибкой:
PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'broadcast' in vhost '/': received 'direct' but current is 'fanout'
3 ответа
Попробовав много-много-много вещей, я наконец-то нашел решение. Это работа для меня. (сельдерей 3.1.24 (Cipater) и Python 2.7.12)
РАБОТНИК - tasks.py:
from celery import Celery
import celery_config
from kombu.common import Broadcast, Queue, Exchange
app = Celery()
app.config_from_object(sysadmin_celery_config)
@app.task
def print_prout(x):
print x
return x
РАБОЧИЙ - celery_config.py:
# coding=utf-8
from kombu.common import Broadcast, Queue, Exchange
BROKER_URL = 'amqp://login:pass@172.17.0.1//'
CELERY_RESULT_BACKEND = 'redis://:login@172.17.0.1'
CELERY_TIMEZONE = 'Europe/Paris'
CELERY_ENABLE_UTC = True
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_DISABLE_RATE_LIMITS = True
CELERY_ALWAYS_EAGER = False
CELERY_QUEUES = (Broadcast('broadcast_tasks'), )
Работник поссорился с:
celery -A celery_worker.tasks worker --loglevel=info --concurrency=1 -n worker_name_1
На клиенте (еще один докер-контейнер для меня).
from celery import Celery
from celery_worker import tasks
result = tasks.print_prout.apply_async(['prout'], queue='broadcast_tasks')
print result.get()
Следующий шаг для меня - как получить и отобразить результаты, возвращенные всеми работниками. "Print result.get()", кажется, возвращает только результат последнего работника. Это не кажется очевидным ( передают ли Celery результаты всех рабочих)
Согласно вашему описанию:
Я пытался выполнить трансляцию, но только один из моих работников получал ее за каждый звонок.
Вы можете использовать прямой обмен типа.
Попробуй это
from celery import Celery
from kombu.common import Broadcast
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
class CeleryConf:
# List of modules to import when celery starts.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_IMPORTS = ('main.tasks')
CELERY_QUEUES = (Broadcast('q1'),)
CELERY_ROUTES = {
'tasks.sampletask': {'queue': 'q1'}
}
celeryapp = Celery('celeryapp', broker=BROKER_URL)
celeryapp.config_from_object(CeleryConf())
@celeryapp.task
def sampletask(form):
print form
Чтобы отправить сообщение, сделайте
d= sampletask.apply_async(['4c5b678350fc643'],serializer="json", queue='q1')