Сельдерей + обратный обмен темами (x-rtopic) не работает
Я пытаюсь создать приложение для сельдерея, используя плагин обмена обратной темы rabbitmq от Alvaro Videla. Похоже, что работники хорошо связываются с брокером, используя этот обмен, но когда я задаю обратный маршрут своей задачи, не беру '#' или '*', это работает как прямой обмен.
так вот моя очередь:
Queue(name='cluster',
exchange = Exchange(name='cluster',
type='x-rtopic',
delivery_mode='persistent',
durable=True),
routing_key='intel.%d.%s' % (n_cores, hostname),
durable = True,)
Теперь представьте 2 рабочих, использующих следующую routing_key
- Работник1: intel.8.host1
- Работник2: amd.2.host2
Thats routing_keys для задач, которые я пытаюсь и что я испытал:
Routing key | Works? | Result | Expected
-------------------------------------------------------------------------
'intel' | OK | Nobody receives |
'intel.*' | OK | Nobody receives |
'intel.#' | WRONG | Everyone receives | just Worker1 receives
'#.host1' | WRONG | Everyone receives | just Worker1 receives
'intel.*.* | WRONG | Everyone receives | just Worker1 receives
'intel.*.host1 | WRONG | Everyone receives | just Worker1 receives
'*.2.*' | WRONG | Everyone receives | just Worker2 receives
'intel.8.host1' | OK | like direct exchange |
Чтобы попытаться определить, в чем была проблема, я протестировал плагин, делающий простой обмен сообщениями с использованием pika и просто kombu, и оба работали нормально, точно так, как ожидалось. Поэтому я решил, что проблема в том, как Celery обменивается сообщениями. Может быть, я должен создать собственный класс маршрутизации!?
Заранее спасибо.
1 ответ
Через некоторое время я обнаружил, что плагин Reverse theme exchange работает с Celery. Я неверно истолковал способ работы очередей Rabbitmq. Чтобы полностью заставить его работать, мне нужно было определить Мой маршрутизатор, где задача направляется на обмен, содержащий эти очереди, и только указав ключ_рынки и имя обмена, таким образом, задачи все равно будут циклически перебирать узлы, подключенные к этому обмену, и уметь использовать подстановочные знаки в ключе маршрутизации задач.
Таким образом, настройки очереди будут выглядеть примерно так:
routed_queue = 'intel.8.pchost'
CELERY_QUEUES = (
Queue(name='cluster.%s' % routed_queue,
exchange = Exchange(name='cluster',
type='x-rtopic'),
routing_key=routed_queue),)
Маршрутизатор будет примерно таким:
Класс MyRouter(объект):
def route_for_task(self, task, args=[], kwargs={}):
routing_key = kwargs['routing_key'] if kwargs.has_key('routing_key') and\
kwargs['routing_key'] else '#'
return {'exchange': 'cluster',
'exchange_type': 'rtopic',
'routing_key': routing_key}
Затем я передавал Rouing_key в качестве kwargs для задачи, которую можно было установить в задаче "intel.#", То есть эта задача была бы выполнена любым работником с очередью, начинающейся с intel.
Единственная ошибка! вот почему я должен был выполнить задачи, используя.apply_async, а не.delay.
Вся идея состоит в том, чтобы иметь возможность направлять мои задачи в соответствии со спецификациями машин, доступными в кластере. Некоторые задачи должны выполняться только на процессорах Intel, а другие - только amd, или определяться по количеству ядер в узле или с использованием имен хостов.
Надеюсь, что это может помочь любому, кто попытается сделать то же самое в будущем.