Неподтвержденное сообщение в ActiveMQ с настройкой при сбое не доставляется подписчику
Я реализую Active MQ издателя и подписчика в C#. Я использую клиентскую библиотеку Apache.NMS.ActiveMQ .net для связи с брокером.
<package id="Apache.NMS" version="1.7.1" targetFramework="net461" />
<package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" />
ActiveMQ настроен на настройку при сбое на 4 серверах (.225, .226, .346, .347 - последние части IP для справки) . URL брокера выглядит примерно так
отказоустойчивый:// ТСР://103.24.34.225:61616, ТСР://103.24.34.226:61616, ТСР://103.24.34.346:61616, ТСР://103.24.34.347:61616
Вот как я публикую
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
var connectionFactory = new ConnectionFactory(brokerUrl);
using (var connection = connectionFactory.CreateConnection("conn1", "conn$34"))
{
connection.ClientId = "TESTPUBLISHER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.Start();
var session = connection.CreateSession();
var topic = new ActiveMQTopic("ACCOUNT.UPDATE");
var producer = session.CreateProducer(topic);
var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST";
var textMessage = producer.CreateTextMessage(msg);
textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE");
textMessage.Properties.SetString("Action", "UPDATE");
textMessage.Properties.SetString("DataContractType", "Account");
producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0));
}
Вот как я подписываюсь на тему. Этот код настроен так, что несколько общих подписчиков могут прослушивать входящие сообщения. Мне сказали, что для этого нужно использовать виртуальные темы. Поэтому я настроил своего подписчика на использование виртуальной темы, и он размещен внутри проекта службы Windows. Я использую режим подтверждения для ClientAcknowledge, поэтому, если сообщение не подтверждено, оно должно продолжать возвращаться. Ниже приведен фрагмент кода, представляющий только важную абонентскую часть службы Windows.
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl));
IConnection connection = factory.CreateConnection("conn1", "conn$34"))
connection.ClientId = "TESTSUBSCRIBER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.ConnectionInterruptedListener += OnConnectionInturrupted;
connection.ExceptionListener += OnConnectionExceptionListener;
connection.ConnectionResumedListener += OnConnectionResumedListener;
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE");
ActiveMQTopic topic = new ActiveMQTopic();
IMessageConsumer consumer = session.CreateConsumer(queue);
consumer.Listener += OnMessage;
private void OnMessage(IMessage message)
{
var payload = ((ITextMessage)message).Text;
Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]");
if(payload != "43342_test")
{
message.Acknowledge();
Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]");
}
}
private void OnConnectionResumedListener()
{
Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER");
}
private void OnConnectionExceptionListener(Exception exception)
{
Log.Error(exception);
}
private void OnConnectionInturrupted()
{
Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER");
}
Я умею публиковать и подписные сообщения. У меня проблема с одним конкретным случаем. Допустим, подписчик установил соединение с сервером-посредником (.225) из пула отказоустойчивых серверов. Издатель опубликовал сообщение. Абонент получил его, и он находится в процессе обработки. Но из-за некоторого обслуживания патчей сервера служба Windows была закрыта. В результате подключение абонента к брокеру было разорвано. Когда служба Windows восстановилась, на этот раз подписчик установил соединение с другим сервером-посредником (сервером-посредником.346) из пула отработки отказа. Когда это произошло, неподтвержденное сообщение не было доставлено повторно. Но если я перезапущу службу Windows и, если повезет, установил соединение с брокером.225 (тем же сервером, к которому изначально был подключен подписчик), теперь подписчик получает неподтвержденное сообщение.
Я предполагаю, что когда ActiveMQ настроен на настройку при сбое, независимо от того, к какому серверу-посреднику из пула отработки отказа подписчик способен установить соединение, он всегда должен получать неподтвержденные сообщения.
В некоторых ситуациях сбой настройки кажется работоспособным. Предположим, что подписчик был подключен к серверу брокера.346 из пула отработки отказа. Издатель подключается к другому серверу брокера (.225 брокера) из того же пула и публикует сообщение, подписчик получает сообщения. Это доказывает, что Fail over setup работает.
Но как только подписчик получает сообщение от сервера-посредника и, если подписчик отключен до подтверждения сообщения, он должен восстановить соединение с тем же сервером-посредником, чтобы получить неподтвержденное сообщение. Это не звучит правильно для меня.
Требуется ли дополнительная настройка в настройке сервера Active MQ, чтобы этот вариант использования работал?
1 ответ
Решение этой проблемы было не на стороне клиента, а с помощью конфигурации сервера Active MQ.
Для политики назначения управления потоком источника добавьте ConditionalNetworkBridgeFilter и включите replayWhenNoConsumers.