RabbitMQ: Двунаправленные федеративные брокеры, как сделать так, чтобы все очереди выводили сообщение?

У меня есть два брокера, настроенные [1] с плагином федерации. Оба указывают друг на друга, как вверх по течению.

Мой тест:

  • опубликовать сообщение на брокере А
  • потреблять на брокера B

Результат:

  • потребляет на брокер Б работает
  • очередь в брокере B выдает сообщение
  • <не хорошо> очередь в брокере A все еще содержит сообщение
    • <причина, по которой это нехорошо> Проблема, которую я вижу: если я всегда публикую на одном брокере, а затем всегда использую на другом ->, то очередь на публикующем брокере будет расти до тех пор, пока не заполнится, и начнут отбрасывать сообщения.

Результат, который я хотел бы получить:

  • обе очереди на брокере A и B выдают свои сообщения, когда потребитель потребляет на брокере B

Как настроить RabbitMQ для извлечения сообщения из всех очередей, когда потребитель использует сообщение в брокере B? Сейчас я пытаюсь сделать это с помощью плагина RabbitMQ Federation.


[1] Два брокера указывают друг на друга как восходящие потоки, и я настраиваю их так же, как описано в "простом примере", приведенном в документации, за исключением того, что есть два брокера, каждый из которых указывает друг на друга как восходящий. Код для издателя выглядит так, а код для потребителя - так.

1 ответ

@Trevor Boyd Smith, возможно, вы могли бы рассмотреть вариант 2 или 3, как показано ниже.

Вариант 1: двунаправленный федеративный обмен

Сообщение попадет в оба брокера A и B, по одной копии каждого брокера, независимо друг от друга. Другими словами, например, даже после того, как брокер B доставил сообщение своему потребителю, другая копия сообщения все еще остается в брокере A.

Преимущество: у вас всегда будет две копии сообщения, по одной на каждого брокера, что обеспечивает высокую доступность.

Недостаток: вам необходимо подключить потребителя к каждому брокеру.

Вариант 2: двунаправленная федеративная очередь

Сообщение попадет в один из двух брокеров. По умолчанию брокер, у которого было опубликовано сообщение, имеет приоритет при постановке сообщения в очередь; однако, если только у другого брокера есть потребитель, сообщение переместится к другому брокеру.

Не имеет значения, к какому брокеру попадает сообщение, сообщение будет доставлено один раз и только один раз потребителем, подключенным к любому из брокеров.

Преимущество: сообщение будет доставлено один раз и только один раз потребителю, подключенному к любому из брокеров.

Недостаток: будет только одна копия сообщения. Если брокер, получивший сообщение, выходит из строя, другой брокер не может получить сообщение. Но если вас устраивает конечная согласованность, этот вариант подойдет. Причина в том, что когда проблемный брокер возобновит работу, сообщение в конечном итоге станет доступным.

Вариант 3: двунаправленный федеративный обмен и очередь

В этом случае сообщение попадет в оба брокера, по одной копии на каждого брокера. Одно и то же сообщение будет доставлено потребителю, подключенному к любому из брокеров, дважды! После того, как сообщение было доставлено дважды, оно исчезнет в обоих брокерах. (Если есть два потребителя, по одному подключенному к каждому брокеру, каждый брокер доставит одно и то же сообщение своему потребителю один раз.)

Преимущество: потребитель может подключиться к любому из брокеров, и сообщение в каждом брокере будет доставлено и исключено из очереди.

Недостаток: одно и то же сообщение будет доставлено дважды. Чтобы решить эту проблему, перед обработкой сообщения проверьте, обработано ли уже такое же сообщение.

Примечание:

Это не значит, какой вариант лучше другого или других. Все зависит от вашего варианта использования, и есть много других конфигураций, которые могут вступить в игру, которые могут изменить поведение.

Я создал эту среду:

Сервер А, Сервер Б.

Создал двунаправленную федерацию следующим образом:

Федерация вверх по течению: Server_B = amqp://servera

Федерация Upstream: Server_A = amqp://serverb

Затем создали одинаковую политику на обоих серверах:

Pattern : ^fed\.
Apply to: all   
federation-upstream-set:all

Создана одна очередь на сервере А с именем: fed.test1 затем создал потребителя на сервере B как:

 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 factory.setPort(5673);
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();


Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Message   '" + message );
                }
            };
channel.basicConsume("fed.test1",  true, consumer);

Затем опубликовал сообщение на сервере А ---> fed.test1

Сообщение было передано на сервер B, а количество сообщений в очереди zero в обе очереди (сервер A, сервер B).

Это работает, как вы ожидали.

Надеюсь, поможет.

Другие вопросы по тегам