ActiveMQ - сообщения от DurableConsumer не получены

Я пытаюсь почерпнуть сообщения из темы ActiveMQ. В веб-консоли я вижу, что многочисленные сообщения поставлены в очередь в теме, но выполнение следующего кода ничего не возвращает:

IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
using (IConnection connection = factory.CreateConnection())
{
     connection.Start();
     ISession session = connection.CreateSession();
     ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
     consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
     message = (ActiveMQTextMessage)consumer.Receive(TimeSpan.FromSeconds(vTimeOutSecs));
}

Любая подсказка будет оценена.

1 ответ

Решение

Чтобы подписка Durable могла получать сообщения, отправленные в автономном режиме, она должна быть сначала зарегистрирована в брокере. Вы регистрируете его, создавая экземпляр, как вы сделали в указанном коде, а затем, как только он переходит в автономный режим, с помощью вызова close() и т. Д., Сообщения, отправленные в его тему, будут сохранены, чтобы он мог прочитать их позже. Если вы еще не зарегистрировали этого потребителя, то те сообщения, которые были отправлены в тему, просто удаляются.

Вам также необходим уникальный идентификатор клиента для подключения, чтобы при каждом повторном подключении вы могли повторно подписываться на долговременного потребителя тем.

Зарегистрируйте тему длительного пользования:

IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
using (IConnection connection = factory.CreateConnection())
{
    connectio.ClientId = "MyClientId";
    connection.Start();
    ISession session = connection.CreateSession();
    ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
    consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
}

Использовать сообщения позже:

IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
using (IConnection connection = factory.CreateConnection())
{
    connection.ClientId = "MyClientId";
    connection.Start();
    ISession session = connection.CreateSession();
    ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
    consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
    message = (ActiveMQTextMessage)consumer.Receive(TimeSpan.FromSeconds(vTimeOutSecs));
}
Другие вопросы по тегам