Блокирует и получает пакетные сообщения с RabbitMq
Я пытаюсь использовать RabbitMq более нетрадиционным способом (хотя в этот момент я могу выбрать любую другую реализацию очереди сообщений, если это необходимо). Вместо того, чтобы отправлять push-сообщения Rabbit моим потребителям, потребитель подключается к очереди и получает пакет из N сообщений (в течение которых он потребляет некоторые, а возможно, отклоняет некоторые), после чего он переходит в другую очередь и так далее. Это сделано для избыточности. В случае сбоя некоторых потребителей все сообщения гарантированно будут использованы другим пользователем.
Проблема в том, что у меня несколько потребителей, и я не хочу, чтобы они конкурировали в одной и той же очереди. Есть ли способ гарантировать блокировку очереди? Если нет, могу ли я по крайней мере удостовериться, что если два потребителя подключены к одной и той же очереди, они не читают одно и то же сообщение? Транзакции могут в некоторой степени помочь мне, но я слышал, что они будут удалены из RabbitMQ.
Другие архитектурные предложения приветствуются.
Спасибо!
РЕДАКТИРОВАТЬ: Как указано в комментарии, есть особенность в том, как мне нужно обрабатывать сообщения. Они имеют смысл только в группах, и существует высокая вероятность того, что связанные сообщения объединяются в очередь. Например, если я получу пакет из 100 сообщений, существует высокая вероятность, что я смогу что-то сделать с сообщениями 1-3, 4-5,6-10 и т. Д. Если мне не удастся найти группу для некоторых сообщений, я Повторно отправлю их в очередь. WorkQueue не будет работать, потому что он будет рассылать сообщения из одной группы нескольким работникам, которые не будут знать, что с ними делать.
3 ответа
Вы уже взглянули на эту бесплатную онлайн-книгу по шаблонам корпоративной интеграции?
Похоже, вам действительно нужен рабочий процесс, в котором у вас есть компонент-дозатор, прежде чем сообщения попадут к вашим работникам. С RabbitMQ есть два способа сделать это. Либо используйте тип обмена (и формат сообщения), который может выполнить пакетирование для вас, либо иметь одну очередь, и рабочий, который сортирует пакеты и помещает каждый пакет в свою очередь. Дозатор, вероятно, также должен отправить сообщение "готово к отправке" в очередь управления, чтобы рабочий мог обнаружить существование новой очереди. После обработки пакета работник может удалить очередь пакета.
Если у вас есть контроль над форматом сообщения, вы можете получить RabbitMQ для неявного выполнения пакетирования несколькими способами. С обменом тем вы можете убедиться, что ключ маршрутизации в каждом сообщении имеет формат work.batchid.something, и тогда работник, который узнает о существовании пакетного xxyzz, будет использовать только ключ привязки, такой как #.xxyzz.#, Чтобы потреблять эти сообщения. Переиздание не требуется.
Другой способ - включить идентификатор заголовка в заголовок и использовать более новый тип обмена заголовками. Конечно, вы также можете реализовать свои собственные типы обмена, если вы хотите написать небольшое количество кода Erlang.
Однако я рекомендую проверить книгу, потому что она дает лучший обзор архитектуры обмена сообщениями, чем типичная концепция очереди для рабочих, с которой большинство людей начинают.
Пусть ваши потребители вытянуты из одной очереди. Они гарантированно не будут обмениваться сообщениями (Rabbit проведет циклический перебор сообщений среди подключенных в настоящее время потребителей), и он будет сильно оптимизирован для этого точного шаблона использования.
Он готов к использованию, из коробки. В документах RabbitMQ это называется моделью Work Queue. Одна очередь, несколько потребителей, и никто из них не делится чем-либо. Похоже, что вам нужно.
Вы можете установить количество предварительных выборок на уровне канала / потребителя, чтобы получать сообщения в пакетном режиме. Для повторной отправки сообщений следует использовать метод AMQP basic.reject, и эти сообщения можно выбрать для добавления в очередь или для пересылки в очередь недоставленных сообщений. Несколько потребителей, пытающихся извлечь сообщения из одной и той же очереди, не являются проблемой, так как метод AMQP basic.get будет синхронизирован для обработки одновременных потребителей.