Обработка транзакций JMS и повторная доставка с многопоточными слушателями

Я использую JMS для обработки сообщений в среде Java 1.8 SE. Сообщения исходят из расширенной очереди Oracle. Поскольку обработка сообщения может занять некоторое время, я решил создать пул из 5 рабочих потоков (объектов MessageHandler), чтобы сообщения могли обрабатывать несколько потоков одновременно. Я хотел бы иметь гарантированную доставку без доставки повторяющихся сообщений.

я использую

queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);

создать QueueSession. Я использую код ниже для обработки входящих сообщений. В принципе, onMessage порождает поток, который обрабатывает сообщение.

public class JmsQueueListener implements MessageListener
{
    /** A pool of worker threads for handling requests. */
    private final ExecutorService pool;

    OracleJmsQueue queue;

    public void onMessage(Message msg)
    {
        pool.execute(new MessageHandler(msg));
        // can't commit here - the thread may still be processing
    }

    /**
     * This class provides a "worker thread" for processing a message
     * from the queue.
     */
    private class MessageHandler implements Runnable {

        /**
         * The message to process
         */
        Message message;

        /**
         * The constructor stores the passed in message as a field
         */
        MessageHandler(Message message) {
            this.message = message;
        }

        /**
         * Processes the message provided to the constructor by
         * calling the appropriate business logic.
         */
        public void run() {
            QueueSession queueSession = queue.getQueueSession();
            try {
                String result = requestManager.processMessage(message);

                if (result != null) {
                    queueSession.commit();
                }
                else {
                    queueSession.rollback();
                }
            }
            catch (Exception ex) {
                try {
                    queueSession.rollback();
                }
                catch (JMSException e) {
                }
            }
        }
    }   //  class MessageHandler

Моя проблема в том, что я не знаю, как указать исходной очереди, успешно ли завершена обработка. Я не могу совершить в конце onMessageпотому что поток, возможно, не завершил обработку. Я не думаю, что там, где я в настоящее время commitс и rollbackS тоже хорошо. Например, если 5 рабочих потоков находятся в различных состояниях завершения, в каком состоянии сеанс очереди фиксируется?

Я думаю, что мне не хватает какой-то фундаментальной концепции о том, как обрабатывать JMS в многопоточном режиме. Любая помощь приветствуется.

1 ответ

Решение

Вы используете асинхронную обработку сообщений, поэтому, если вы не реализуете способ, гарантирующий, что каждая обработка сообщений будет выполняться в хронологическом порядке, вы закончите сценарий, когда более старая обработка сообщений заканчивается после обработки недавних сообщений. Так зачем использовать сервис обмена сообщениями?

Простое решение вашей проблемы commit в конце onMessage метод, и в теле вашего messageHandler повторно поставить сообщение в очередь в случае ошибки. Тем не менее, это решение может иметь проблему, если переопределение само по себе не удается.

Другие вопросы по тегам