WSO2 MessageBroker: выдает ошибку org.wso2.andes.AMQTimeoutException при создании надежного подписчика

У меня есть код, который действует как мой подписчик. Я создал надежного подписчика. Так что из-за этого я получаю исключение как

Exception in thread "main" javax.jms.JMSException: Error registering consumer: org.wso2.andes.AMQTimeoutException: Server did not respond in a timely fashion [error code 408: Request Timeout]
    at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:2054)
    at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:1997)
    at org.wso2.andes.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:305)
    at org.wso2.andes.client.AMQConnection.executeRetrySupport(AMQConnection.java:621)
    at org.wso2.andes.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:102)
    at org.wso2.andes.client.AMQSession.createConsumerImpl(AMQSession.java:1995)
    at org.wso2.andes.client.AMQSession.createConsumer(AMQSession.java:993)
    at org.wso2.andes.client.AMQSession.createDurableSubscriber(AMQSession.java:1142)
    at org.wso2.andes.client.AMQSession.createDurableSubscriber(AMQSession.java:1042)
    at org.wso2.andes.client.AMQTopicSessionAdaptor.createDurableSubscriber(AMQTopicSessionAdaptor.java:73)
    at xml.parser.Parser.subscribe(Parser.java:62)
    at xml.parser.Parser.main(Parser.java:34)

Но вместо того, чтобы быть устойчивым, когда я создаю нормального подписчика, мой код работает хорошо, и нет ошибки. Почему я получаю эту ошибку? И еще один вопрос - как отписаться от темы?

Мой код для подписчика:

package xml.parser;

import org.w3c.dom.*;
import javax.xml.xpath.*;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.*;

import java.io.IOException;
import org.xml.sax.SAXException;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class Parser {

    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_BROKER_PORT = "5673";
    String topicName = "myTopic";

    public static void main(String[] args) throws NamingException,
            JMSException, XPathExpressionException,
            ParserConfigurationException, SAXException, IOException {

        Parser queueReceiver = new Parser();
        String message = queueReceiver.subscribe();

        System.out.println("Got message from Queue ==> " + message);
    }

    public String subscribe() throws NamingException, JMSException {

        String messageContent = "";
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME,
                getTCPConnectionURL(userName, password));
        properties.put("topic." + topicName, topicName);
        System.out.println("getTCPConnectionURL(userName,password) = "
                + getTCPConnectionURL(userName, password));
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
                .lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession = topicConnection.createTopicSession(false,
                QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        // Topic topic = topicSession.createTopic(topicName);
        Topic topic = (Topic) ctx.lookup(topicName);

        javax.jms.TopicSubscriber topicSubscriber = topicSession
                .createDurableSubscriber(topic,"topicQueue");
        Message message = topicSubscriber.receive();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("textMessage.getText() = "
                    + textMessage.getText());
            messageContent = textMessage.getText();
        }
        topicSubscriber.close();
        topicSession.close();
        topicConnection.stop();
        topicConnection.close();


        return messageContent;
    }

    public String getTCPConnectionURL(String username, String password) {
        return new StringBuffer().append("amqp://").append(username)
                .append(":").append(password).append("@")
                .append(CARBON_CLIENT_ID).append("/")
                .append(CARBON_VIRTUAL_HOST_NAME).append("?brokerlist='tcp://")
                .append(CARBON_DEFAULT_HOSTNAME).append(":")
                .append(CARBON_BROKER_PORT).append("'").toString();

    }

}

1 ответ

Это проблема в дистрибутиве MB 2.0.1 с постоянными подписчиками. Причина этого заключается в том, что когда класс Parser запускается первый раз, получает сообщение и подписчик останавливается, затем, когда вы запускаете Parser во второй раз, он не запускает подписку обратно, так как предыдущая запись 'subscriber' все еще существует, и вы увидите следующее в терминале. Время ожидания клиента истечет после нескольких попыток, поэтому вы получаете журнал ошибок.

[2013-04-22 12:12:52,617]  INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} -  Closing channel due to: Cannot subscribe to queue carbon:topicQueue as it already has an existing exclusive consumer
[2013-04-22 12:12:52,621]  INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} -  Channel[1] awaiting closure - processing close-ok
[2013-04-22 12:12:52,621]  INFO {org.wso2.andes.server.handler.ChannelCloseOkHandler} -  Received channel-close-ok for channel-id 1

Эта проблема была исправлена ​​в выпуске MB 2.1.0, выход которого ожидается в ближайшие недели. Если вам нужно, пожалуйста, попробуйте ваш образец подписчика с MB 2.1.0 - альфа-версия отсюда. Это должно хорошо работать с этим пакетом.

О отписке от темы добавьте следующую строку в код вашего парсера и запустите снова, когда вам нужно отписаться.

topicSubscriber.close();
**topicSession.unsubscribe("topicQueue"); // add the name used to   identify the subscription in the place of "topicQueue"**
topicSession.close();
topicConnection.stop();
topicConnection.close();
Другие вопросы по тегам