Как RabbitMQ решает, когда пора удалять сообщение?

Я пытаюсь понять логику удаления сообщений в RabbitMQ.

Моя цель - сохранить сообщения, даже если к ним не подключен клиент для их чтения, чтобы при повторном подключении клиентов их ждали сообщения. Я могу использовать длительные, ленивые очереди, чтобы сообщения сохранялись на диске, и я могу использовать репликацию HA, чтобы гарантировать, что несколько узлов получат копию всех сообщений в очереди.

Я хочу, чтобы сообщения помещались в две или более очередей с использованием темы или заголовка и чтобы один или несколько клиентов читали каждую очередь.

У меня есть две очереди, A и B, питаемые обменом заголовков. Очередь А получает все сообщения. Очередь B получает только сообщения с заголовком "архив". В очереди A читают 3 потребителя. В очереди B есть 1 потребитель. Если потребитель B умирает, но потребители A продолжают подтверждать сообщения, будет ли RabbitMQ удалять сообщения или продолжать их хранить? В очереди B не будет никого, потребляющего ее, пока B не будет перезапущен, и я хочу, чтобы сообщения оставались доступными для последующего использования.

Я прочитал кучу документации до сих пор, но до сих пор не нашел четкого ответа на это.

3 ответа

Решение

Ответ прост: сообщения из одной очереди не влияют на сообщения в другой. Как только вы публикуете сообщение, брокер рассылает копии в столько очередей, сколько необходимо - но они являются истинными копиями сообщения и абсолютно не связаны с этой точки зрения в том, что касается брокера.

Сообщения, помещенные в длительную очередь, остаются до тех пор, пока они не будут извлечены потребителем из очереди и при необходимости не будут подтверждены.

Обратите внимание, что существуют определенные параметры TTL уровня очереди и уровня сообщения, которые могут повлиять на это. Например, если очередь имеет TTL, а потребитель не восстанавливает соединение до истечения срока ее действия, очередь испарится вместе со всеми своими сообщениями. Аналогичным образом, если сообщение было поставлено в очередь с определенным TTL (который также может быть установлен по умолчанию для всех сообщений в определенной очереди), то после прохождения этого TTL сообщение не будет доставлено потребителю.

Вторичное примечание В случае истечения срока действия сообщения в очереди из-за TTL оно фактически останется в очереди до следующей доставки.

RabbitMQ решит, когда удалять сообщения после подтверждения.

Допустим, у вас есть отправитель сообщения:

var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "hello",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    string message = "Hello World!";
    var body = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish(exchange: "",
                         routingKey: "hello",
                         basicProperties: null,
                         body: body);
    Console.WriteLine(" [x] Sent {0}", message);
}

Это создаст длительную очередь "привет" и отправит сообщение "Hello World!" к этому. Вот так будет выглядеть очередь после отправки ей одного сообщения.

Теперь давайте настроим двух потребителей, один из которых будет подтверждать, что сообщение было получено, а другой - нет.

channel.BasicConsume(queue: "hello",
                    autoAck: false,
                    consumer: consumer);

а также

channel.BasicConsume(queue: "hello",
                    autoAck: true,
                    consumer: consumer);

Если вы запускаете только первого потребителя, сообщение никогда не будет удалено из очереди, поскольку потребитель заявляет, что сообщения исчезнут из очереди только в том случае, если клиент подтвердит их вручную: https://www.rabbitmq.com/confirms.html

Второй потребитель, однако, сообщит очереди, что он может безопасно / автоматически удалить все полученные сообщения.

Если вы не хотите автоматически удалять эти сообщения, вы должны отключить autoAck и сделать некоторое подтверждение вручную, используя документацию:

http://codingvision.net/tips-and-tricks/c-send-data-between-processes-w-memory-mapped-file (прокрутите вниз до "Подтверждение вручную").

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

Существуют разные способы удаления сообщений с помощью RabbitMQ. Некоторые из них:

  • После Ack от потребителя
  • Время жизни (TTL) для этой очереди достигнуто.
  • Время жизни (TTL) для сообщений в этой очереди достигнуто.

Последние два пункта указывают, что RabbitMQ позволяет вам устанавливать TTL(время жизни) как для сообщений, так и для очередей. TTL можно установить для данной очереди, установив аргумент x-message-ttl в queue.declare или установив политику message-ttl. Время истечения может быть установлено для данной очереди, задав для аргумента x- expires значение queue.declare или установив политику истечения срока действия.

Сообщение, которое находилось в очереди дольше, чем настроенный TTL, считается мертвым. Здесь важно отметить, что одно сообщение, направленное в разные очереди, может умереть в разное время, а иногда и никогда в каждой очереди, в которой оно находится. Смерть сообщения в одной очереди не влияет на срок жизни того же сообщения в другой очереди

Другие вопросы по тегам