Обработка транзакций JMS и повторная доставка с многопоточными слушателями
Я использую JMS для обработки сообщений в среде Java 1.8 SE. Сообщения исходят из расширенной очереди Oracle. Поскольку обработка сообщения может занять некоторое время, я решил создать пул из 5 рабочих потоков (объектов MessageHandler), чтобы сообщения могли обрабатывать несколько потоков одновременно. Я хотел бы иметь гарантированную доставку без доставки повторяющихся сообщений.
я использую
queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
создать QueueSession. Я использую код ниже для обработки входящих сообщений. В принципе, onMessage
порождает поток, который обрабатывает сообщение.
public class JmsQueueListener implements MessageListener
{
/** A pool of worker threads for handling requests. */
private final ExecutorService pool;
OracleJmsQueue queue;
public void onMessage(Message msg)
{
pool.execute(new MessageHandler(msg));
// can't commit here - the thread may still be processing
}
/**
* This class provides a "worker thread" for processing a message
* from the queue.
*/
private class MessageHandler implements Runnable {
/**
* The message to process
*/
Message message;
/**
* The constructor stores the passed in message as a field
*/
MessageHandler(Message message) {
this.message = message;
}
/**
* Processes the message provided to the constructor by
* calling the appropriate business logic.
*/
public void run() {
QueueSession queueSession = queue.getQueueSession();
try {
String result = requestManager.processMessage(message);
if (result != null) {
queueSession.commit();
}
else {
queueSession.rollback();
}
}
catch (Exception ex) {
try {
queueSession.rollback();
}
catch (JMSException e) {
}
}
}
} // class MessageHandler
Моя проблема в том, что я не знаю, как указать исходной очереди, успешно ли завершена обработка. Я не могу совершить в конце onMessage
потому что поток, возможно, не завершил обработку. Я не думаю, что там, где я в настоящее время commit
с и rollback
S тоже хорошо. Например, если 5 рабочих потоков находятся в различных состояниях завершения, в каком состоянии сеанс очереди фиксируется?
Я думаю, что мне не хватает какой-то фундаментальной концепции о том, как обрабатывать JMS в многопоточном режиме. Любая помощь приветствуется.
1 ответ
Вы используете асинхронную обработку сообщений, поэтому, если вы не реализуете способ, гарантирующий, что каждая обработка сообщений будет выполняться в хронологическом порядке, вы закончите сценарий, когда более старая обработка сообщений заканчивается после обработки недавних сообщений. Так зачем использовать сервис обмена сообщениями?
Простое решение вашей проблемы commit
в конце onMessage
метод, и в теле вашего messageHandler
повторно поставить сообщение в очередь в случае ошибки. Тем не менее, это решение может иметь проблему, если переопределение само по себе не удается.