Очередь сообщений с гарантиями заказа после отправки

У меня есть случай использования, когда мне нужно использовать механизм очередей, который гарантирует, что сообщения обрабатываются потребителем ("работниками") по порядку и одно за другим.

Я использовал RabbitMQ в прошлом, и он гарантирует порядок, в котором он получил сообщение. Но что, если этот порядок неверен?

Допустим, я отправляю сообщения [4, 5, 3, 2, 1], потребители RabbitMQ будут обрабатывать сообщения в таком порядке. Что если я хочу, чтобы они обрабатывались в порядке [1, 2, 3, 4, 5], потому что сообщения зависят друг от друга?

Кроме того, я не хочу позволить потребителю потреблять сообщение 3 до того, как сообщение 2 будет подтверждено (без пробелов).

Есть ли какие-либо решения для очередей, которые поддерживают этот вариант использования? В настоящее время мы сбрасываем сообщения в базу данных и работники периодически извлекают данные в порядке.

1 ответ

Рассмотрим один из двух шаблонов: "Последовательность сообщений" или "Повторный секвенсор".

Последовательность сообщений полезна следующим образом:

Всякий раз, когда большой набор данных может потребоваться разбить на куски размера сообщения, отправьте данные как последовательность сообщений и пометьте каждое сообщение полями идентификации последовательности.

Resequencer немного отличается:

Используйте фильтр с сохранением состояния, Resequencer, для сбора и изменения порядка сообщений, чтобы они могли быть опубликованы в выходном канале в указанном порядке.

Resequencer может получить поток сообщений, которые могут не поступить по порядку. Resequencer содержит во внутреннем буфере для хранения сообщений вне последовательности, пока не будет получена полная последовательность. Последовательные сообщения затем публикуются в выходной канал. Важно, чтобы выходной канал сохранял порядок, поэтому сообщения гарантированно поступят в следующем порядке. Как и большинство других маршрутизаторов, Resequencer обычно не изменяет содержимое сообщения.

Я бы настоятельно рекомендовал прочитать эти паттерны в книге Грегора Хоупа / Бобби Вульфа "Паттерны интеграции предприятий" (с участием Мартина Фаулера и других).

Здесь есть больше деталей, чем я могу описать, но, по сути, последовательность сообщений зависит от наличия идентификатора последовательности, позиции и "конца" (логическое поле). Вам понадобится адаптер в конце очереди для обработки последовательности.

Секвенсор, с другой стороны, "сохраняет внепоследовательные сообщения во внутреннем буфере до получения полной последовательности, а затем публикует сообщения в выходной канал в правильной последовательности" (стр. 285).

Это очень похоже на вашу стратегию "дамп в БД, а затем иметь рабочую тягу", которую вы сейчас используете.

Детали реализации этих шаблонов будут основаны на языке вашего приложения и выборе очереди (в вашем случае RabbitMQ), но шаблоны уже достаточно хорошо разработаны, поэтому я бы внимательно их рассмотрел.

Я не знаю ни одного встроенного механизма в самом RabbitMQ, который может помочь вам получить это.

Надеюсь это поможет.

редактировать

Я погуглил "RabbitMQ resequencer" и обнаружил следующее (хотя не могу ручаться за эффективность): rabbus-sequence (GitHub).

В любом случае, это может помочь взглянуть на то, что делает их код, чтобы увидеть, может ли это стать источником вдохновения.

Другие вопросы по тегам