Как выборочно удалять сообщения из очереди AMQP (RabbitMQ)?
Я хотел бы выборочно удалять сообщения из очереди AMQP, даже не читая их.
Сценарий таков:
Отправляющая сторона хочет истечь сообщения типа X на основе того факта, что поступила новая информация типа X. Поскольку весьма вероятно, что подписчик еще не принял последнее сообщение типа X, издатель должен просто удалить предыдущие сообщения типа X и поместить в очередь самое новое. Вся операция должна быть прозрачной для подписчика - на самом деле он должен использовать что-то простое, например STOMP, для получения сообщений.
Как это сделать с помощью AMQP? Или может быть удобнее в другом протоколе обмена сообщениями?
Я хотел бы избежать сложной инфраструктуры. Весь необходимый обмен сообщениями так же прост, как указано выше: одна очередь, один подписчик, один издатель, но издатель должен иметь возможность произвольно удалять сообщения по заданным критериям.
Клиент издателя будет использовать Ruby, но на самом деле я буду иметь дело с любым языком, как только я найду, как это сделать в протоколе.
4 ответа
В настоящее время вы не можете сделать это в RabbitMQ (или, в более общем смысле, в AMQP) автоматически. Но вот легкий обходной путь.
Допустим, вы хотите отправить три типа сообщений: Xs, Ys и Zs. Если я правильно понимаю ваш вопрос, когда приходит X-сообщение, вы хотите, чтобы брокер забыл все другие X-сообщения, которые не были доставлены.
Это довольно легко сделать в RabbitMQ:
- производитель объявляет три очереди: X, Y и Z (они автоматически привязываются к обмену по умолчанию со своими именами в качестве ключей маршрутизации, а это именно то, что нам нужно),
- при публикации сообщения производитель сначала очищает соответствующую очередь (поэтому, если он публикует сообщение X, он сначала очищает очередь X); это эффективно удаляет устаревшие сообщения,
- потребитель просто потребляет из очереди, которую он хочет (X для сообщений X, Y для сообщений Y и т. д.); с его точки зрения, он просто должен сделать basic.get, чтобы получить следующее соответствующее сообщение.
Это подразумевает состояние гонки, когда два производителя отправляют сообщения одного типа в одно и то же время. В результате очередь может иметь два (или более) сообщения одновременно, но количество сообщений ограничено числом производителей, а лишние сообщения удаляются при следующей публикации. это не должно быть большой проблемой.
Подводя итог, у этого решения есть только один дополнительный шаг от оптимального решения, а именно, очередь очистки X перед публикацией сообщения типа X.
Если вам нужна помощь в настройке этой конфигурации, идеальным местом для обращения за советом является список рассылки rabbitmq-обсуждения.
Вам не нужна очередь сообщений, вам нужна база данных значений ключей. Например, вы можете использовать Redis или Tokyo Tyrant, чтобы получить простую доступную по сети базу данных значений ключей. Или просто используйте memcache.
Каждый тип сообщения является ключом. Когда вы пишете новое сообщение с тем же ключом, оно перезаписывает предыдущее значение, поэтому читатель этой базы данных никогда не сможет получить устаревшую информацию.
На этом этапе вам нужна только очередь сообщений, чтобы установить порядок чтения ключей, если это важно. В противном случае просто постоянно сканируйте базу данных. Если вы постоянно сканируете базу данных, лучше всего расположить базу рядом со считывателями, чтобы уменьшить сетевой трафик.
Я бы наверное сделал что то подобноеkey: typecode
value: lastUpdated, important data
Тогда я бы отправил сообщения, которые содержатtypecode, lastUpdated
Таким образом, читатель может сравнить lastupdated для этого ключа с тем, который они последний раз читали из базы данных, и пропустить его чтение, потому что они уже обновлены.
Если вам действительно нужно сделать это с помощью AMQP, используйте RabbitMQ и пользовательский тип обмена, в частности, Exchange с кэшем последнего значения. Пример кода здесь https://github.com/squaremo/rabbitmq-lvc-plugin
Кажется, он работает и из веб-интерфейса RabbitMQ, если вы просто хотите удалить первые n сообщений из очереди
- выберите очередь на вкладке "Очереди", прокрутите вниз до раздела "Получить сообщения"
- установить параметр "Requeue=No" и количество сообщений, которые вы хотите удалить из очереди
- нажмите кнопку "Получить сообщения"
Этот вопрос имеет высокую наглядность благодаря названию. Проходя через описание, остановимся на более конкретном сценарии. Таким образом, для тех пользователей, которые хотят действительно удалить следующее (помните FIFO) сообщение из очереди, вы можете использовать rabbitmqadmin и выполнить следующую команду:
rabbitmqadmin get queue=queuename requeue=false count=1
Эта команда по существу потребляет сообщение и ничего не делает. Полная команда с флагом для резервного копирования сообщений может выглядеть так, как показано ниже. Убедитесь, что добавили любые другие параметры согласно вашему требованию.
sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg