Сельдерей + обратный обмен темами (x-rtopic) не работает

Я пытаюсь создать приложение для сельдерея, используя плагин обмена обратной темы rabbitmq от Alvaro Videla. Похоже, что работники хорошо связываются с брокером, используя этот обмен, но когда я задаю обратный маршрут своей задачи, не беру '#' или '*', это работает как прямой обмен.

так вот моя очередь:

Queue(name='cluster', 
          exchange = Exchange(name='cluster', 
                              type='x-rtopic',
                              delivery_mode='persistent',
                              durable=True), 
          routing_key='intel.%d.%s' % (n_cores, hostname),
          durable = True,)

Теперь представьте 2 рабочих, использующих следующую routing_key

  • Работник1: intel.8.host1
  • Работник2: amd.2.host2

Thats routing_keys для задач, которые я пытаюсь и что я испытал:

Routing key     | Works?   |  Result              | Expected
-------------------------------------------------------------------------
'intel'         | OK       | Nobody receives      | 
'intel.*'       | OK       | Nobody receives      |
'intel.#'       | WRONG    | Everyone receives    | just Worker1 receives
'#.host1'       | WRONG    | Everyone receives    | just Worker1 receives
'intel.*.*      | WRONG    | Everyone receives    | just Worker1 receives
'intel.*.host1  | WRONG    | Everyone receives    | just Worker1 receives
'*.2.*'         | WRONG    | Everyone receives    | just Worker2 receives
'intel.8.host1' | OK       | like direct exchange | 

Чтобы попытаться определить, в чем была проблема, я протестировал плагин, делающий простой обмен сообщениями с использованием pika и просто kombu, и оба работали нормально, точно так, как ожидалось. Поэтому я решил, что проблема в том, как Celery обменивается сообщениями. Может быть, я должен создать собственный класс маршрутизации!?

Заранее спасибо.

1 ответ

Через некоторое время я обнаружил, что плагин Reverse theme exchange работает с Celery. Я неверно истолковал способ работы очередей Rabbitmq. Чтобы полностью заставить его работать, мне нужно было определить Мой маршрутизатор, где задача направляется на обмен, содержащий эти очереди, и только указав ключ_рынки и имя обмена, таким образом, задачи все равно будут циклически перебирать узлы, подключенные к этому обмену, и уметь использовать подстановочные знаки в ключе маршрутизации задач.

Таким образом, настройки очереди будут выглядеть примерно так:

routed_queue = 'intel.8.pchost'

CELERY_QUEUES = (

    Queue(name='cluster.%s' % routed_queue,
          exchange = Exchange(name='cluster',
                              type='x-rtopic'), 
          routing_key=routed_queue),)

Маршрутизатор будет примерно таким:

Класс MyRouter(объект):

def route_for_task(self, task, args=[], kwargs={}):

    routing_key = kwargs['routing_key'] if kwargs.has_key('routing_key') and\
                  kwargs['routing_key'] else '#'

    return {'exchange': 'cluster',
            'exchange_type': 'rtopic',
            'routing_key': routing_key}

Затем я передавал Rouing_key в качестве kwargs для задачи, которую можно было установить в задаче "intel.#", То есть эта задача была бы выполнена любым работником с очередью, начинающейся с intel.

Единственная ошибка! вот почему я должен был выполнить задачи, используя.apply_async, а не.delay.

Вся идея состоит в том, чтобы иметь возможность направлять мои задачи в соответствии со спецификациями машин, доступными в кластере. Некоторые задачи должны выполняться только на процессорах Intel, а другие - только amd, или определяться по количеству ядер в узле или с использованием имен хостов.

Надеюсь, что это может помочь любому, кто попытается сделать то же самое в будущем.

Другие вопросы по тегам