RabbitMQ: связывание с DLX

Я искал эту информацию (включая документы) и не могу ее найти.

Я использую последнюю версию php-amqplib с RabbitMQ v. 2.7.1. У меня три очереди и три обмена:

// Declare the exchanges
$this->channel->exchange_declare(self::EXCHANGE_TO_PROCESS, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_WAITING, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_TO_CLEAN, 'direct', false, true, false, false, false);

// Messages in the to_process queue are sent to to_clean after 24 hours without being processed
$this->channel->queue_declare(self::QUEUE_TO_PROCESS, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_CLEAN),
    'x-message-ttl' => array('I', 86400000), // 1 day in milli-seconds
));

// Messages in the waiting queue are sent to to_process after 5 minutes (wait period before retry)
$this->channel->queue_declare(self::QUEUE_WAITING, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_PROCESS),
    'x-message-ttl' => array('I', 300000), // 5 minutes in milli-seconds
));

// Messages in the to_clean queue are kept until they are processed
$this->channel->queue_declare(self::QUEUE_TO_CLEAN, false, true, false, false, false);

// Bind the queues to the exchanges
$this->channel->queue_bind(self::QUEUE_TO_PROCESS, self::EXCHANGE_TO_PROCESS);
$this->channel->queue_bind(self::QUEUE_TO_CLEAN, self::EXCHANGE_TO_CLEAN);
$this->channel->queue_bind(self::QUEUE_WAITING, self::EXCHANGE_WAITING);

Поведение довольно простое: сообщения публикуются в EXCHANGE_TO_PROCESS, Внешний работник обрабатывает сообщение: если обработка идет A-OK, сообщение просто ACK'и, следовательно, удаляется из очереди (эта часть работает отлично); если обработка идет не так, вместо этого сообщение вставляется в EXCHANGE_WAITING где, после TTL 5 минут, он повторно вставляется через DLX в EXCHANGE_TO_PROCESS для повторной обработки. Однако после третьего сбоя он вставляется в EXCHANGE_TO_CLEAN куда будет приходить задание cron и очищать сообщения, регистрировать ошибки и т. д.

Однако проблема, с которой я столкнулся, заключается в том, что код четко связывает QUEUE_WAITING к EXCHANGE_WAITING (как и ожидалось), но когда я заглядываю на страницу управления RabbitMQ, я замечаю, что с этим обменом связаны две очереди, а именно QUEUE_TO_PROCESS а также QUEUE_WAITING, в этой последовательности. По истечении 5 минут сообщение исчезает. Я не совсем уверен, почему.

Все это приводит нас к моим вопросам: связывает ли обмен мертвыми буквами неявно привязку обмена параметром к очереди? И: что могло случиться с моими потерянными сообщениями?

РЕДАКТИРОВАТЬ

Я еще больше запутался, чем был. Я пробовал следующий, очень простой код:

    $this->channel->exchange_declare('exchangeA', 'fanout', false, true, false, false, false);
    $this->channel->exchange_declare('exchangeB', 'fanout', false, true, false, false, false);
    $this->channel->queue_declare('queueA', false, true, false, false, false, array(
        'x-dead-letter-exchange' => array('S', 'exchangeB'),
        'x-message-ttl' => array('I', 5000)
    ));
    $this->channel->queue_declare('queueB', false, true, false, false, false);
    $this->channel->queue_bind('queueA', 'exchangeA');
    $this->channel->queue_bind('queueB', 'exchangeB');

    $msg = new AMQPMessage('hello!');
    $this->channel->basic_publish($msg, 'exchangeA');

Это создает две очереди и два обмена (я видел их fanout чтобы избежать проблем с ключами маршрутизации), привязывает queueA к exchangeA и queueB к exchangeB, устанавливая TTL для queueA и его DLX для exchangeB. Наблюдая за тем, что происходит на странице управления, я вижу сообщение, которое проводит 5 секунд в очереди A, как и ожидалось, и затем сообщение исчезает, как в моем более сложном коде выше.

2 ответа

Решение

У меня возникли сомнения, что что-то не так, когда я наткнулся на этот блог и увидел плакат с футляром, очень похожим на наш, и он упомянул, что он работает без проблем... поэтому я начал копать немного больше.

У нас была проблема с версией. Мне сказали, что пакет RabbitMQ был обновлен, но мы используем Ubuntu 12.04 LTS, поэтому "обновленной" версией была 2.7.1 - версия, которой более 3 лет.

Если вы в том же деле, что и мы (используя более старый дистрибутив), зайдите на страницу загрузки RabbitMQ и выберите ту, которая подходит вашему дистрибутиву. В случае с Ubuntu мы просто добавили официальный репозиторий (вы также можете просто загрузить файл.dpkg), выполнив apt-get update и дождался перезагрузки сервера. После этого приведенный выше код работал в основном как есть.

Похоже, ваш поток сообщений может быть циклическим. Если это так, RabbitMQ автоматически отбросит сообщение, как указано в официальных документах (раздел " Маршрутизация сообщений с мертвыми буквами "):

Можно сформировать цикл очередей недоставленных сообщений. Например, это может произойти, когда очередь сообщений с ошибками отправляется на обмен по умолчанию без указания ключа маршрутизации недоставленных сообщений. Сообщения в таких циклах (то есть сообщения, которые попадают в одну и ту же очередь дважды) будут отбрасываться, если весь цикл вызван истечением срока действия сообщения.

Чтобы справиться с проблемой езды на велосипеде, вы должны выбрать один из вариантов:

  1. Разорвать цикл вообще (сбросить сообщение в какой-то момент).
  2. Или использовать сообщения из очереди с задержкой и повторно опубликовать их вручную в соответствии с вашим рабочим процессом.

Это вносит некоторую сложность в ваше приложение, но это цена, которую вы должны заплатить за производительность и стабильность.

PS:

Я копаю список рассылки RabbitMQ и нашел такой же вопрос, как ваш - Dead Letter, TTL и Cycle:

На данный момент вам придется потреблять и переиздавать, чтобы удалить заголовок за пределами RabbitMQ.

Другие вопросы по тегам