Как RabbitMQ решает, когда пора удалять сообщение?
Я пытаюсь понять логику удаления сообщений в RabbitMQ.
Моя цель - сохранить сообщения, даже если к ним не подключен клиент для их чтения, чтобы при повторном подключении клиентов их ждали сообщения. Я могу использовать длительные, ленивые очереди, чтобы сообщения сохранялись на диске, и я могу использовать репликацию HA, чтобы гарантировать, что несколько узлов получат копию всех сообщений в очереди.
Я хочу, чтобы сообщения помещались в две или более очередей с использованием темы или заголовка и чтобы один или несколько клиентов читали каждую очередь.
У меня есть две очереди, A и B, питаемые обменом заголовков. Очередь А получает все сообщения. Очередь B получает только сообщения с заголовком "архив". В очереди A читают 3 потребителя. В очереди B есть 1 потребитель. Если потребитель B умирает, но потребители A продолжают подтверждать сообщения, будет ли RabbitMQ удалять сообщения или продолжать их хранить? В очереди B не будет никого, потребляющего ее, пока B не будет перезапущен, и я хочу, чтобы сообщения оставались доступными для последующего использования.
Я прочитал кучу документации до сих пор, но до сих пор не нашел четкого ответа на это.
3 ответа
Ответ прост: сообщения из одной очереди не влияют на сообщения в другой. Как только вы публикуете сообщение, брокер рассылает копии в столько очередей, сколько необходимо - но они являются истинными копиями сообщения и абсолютно не связаны с этой точки зрения в том, что касается брокера.
Сообщения, помещенные в длительную очередь, остаются до тех пор, пока они не будут извлечены потребителем из очереди и при необходимости не будут подтверждены.
Обратите внимание, что существуют определенные параметры TTL уровня очереди и уровня сообщения, которые могут повлиять на это. Например, если очередь имеет TTL, а потребитель не восстанавливает соединение до истечения срока ее действия, очередь испарится вместе со всеми своими сообщениями. Аналогичным образом, если сообщение было поставлено в очередь с определенным TTL (который также может быть установлен по умолчанию для всех сообщений в определенной очереди), то после прохождения этого TTL сообщение не будет доставлено потребителю.
Вторичное примечание В случае истечения срока действия сообщения в очереди из-за TTL оно фактически останется в очереди до следующей доставки.
RabbitMQ решит, когда удалять сообщения после подтверждения.
Допустим, у вас есть отправитель сообщения:
var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Это создаст длительную очередь "привет" и отправит сообщение "Hello World!" к этому. Вот так будет выглядеть очередь после отправки ей одного сообщения.
Теперь давайте настроим двух потребителей, один из которых будет подтверждать, что сообщение было получено, а другой - нет.
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
а также
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
Если вы запускаете только первого потребителя, сообщение никогда не будет удалено из очереди, поскольку потребитель заявляет, что сообщения исчезнут из очереди только в том случае, если клиент подтвердит их вручную: https://www.rabbitmq.com/confirms.html
Второй потребитель, однако, сообщит очереди, что он может безопасно / автоматически удалить все полученные сообщения.
Если вы не хотите автоматически удалять эти сообщения, вы должны отключить autoAck и сделать некоторое подтверждение вручную, используя документацию:
http://codingvision.net/tips-and-tricks/c-send-data-between-processes-w-memory-mapped-file (прокрутите вниз до "Подтверждение вручную").
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Существуют разные способы удаления сообщений с помощью RabbitMQ. Некоторые из них:
- После Ack от потребителя
- Время жизни (TTL) для этой очереди достигнуто.
- Время жизни (TTL) для сообщений в этой очереди достигнуто.
Последние два пункта указывают, что RabbitMQ позволяет вам устанавливать TTL(время жизни) как для сообщений, так и для очередей. TTL можно установить для данной очереди, установив аргумент x-message-ttl в queue.declare или установив политику message-ttl. Время истечения может быть установлено для данной очереди, задав для аргумента x- expires значение queue.declare или установив политику истечения срока действия.
Сообщение, которое находилось в очереди дольше, чем настроенный TTL, считается мертвым. Здесь важно отметить, что одно сообщение, направленное в разные очереди, может умереть в разное время, а иногда и никогда в каждой очереди, в которой оно находится. Смерть сообщения в одной очереди не влияет на срок жизни того же сообщения в другой очереди