Почему потребитель не создан для очереди ActiveMQ Temp?
Кроме SimpleMessageListenerContainer
опция, потребитель не создан для временной очереди. Я не буду использовать SimpleMessageListenerContainer
по некоторым вопросам, с которыми здесь сталкиваются .
Следующий код не работает...(даже временная очередь не создается)
using (IConnection connection = connectionFactory.CreateConnection())
using (ISession session = connection.CreateSession())
{
IDestination destination = SessionUtil.GetDestination(session, aQueueName);
var replyDestination = session.CreateTemporaryQueue();
// Create a consumer and producer
using (IMessageProducer producer = session.CreateProducer(destination))
{
// Start the connection so that messages will be processed.
connection.Start();
IBytesMessage request = session.CreateBytesMessage(aMsg);
request.NMSReplyTo = replyDestination;
IMessageConsumer consumer = session.CreateConsumer(replyDestination);
consumer.Listener += new MessageListener(this.OnAckRecieved);
// Send a message
producer.Send(request);
ack = this.autoEvent.WaitOne(this.msgConsumeTimeOut, true);
consumer.Close();
consumer.Dispose();
ConnectionFactoryUtils.GetTargetSession(session).DeleteDestination(replyDestination);
}
connection.Close();
session.Close();
Работает следующий код:- но очередь кажется постоянной, а не временной
using (IConnection connection = connectionFactory.CreateConnection())
using (ISession session = connection.CreateSession())
{
IDestination destination = SessionUtil.GetDestination(session, aQueueName);
var replyDestination = session.CreateTemporaryQueue();
// Create a consumer and producer
using (IMessageProducer producer = session.CreateProducer(destination))
{
// Start the connection so that messages will be processed.
connection.Start();
IBytesMessage request = session.CreateBytesMessage(aMsg);
request.NMSReplyTo = replyDestination;
IDestination tempDestination = this.destinationResolver.ResolveDestinationName(session, request.NMSReplyTo.ToString());
IMessageConsumer consumer = session.CreateConsumer(tempDestination);
consumer.Listener += new MessageListener(this.OnAckRecieved);
// Send a message
producer.Send(request);
ack = this.autoEvent.WaitOne(this.msgConsumeTimeOut, true);
consumer.Close();
consumer.Dispose();
ConnectionFactoryUtils.GetTargetSession(session).DeleteDestination(tempDestination);
}
connection.Close();
session.Close();
С помощью приведенного выше кода (с использованием NmsDestinationAccessor) он работает. Но он создает постоянную очередь. Поэтому, когда я использую адрес назначения ответа временной очереди напрямую, он не работает.
3 ответа
- Вместо того, чтобы использовать C#, напишите код на Java, так как это лучший набор для ActiveMQ. Читайте здесь для примеров использования временной очереди в Java.
- А затем скомпилируйте его в файл JAR, и вы сможете импортировать его в свой код C# через IKVM.NET, как описано здесь
- Надеюсь, это сработает.
Примечание: вы должны знать, что вы не можете использовать временную очередь в другом сеансе.
Создание ActiveMQTempQueue
объект прямо из NMSReplyTo.ToString
Метод, вероятно, вызывает у вас проблемы здесь, как ToString
метод не гарантирует возвращение значения, из которого может быть создан соответствующий пункт назначения. Так как вы не знаете, указывал ли отправитель временное назначение или обычное, его плохое кодирование также. Правильнее всего просто создать нового потребителя, используя метод создания потребителя сеанса, используя NSMReplyTo
пункт назначения как есть.
Вот простой тестовый пример ответа на запрос из проекта NMS, который работает с Apache.NMS.Stomp и Apache.NMS.ActiveMQ.
namespace Apache.NMS.Test
{
[TestFixture]
public class RequestResponseTest : NMSTestSupport
{
protected static string DESTINATION_NAME = "RequestDestination";
[Test]
[Category("RequestResponse")]
public void TestRequestResponseMessaging()
{
using(IConnection connection = CreateConnection())
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination destination = SessionUtil.GetDestination(session, DESTINATION_NAME);
ITemporaryQueue replyTo = session.CreateTemporaryQueue();
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
IMessage request = session.CreateMessage();
request.NMSReplyTo = replyTo;
producer.Send(request);
request = consumer.Receive(TimeSpan.FromMilliseconds(3000));
Assert.IsNotNull(request);
Assert.IsNotNull(request.NMSReplyTo);
using(IMessageProducer responder = session.CreateProducer(request.NMSReplyTo))
{
IMessage response = session.CreateTextMessage("RESPONSE");
responder.Send(response);
}
}
using(IMessageConsumer consumer = session.CreateConsumer(replyTo))
{
ITextMessage response = consumer.Receive(TimeSpan.FromMilliseconds(3000)) as ITextMessage;
Assert.IsNotNull(response);
Assert.AreEqual("RESPONSE", response.Text);
}
}
}
}
}
Временная очередь существует только тогда, когда существует созданное ею соединение. В вашем примере кода вы создаете его перед началом соединения, поэтому я думаю, что он просто молча выдает ошибку, потому что нет активного соединения.