ActiveMQ - сообщения от DurableConsumer не получены
Я пытаюсь почерпнуть сообщения из темы ActiveMQ. В веб-консоли я вижу, что многочисленные сообщения поставлены в очередь в теме, но выполнение следующего кода ничего не возвращает:
IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
using (IConnection connection = factory.CreateConnection())
{
connection.Start();
ISession session = connection.CreateSession();
ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
message = (ActiveMQTextMessage)consumer.Receive(TimeSpan.FromSeconds(vTimeOutSecs));
}
Любая подсказка будет оценена.
1 ответ
Чтобы подписка Durable могла получать сообщения, отправленные в автономном режиме, она должна быть сначала зарегистрирована в брокере. Вы регистрируете его, создавая экземпляр, как вы сделали в указанном коде, а затем, как только он переходит в автономный режим, с помощью вызова close() и т. Д., Сообщения, отправленные в его тему, будут сохранены, чтобы он мог прочитать их позже. Если вы еще не зарегистрировали этого потребителя, то те сообщения, которые были отправлены в тему, просто удаляются.
Вам также необходим уникальный идентификатор клиента для подключения, чтобы при каждом повторном подключении вы могли повторно подписываться на долговременного потребителя тем.
Зарегистрируйте тему длительного пользования:
IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
using (IConnection connection = factory.CreateConnection())
{
connectio.ClientId = "MyClientId";
connection.Start();
ISession session = connection.CreateSession();
ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
}
Использовать сообщения позже:
IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
using (IConnection connection = factory.CreateConnection())
{
connection.ClientId = "MyClientId";
connection.Start();
ISession session = connection.CreateSession();
ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
message = (ActiveMQTextMessage)consumer.Receive(TimeSpan.FromSeconds(vTimeOutSecs));
}