How can I Send/Receive a message from Azure Service Bus from Qpid JMS (qpid-jms-client-0.11.1.jar)?
I am currently researching how to connect to Azure Service Bus using Qpid JMS (qpid-jms-client-0.11.1.jar).
I have created a Demo Java application SimpleSenderReceiver which connects to an already configured Azure Service Bus using the following guide ( #link1). This code seems to work using a "very" old version om the Qpid JMS client (version 0.32). I am now trying to get it to work with the latest stable version of Qpid JMS (qpid-jms-client-0.11.1.jar), And so far I have not been successful. Going through the documentation #link2 of Qpid JMS 0.11.1, you can see that the way that the in the properties file the property connectionfactory is different to that in version 0.32.
- How can i setup a correct connection amqp connection string in the properties file?
- How can I setup de Qpid JMS - Azure Service Bus Demo to work with the latest Qpid stable version?
I keep running in the following problem:
731 [AmqpProvider:(1):[amqps://example-bus.servicebus.windows.net?transport.connectTimeout=60000]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
javax.jms.JMSException: Idle timeout value specified in connection OPEN ('30000 ms') is not supported. Minimum idle timeout is '60000' ms. TrackingId:238849ced1em4cd3a093261372f4fc1e_G21, SystemTracker:gateway6, Timestamp:10/27/2016 8:16:23 AM [condition = amqp:internal-error]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:150)
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.remotelyClosed(AmqpAbstractResource.java:147)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:251)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:771)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I have the follwing properties file servicebus.properties:
# servicebus.properties - sample JNDI configuration
# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSsuiLI%3D&transport.connectTimeout=6000
# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.myQueueLookup = queue1
I have the flowing class SimpleSenderReceiver.java:
package com.demo.AzureTest;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;
public class SimpleSenderReceiver implements MessageListener {
private static boolean runReceiver = false;
private Connection connection;
private Session sendSession;
private Session receiveSession;
private MessageProducer sender;
private MessageConsumer receiver;
private static Random randomGenerator = new Random();
public SimpleSenderReceiver() throws Exception {
// Configure JNDI environment
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.qpid.jms.jndi.JmsInitialContextFactory");
env.put(Context.PROVIDER_URL, "C://PATH//servicebus.properties");
Context context = new InitialContext(env);
// Look up ConnectionFactory and Queue
ConnectionFactory cf = (ConnectionFactory) context.lookup("myFactoryLookup");
System.out.println("lookup: " + context.lookup("myFactoryLookup"));
System.out.println("cf:"+cf);
Destination queue = (Destination) context.lookup("myQueueLookup");
System.out.println("queue:");
// Create Connection
connection = cf.createConnection();
System.out.println("connection :"+connection);
// // Create sender-side Session and MessageProducer
sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("Session open.");
sender = sendSession.createProducer(queue);
System.out.println(sender.getDestination());
System.out.println("sender:"+sender);
if (runReceiver) {
// Create receiver-side Session, MessageConsumer,and MessageListener
receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
receiver = receiveSession.createConsumer(queue);
receiver.setMessageListener(this);
connection.start();
}
}
public static void main(String[] args) {
try {
if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
runReceiver = false;
}
SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
System.out.println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));
while (true) {
String s = commandLine.readLine();
if (s.equalsIgnoreCase("exit")) {
simpleSenderReceiver.close();
System.exit(0);
} else {
simpleSenderReceiver.sendMessage();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void sendMessage() throws JMSException {
TextMessage message = sendSession.createTextMessage();
message.setText("Hello from SIS Test AMQP message from Java JMSaaa");
long randomMessageID = randomGenerator.nextLong() >>>1;
message.setStringProperty("TenantId", "klant");
message.setStringProperty("EventType", "bericht");
message.setStringProperty("EventTypeVersion", "1.0");
message.setStringProperty("MessageType", "DocumentMessage");
message.setStringProperty("OperationType", "Create");
message.setStringProperty("SourceSystem", "sis_sender");
message.setStringProperty("EnterpriseKey", "sis_sender-klant-bericht");
message.setJMSMessageID("ID:" + randomMessageID);
sender.send(message);
System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());
System.out.println("Sent message with Text = " + message.getText());
}
public void close() throws JMSException {
connection.close();
}
public void onMessage(Message message) {
try {
System.out.println("Received message with JMSMessageID = " + message.getJMSMessageID());
TextMessage txtmessage = (TextMessage) message;
System.out.println("Received message with Text = " + txtmessage.getText());
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Maven dependencies:
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
--- Update ---
I have since gotten a little further but still a bit stuck. Update to the connectionfactory property:
connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?amqp.idleTimeout=150000&jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSkuiLI%3D
I now am getting the following stacktrace:
842 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
1014 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:543efe98-3ecc-485e-9f7f-3046c40db0cb:1 connected to remote Broker: amqps://example-open-bus-bus.servicebus.windows.net
1301 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] WARN org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder - Open of resource:(JmsProducerInfo { ID:546efe78-3ecc-485d-9f6f-3065c40db1ce:1:1:1, destination = klant }) failed: Attempted to perform an unauthorized operation. TrackingId:2950b1ed7a0d4e0a97b0k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
Caught exception, exiting.
javax.jms.JMSSecurityException: Attempted to perform an unauthorized operation. TrackingId:2890b0ed9a0d4e0a97b1k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:129)
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2 ответа
Более новый клиент по умолчанию включает сердцебиение / время простоя AMQP, в то время как более старый клиент этого не делал. Клиент устанавливает тайм-аут по умолчанию на 60 секунд, и, в свою очередь, это означает, что он запрашивает значение времени ожидания 30 секунд (30000 мс) в своем открытом кадре AMQP при подключении к серверу в соответствии с определенным поведением спецификаций (где узлы объявляют половину своего фактического времени ожидания чтобы избежать ложных таймаутов).
ServiceBus отклоняет значение открытого кадра 30000 мс и указывает, что ему нужно значение не менее 60000 мс (или, предположительно, также 0, что означает, что оно отключено). Для этого вам нужно настроить клиент так, чтобы его тайм-аут был установлен как минимум на 120000 мс, что приведет к требуемому минимуму 60000 мсек. Значение времени простоя открытого кадра является обязательным для ServiceBus (или снова, возможно, отключите обработку тайм-аута клиента, установив его до 0).
Это можно сделать с помощью параметра URI "amqp.idleTimeout" в соответствии с http://qpid.apache.org/releases/qpid-jms-0.11.1/docs/index.html
РЕДАКТИРОВАТЬ: я вижу, вы поняли это в то же время, когда я печатал свой ответ.
Новое исключение от ServiceBus говорит, что вы не уполномочены делать то, что вы пытаетесь. Должно быть достаточно легко поймать исключение в его источнике и определить, что.
Ваш URI выглядит нормально (хотя я предполагаю, что ваше имя пользователя на самом деле не "somePolicy", а двойное connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = в начале является ошибкой c & p). Я не использовал клиента с ServiceBus лично, но я видел вопросы от разных людей, которые имеют, так что я не знаю ни одной конкретной проблемы, мешающей им работать вместе.
Я столкнулся с той же проблемой безопасности, о которой говорилось выше, и провел некоторое время, отслеживая ее, поэтому для всех остальных моя проблема была вызвана значением ключа, указанным в user.password
параметр запроса, содержащий +
персонаж.
Обычно есть =
в конце значения, которое я закодировал как %3D
в строке, и я закодировал +
как %2B
однако если вы установите точку останова в точке, где создается экземпляр ConnectionFactory, и посмотрите на атрибут пароля, вы увидите, что =
правильно не закодирован, но +
был удален и является пробелом, следовательно, несанкционированного доступа.
Мой обходной путь - просто восстановить первичный ключ в Azure, чтобы он не имел +
в нем (юк) но это сработало. Возможно, ошибка в библиотеках AQPID.