Неподтвержденное сообщение в ActiveMQ с настройкой при сбое не доставляется подписчику

Я реализую Active MQ издателя и подписчика в C#. Я использую клиентскую библиотеку Apache.NMS.ActiveMQ .net для связи с брокером.

  <package id="Apache.NMS" version="1.7.1" targetFramework="net461" />
  <package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" />

ActiveMQ настроен на настройку при сбое на 4 серверах (.225, .226, .346, .347 - последние части IP для справки) . URL брокера выглядит примерно так

отказоустойчивый:// ТСР://103.24.34.225:61616, ТСР://103.24.34.226:61616, ТСР://103.24.34.346:61616, ТСР://103.24.34.347:61616

Вот как я публикую

    var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
    var connectionFactory = new ConnectionFactory(brokerUrl);
    using (var connection = connectionFactory.CreateConnection("conn1", "conn$34"))
    {
        connection.ClientId = "TESTPUBLISHER";
        connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
        connection.Start();

        var session = connection.CreateSession();
        var topic = new ActiveMQTopic("ACCOUNT.UPDATE");
        var producer = session.CreateProducer(topic);


        var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST";
        var textMessage = producer.CreateTextMessage(msg);

        textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE");
        textMessage.Properties.SetString("Action", "UPDATE");
        textMessage.Properties.SetString("DataContractType", "Account");

        producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0));

     }

Вот как я подписываюсь на тему. Этот код настроен так, что несколько общих подписчиков могут прослушивать входящие сообщения. Мне сказали, что для этого нужно использовать виртуальные темы. Поэтому я настроил своего подписчика на использование виртуальной темы, и он размещен внутри проекта службы Windows. Я использую режим подтверждения для ClientAcknowledge, поэтому, если сообщение не подтверждено, оно должно продолжать возвращаться. Ниже приведен фрагмент кода, представляющий только важную абонентскую часть службы Windows.

var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616"; 
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl));
IConnection connection = factory.CreateConnection("conn1", "conn$34"))

    connection.ClientId = "TESTSUBSCRIBER";
    connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
    connection.ConnectionInterruptedListener += OnConnectionInturrupted;
    connection.ExceptionListener += OnConnectionExceptionListener;
    connection.ConnectionResumedListener += OnConnectionResumedListener;
    connection.Start();

    ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
    var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE");
    ActiveMQTopic topic = new ActiveMQTopic();
    IMessageConsumer consumer = session.CreateConsumer(queue); 

    consumer.Listener += OnMessage;



private void OnMessage(IMessage message)
{
    var payload = ((ITextMessage)message).Text;
    Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]");
    if(payload != "43342_test")
    {
        message.Acknowledge(); 
            Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]");  
    }
}

private void OnConnectionResumedListener()
{
    Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER");
}

private void OnConnectionExceptionListener(Exception exception)
{
    Log.Error(exception);
}

private void OnConnectionInturrupted()
{
    Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER");
}

Я умею публиковать и подписные сообщения. У меня проблема с одним конкретным случаем. Допустим, подписчик установил соединение с сервером-посредником (.225) из пула отказоустойчивых серверов. Издатель опубликовал сообщение. Абонент получил его, и он находится в процессе обработки. Но из-за некоторого обслуживания патчей сервера служба Windows была закрыта. В результате подключение абонента к брокеру было разорвано. Когда служба Windows восстановилась, на этот раз подписчик установил соединение с другим сервером-посредником (сервером-посредником.346) из пула отработки отказа. Когда это произошло, неподтвержденное сообщение не было доставлено повторно. Но если я перезапущу службу Windows и, если повезет, установил соединение с брокером.225 (тем же сервером, к которому изначально был подключен подписчик), теперь подписчик получает неподтвержденное сообщение.

Я предполагаю, что когда ActiveMQ настроен на настройку при сбое, независимо от того, к какому серверу-посреднику из пула отработки отказа подписчик способен установить соединение, он всегда должен получать неподтвержденные сообщения.

В некоторых ситуациях сбой настройки кажется работоспособным. Предположим, что подписчик был подключен к серверу брокера.346 из пула отработки отказа. Издатель подключается к другому серверу брокера (.225 брокера) из того же пула и публикует сообщение, подписчик получает сообщения. Это доказывает, что Fail over setup работает.

Но как только подписчик получает сообщение от сервера-посредника и, если подписчик отключен до подтверждения сообщения, он должен восстановить соединение с тем же сервером-посредником, чтобы получить неподтвержденное сообщение. Это не звучит правильно для меня.

Требуется ли дополнительная настройка в настройке сервера Active MQ, чтобы этот вариант использования работал?

1 ответ

Решение

Решение этой проблемы было не на стороне клиента, а с помощью конфигурации сервера Active MQ.

Для политики назначения управления потоком источника добавьте ConditionalNetworkBridgeFilter и включите replayWhenNoConsumers.

Другие вопросы по тегам