RabbitMQ: связывание с DLX
Я искал эту информацию (включая документы) и не могу ее найти.
Я использую последнюю версию php-amqplib с RabbitMQ v. 2.7.1. У меня три очереди и три обмена:
// Declare the exchanges
$this->channel->exchange_declare(self::EXCHANGE_TO_PROCESS, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_WAITING, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_TO_CLEAN, 'direct', false, true, false, false, false);
// Messages in the to_process queue are sent to to_clean after 24 hours without being processed
$this->channel->queue_declare(self::QUEUE_TO_PROCESS, false, true, false, false, false, array(
'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_CLEAN),
'x-message-ttl' => array('I', 86400000), // 1 day in milli-seconds
));
// Messages in the waiting queue are sent to to_process after 5 minutes (wait period before retry)
$this->channel->queue_declare(self::QUEUE_WAITING, false, true, false, false, false, array(
'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_PROCESS),
'x-message-ttl' => array('I', 300000), // 5 minutes in milli-seconds
));
// Messages in the to_clean queue are kept until they are processed
$this->channel->queue_declare(self::QUEUE_TO_CLEAN, false, true, false, false, false);
// Bind the queues to the exchanges
$this->channel->queue_bind(self::QUEUE_TO_PROCESS, self::EXCHANGE_TO_PROCESS);
$this->channel->queue_bind(self::QUEUE_TO_CLEAN, self::EXCHANGE_TO_CLEAN);
$this->channel->queue_bind(self::QUEUE_WAITING, self::EXCHANGE_WAITING);
Поведение довольно простое: сообщения публикуются в EXCHANGE_TO_PROCESS
, Внешний работник обрабатывает сообщение: если обработка идет A-OK, сообщение просто ACK'и, следовательно, удаляется из очереди (эта часть работает отлично); если обработка идет не так, вместо этого сообщение вставляется в EXCHANGE_WAITING
где, после TTL 5 минут, он повторно вставляется через DLX в EXCHANGE_TO_PROCESS
для повторной обработки. Однако после третьего сбоя он вставляется в EXCHANGE_TO_CLEAN
куда будет приходить задание cron и очищать сообщения, регистрировать ошибки и т. д.
Однако проблема, с которой я столкнулся, заключается в том, что код четко связывает QUEUE_WAITING
к EXCHANGE_WAITING
(как и ожидалось), но когда я заглядываю на страницу управления RabbitMQ, я замечаю, что с этим обменом связаны две очереди, а именно QUEUE_TO_PROCESS
а также QUEUE_WAITING
, в этой последовательности. По истечении 5 минут сообщение исчезает. Я не совсем уверен, почему.
Все это приводит нас к моим вопросам: связывает ли обмен мертвыми буквами неявно привязку обмена параметром к очереди? И: что могло случиться с моими потерянными сообщениями?
РЕДАКТИРОВАТЬ
Я еще больше запутался, чем был. Я пробовал следующий, очень простой код:
$this->channel->exchange_declare('exchangeA', 'fanout', false, true, false, false, false);
$this->channel->exchange_declare('exchangeB', 'fanout', false, true, false, false, false);
$this->channel->queue_declare('queueA', false, true, false, false, false, array(
'x-dead-letter-exchange' => array('S', 'exchangeB'),
'x-message-ttl' => array('I', 5000)
));
$this->channel->queue_declare('queueB', false, true, false, false, false);
$this->channel->queue_bind('queueA', 'exchangeA');
$this->channel->queue_bind('queueB', 'exchangeB');
$msg = new AMQPMessage('hello!');
$this->channel->basic_publish($msg, 'exchangeA');
Это создает две очереди и два обмена (я видел их fanout
чтобы избежать проблем с ключами маршрутизации), привязывает queueA к exchangeA и queueB к exchangeB, устанавливая TTL для queueA и его DLX для exchangeB. Наблюдая за тем, что происходит на странице управления, я вижу сообщение, которое проводит 5 секунд в очереди A, как и ожидалось, и затем сообщение исчезает, как в моем более сложном коде выше.
2 ответа
У меня возникли сомнения, что что-то не так, когда я наткнулся на этот блог и увидел плакат с футляром, очень похожим на наш, и он упомянул, что он работает без проблем... поэтому я начал копать немного больше.
У нас была проблема с версией. Мне сказали, что пакет RabbitMQ был обновлен, но мы используем Ubuntu 12.04 LTS, поэтому "обновленной" версией была 2.7.1 - версия, которой более 3 лет.
Если вы в том же деле, что и мы (используя более старый дистрибутив), зайдите на страницу загрузки RabbitMQ и выберите ту, которая подходит вашему дистрибутиву. В случае с Ubuntu мы просто добавили официальный репозиторий (вы также можете просто загрузить файл.dpkg), выполнив apt-get update
и дождался перезагрузки сервера. После этого приведенный выше код работал в основном как есть.
Похоже, ваш поток сообщений может быть циклическим. Если это так, RabbitMQ автоматически отбросит сообщение, как указано в официальных документах (раздел " Маршрутизация сообщений с мертвыми буквами "):
Можно сформировать цикл очередей недоставленных сообщений. Например, это может произойти, когда очередь сообщений с ошибками отправляется на обмен по умолчанию без указания ключа маршрутизации недоставленных сообщений. Сообщения в таких циклах (то есть сообщения, которые попадают в одну и ту же очередь дважды) будут отбрасываться, если весь цикл вызван истечением срока действия сообщения.
Чтобы справиться с проблемой езды на велосипеде, вы должны выбрать один из вариантов:
- Разорвать цикл вообще (сбросить сообщение в какой-то момент).
- Или использовать сообщения из очереди с задержкой и повторно опубликовать их вручную в соответствии с вашим рабочим процессом.
Это вносит некоторую сложность в ваше приложение, но это цена, которую вы должны заплатить за производительность и стабильность.
PS:
Я копаю список рассылки RabbitMQ и нашел такой же вопрос, как ваш - Dead Letter, TTL и Cycle:
На данный момент вам придется потреблять и переиздавать, чтобы удалить заголовок за пределами RabbitMQ.