Медленная MQ тема подписки. Распараллеливание не улучшает производительность
Я выполняю несколько групповых подписок (например, /A/# и /B/#). Каждая подписка (см. createSubscriber(topic)
ниже) дает около 1000 тем и занимает около 10 секунд, чтобы вернуться. Является ли 10 секунд разумным временем ответа? Это кажется медленным, но мне не с чем сравнивать.
Учитывая код ниже;
public class JMSClientSubscriber implements Runnable {
TopicConnection topicCon;
Properties properties;
List<MyListener> listeners;
JmsTopicConnectionFactory jcf;
boolean connected, alive;
public JMSClientSubscriber() throws JMSException {
properties = Properties.getInstance();
listeners = new LinkedList<>();
jcf = FLOWConnectionFactory.getTopicFactory(properties, Location.CLIENT);
connected = false;
alive = true;
}
@Override
public void run() {
try {
connect();
while (alive) {
Thread.sleep(1000);
}
disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
void connect() throws Exception {
connected = false;
topicCon = jcf.createTopicConnection();
topicCon.setExceptionListener(new ExceptionListener() {
@Override public void onException(JMSException arg0) {
disconnect();
try {
Thread.sleep(1000);
connect();
} catch (Exception e) {
e.printStackTrace();
}
}
});
topicCon.start();
for (MyListener listener: listeners) {
Thread t = new Thread() {
@Override public void run() {
TopicSession topicSes;
try {
topicSes = topicCon.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Topic topic = topicSes.createTopic(listener.exampleMessage.getTopicSubscription());
System.out.println(new Date() + " Subscribing to " + topic);
/* THIS TAKES 10 SECONDS! */ TopicSubscriber topicSub = topicSes.createSubscriber(topic);
System.out.println(new Date() + " Subscription finished " + topic);
topicSub.setMessageListener(listener);
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
}
connected = true;
}
void disconnect() {
try {
connected = false;
if (topicCon != null) topicCon.close();
} catch (JMSException e) {}
}
public void stop() { alive = false; }
public class MyListener implements MessageListener {
Class<? extends FlowMessage> expectedClass;
FlowMessage exampleMessage;
public MyListener(Class<? extends FlowMessage> expectedClass) throws Exception {
this.expectedClass = expectedClass;
exampleMessage = expectedClass.newInstance();
listeners.add(this);
}
@Override
public void onMessage(javax.jms.Message arg0) {
BytesMessage bm = (BytesMessage) arg0;
try {
byte bytes[] = new byte[(int) bm.getBodyLength()];
bm.readBytes(bytes);
FlowMessage flowMessage = exampleMessage.newInstance(bytes);
System.out.println(new Date() + "[" + bm.getJMSDestination() + "] " + flowMessage.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Properties properties = Properties.newInstance(new File("D:\\cc_views\\D253570_ALL_FLOW_DEV\\DealingRoom\\FLOW\\src\\cfg\\flow.properties"));
LogManager.getLogManager().readConfiguration(new FileInputStream(properties.getPropertyAsFile("logging.properties")));
/* Thread per connection */
for (Class<FlowMessage> clazz: new Class[] { KondorCpty.class, KondorPair.class }) {
JMSClientSubscriber s = new JMSClientSubscriber();
s.new MyListener(clazz);
new Thread(s).start();
}
/* Thread per session */
JMSClientSubscriber s = new JMSClientSubscriber();
s.new MyListener(KondorCpty.class);
s.new MyListener(KondorPair.class);
new Thread(s).start();
}
}
main
в этом коде выполняется два теста;
Одно соединение + многопоточность / сеансы
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:19:00 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:19:07 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:19:08 2016[topic://DRS/OW/Pair/RONGBP] KondorPair
Многопоточное соединение + один сеанс на поток / соединение
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:22:52 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:23:00 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:23:00 2016[topic://DRS/OW/Pair/RONGBP] KondorPair
Оба теста одинаковы по времени и поведению.
- Подписка на ~1000 тем занимает ~ 10 секунд
- Подписки, кажется, работают последовательно, даже если они находятся в разных потоках.
- Обновления тем появляются только после завершения ВСЕХ подписок.
- Наличие TopicConnection.start() до или после подписки не влияет на производительность или на момент получения первого обновления темы.
Так как мне это ускорить?
2 ответа
Проблема была в onMessage
, Вместо того, чтобы иметь дело с сообщением здесь, я помещаю сообщение на BlockingQueue
, Затем несколько отдельных тем опрашивали BlockingQueue
, Это значительно улучшило пропускную способность MessageListener
и удалил проблему многопоточности из кода JMS/MQ.
Пожалуйста, обратите внимание на следующее:
1) Для каждого вызова метода createSession (очередь или тема) потоки данных от клиента и администратора очередей для настройки среды сеанса JMS. Вы используете удаленное соединение, что означает, что существует поток данных по сети.
2) вызов метода createSubscriber включает в себя создание объекта подписки, временную очередь помимо поиска по теме, проверки полномочий и т. Д. На стороне администратора очередей.
Можете ли вы показать нам, как вы распараллеливаете соединения / сеансы?
Согласно спецификациям JMS, сеанс не должен совместно использоваться потоками. Я бы посвятил одну ветку каждому подписчику, где
1) Создает соединение JMS
2) Создает сеанс JMS
3) Создает подписчика
4) Есть ли JMS Connection.start() для начала доставки сообщения.