RabbitMQ: Двунаправленные федеративные брокеры, как сделать так, чтобы все очереди выводили сообщение?
У меня есть два брокера, настроенные [1] с плагином федерации. Оба указывают друг на друга, как вверх по течению.
Мой тест:
- опубликовать сообщение на брокере А
- потреблять на брокера B
Результат:
- потребляет на брокер Б работает
очередь в брокере B выдает сообщение - <не хорошо> очередь в брокере A все еще содержит сообщение
- <причина, по которой это нехорошо> Проблема, которую я вижу: если я всегда публикую на одном брокере, а затем всегда использую на другом ->, то очередь на публикующем брокере будет расти до тех пор, пока не заполнится, и начнут отбрасывать сообщения.
Результат, который я хотел бы получить:
- обе очереди на брокере A и B выдают свои сообщения, когда потребитель потребляет на брокере B
Как настроить RabbitMQ для извлечения сообщения из всех очередей, когда потребитель использует сообщение в брокере B? Сейчас я пытаюсь сделать это с помощью плагина RabbitMQ Federation.
[1] Два брокера указывают друг на друга как восходящие потоки, и я настраиваю их так же, как описано в "простом примере", приведенном в документации, за исключением того, что есть два брокера, каждый из которых указывает друг на друга как восходящий. Код для издателя выглядит так, а код для потребителя - так.
1 ответ
@Trevor Boyd Smith, возможно, вы могли бы рассмотреть вариант 2 или 3, как показано ниже.
Вариант 1: двунаправленный федеративный обмен
Сообщение попадет в оба брокера A и B, по одной копии каждого брокера, независимо друг от друга. Другими словами, например, даже после того, как брокер B доставил сообщение своему потребителю, другая копия сообщения все еще остается в брокере A.
Преимущество: у вас всегда будет две копии сообщения, по одной на каждого брокера, что обеспечивает высокую доступность.
Недостаток: вам необходимо подключить потребителя к каждому брокеру.
Вариант 2: двунаправленная федеративная очередь
Сообщение попадет в один из двух брокеров. По умолчанию брокер, у которого было опубликовано сообщение, имеет приоритет при постановке сообщения в очередь; однако, если только у другого брокера есть потребитель, сообщение переместится к другому брокеру.
Не имеет значения, к какому брокеру попадает сообщение, сообщение будет доставлено один раз и только один раз потребителем, подключенным к любому из брокеров.
Преимущество: сообщение будет доставлено один раз и только один раз потребителю, подключенному к любому из брокеров.
Недостаток: будет только одна копия сообщения. Если брокер, получивший сообщение, выходит из строя, другой брокер не может получить сообщение. Но если вас устраивает конечная согласованность, этот вариант подойдет. Причина в том, что когда проблемный брокер возобновит работу, сообщение в конечном итоге станет доступным.
Вариант 3: двунаправленный федеративный обмен и очередь
В этом случае сообщение попадет в оба брокера, по одной копии на каждого брокера. Одно и то же сообщение будет доставлено потребителю, подключенному к любому из брокеров, дважды! После того, как сообщение было доставлено дважды, оно исчезнет в обоих брокерах. (Если есть два потребителя, по одному подключенному к каждому брокеру, каждый брокер доставит одно и то же сообщение своему потребителю один раз.)
Преимущество: потребитель может подключиться к любому из брокеров, и сообщение в каждом брокере будет доставлено и исключено из очереди.
Недостаток: одно и то же сообщение будет доставлено дважды. Чтобы решить эту проблему, перед обработкой сообщения проверьте, обработано ли уже такое же сообщение.
Примечание:
Это не значит, какой вариант лучше другого или других. Все зависит от вашего варианта использования, и есть много других конфигураций, которые могут вступить в игру, которые могут изменить поведение.
Я создал эту среду:
Сервер А, Сервер Б.
Создал двунаправленную федерацию следующим образом:
Федерация вверх по течению: Server_B = amqp://servera
Федерация Upstream: Server_A = amqp://serverb
Затем создали одинаковую политику на обоих серверах:
Pattern : ^fed\.
Apply to: all
federation-upstream-set:all
Создана одна очередь на сервере А с именем: fed.test1
затем создал потребителя на сервере B как:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5673);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Message '" + message );
}
};
channel.basicConsume("fed.test1", true, consumer);
Затем опубликовал сообщение на сервере А ---> fed.test1
Сообщение было передано на сервер B, а количество сообщений в очереди zero
в обе очереди (сервер A, сервер B).
Это работает, как вы ожидали.
Надеюсь, поможет.