Медленная MQ тема подписки. Распараллеливание не улучшает производительность

Я выполняю несколько групповых подписок (например, /A/# и /B/#). Каждая подписка (см. createSubscriber(topic) ниже) дает около 1000 тем и занимает около 10 секунд, чтобы вернуться. Является ли 10 секунд разумным временем ответа? Это кажется медленным, но мне не с чем сравнивать.

Учитывая код ниже;

public class JMSClientSubscriber implements Runnable {

    TopicConnection           topicCon;
    Properties                properties;
    List<MyListener>          listeners;
    JmsTopicConnectionFactory jcf;
    boolean                   connected, alive;

    public JMSClientSubscriber() throws JMSException {
            properties = Properties.getInstance();
            listeners = new LinkedList<>();
            jcf = FLOWConnectionFactory.getTopicFactory(properties, Location.CLIENT);
            connected = false;
            alive = true;
    }

    @Override
    public void run() {
            try {
                    connect();
                    while (alive) {
                            Thread.sleep(1000);
                    }
                    disconnect();
            } catch (Exception e) {
                    e.printStackTrace();
            }
    }

    void connect() throws Exception {
            connected = false;
            topicCon = jcf.createTopicConnection();

            topicCon.setExceptionListener(new ExceptionListener() {
                    @Override public void onException(JMSException arg0) {
                            disconnect();
                            try {
                                    Thread.sleep(1000);
                                    connect();
                            } catch (Exception e) {
                                    e.printStackTrace();
                            }
                    }
            });

            topicCon.start();

            for (MyListener listener: listeners) { 
                    Thread t = new Thread() {
                            @Override public void run() {
                                    TopicSession topicSes;
                                    try {
                                            topicSes = topicCon.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
                                            Topic topic = topicSes.createTopic(listener.exampleMessage.getTopicSubscription());
                                            System.out.println(new Date() + " Subscribing to " + topic);
    /* THIS TAKES 10 SECONDS! */            TopicSubscriber topicSub = topicSes.createSubscriber(topic);
                                            System.out.println(new Date() + " Subscription finished " + topic);
                                            topicSub.setMessageListener(listener);
                                    } catch (Exception e) {
                                            e.printStackTrace();
                                    }
                            }
                    };
                    t.start();
            }
            connected = true;
    }

    void disconnect() {
            try {
                    connected = false;
                    if (topicCon != null) topicCon.close();
            } catch (JMSException e) {}    
    }

    public void stop() { alive = false; }

    public class MyListener implements MessageListener {           
            Class<? extends FlowMessage>       expectedClass;
            FlowMessage                        exampleMessage;

            public MyListener(Class<? extends FlowMessage> expectedClass) throws Exception {
                    this.expectedClass = expectedClass;
                    exampleMessage = expectedClass.newInstance();
                    listeners.add(this);
            }

            @Override
            public void onMessage(javax.jms.Message arg0) {
                    BytesMessage bm = (BytesMessage) arg0;

                    try {
                            byte bytes[] = new byte[(int) bm.getBodyLength()];
                            bm.readBytes(bytes);
                            FlowMessage flowMessage = exampleMessage.newInstance(bytes);
                            System.out.println(new Date() + "[" + bm.getJMSDestination() + "] " + flowMessage.toString());

                    } catch (Exception e) {
                            e.printStackTrace();
                    }
            }
    }


    public static void main(String[] args) throws Exception {
            Properties properties = Properties.newInstance(new File("D:\\cc_views\\D253570_ALL_FLOW_DEV\\DealingRoom\\FLOW\\src\\cfg\\flow.properties"));
            LogManager.getLogManager().readConfiguration(new FileInputStream(properties.getPropertyAsFile("logging.properties")));

            /* Thread per connection */
            for (Class<FlowMessage> clazz: new Class[] { KondorCpty.class, KondorPair.class }) {
                    JMSClientSubscriber s = new JMSClientSubscriber();
                    s.new MyListener(clazz);
                    new Thread(s).start();
            }

            /* Thread per session */
            JMSClientSubscriber s = new JMSClientSubscriber();
            s.new MyListener(KondorCpty.class);
            s.new MyListener(KondorPair.class);
            new Thread(s).start();

    }

}

main в этом коде выполняется два теста;

Одно соединение + многопоточность / сеансы

Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:19:00 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:19:07 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:19:08 2016[topic://DRS/OW/Pair/RONGBP] KondorPair 

Многопоточное соединение + один сеанс на поток / соединение

Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:22:52 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:23:00 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:23:00 2016[topic://DRS/OW/Pair/RONGBP] KondorPair

Оба теста одинаковы по времени и поведению.

  • Подписка на ~1000 тем занимает ~ 10 секунд
  • Подписки, кажется, работают последовательно, даже если они находятся в разных потоках.
  • Обновления тем появляются только после завершения ВСЕХ подписок.
  • Наличие TopicConnection.start() до или после подписки не влияет на производительность или на момент получения первого обновления темы.

Так как мне это ускорить?

2 ответа

Проблема была в onMessage, Вместо того, чтобы иметь дело с сообщением здесь, я помещаю сообщение на BlockingQueue, Затем несколько отдельных тем опрашивали BlockingQueue, Это значительно улучшило пропускную способность MessageListener и удалил проблему многопоточности из кода JMS/MQ.

Пожалуйста, обратите внимание на следующее:

1) Для каждого вызова метода createSession (очередь или тема) потоки данных от клиента и администратора очередей для настройки среды сеанса JMS. Вы используете удаленное соединение, что означает, что существует поток данных по сети.

2) вызов метода createSubscriber включает в себя создание объекта подписки, временную очередь помимо поиска по теме, проверки полномочий и т. Д. На стороне администратора очередей.

Можете ли вы показать нам, как вы распараллеливаете соединения / сеансы?

Согласно спецификациям JMS, сеанс не должен совместно использоваться потоками. Я бы посвятил одну ветку каждому подписчику, где

1) Создает соединение JMS

2) Создает сеанс JMS

3) Создает подписчика

4) Есть ли JMS Connection.start() для начала доставки сообщения.

Другие вопросы по тегам