Сообщение JMS не откатывается после потери соединения во время подготовки 2PC

Я пытаюсь обернуть голову вокруг следующей проблемы:

TL;DR Как я могу убедиться, что происходит откат, когда соединение с диспетчером очередей потеряно во время подготовки 2PC.

  • ява: 8
  • весенняя загрузка: 2.3.12.RELEASE
  • весна-jms: 5.2.20.RELEASE
  • mq-jms-spring-boot-starter: 2.6.5 (все клиенты: 9.2.5.0)
  • narayana spring boot starter: 2.6.3 (narayana: 5.11.3.Final)
  • нюхать: 3.1.12

==========================================================

  1. Приложение читает постоянное сообщение (без срока действия) из очереди X в транзакции
  2. Я установил точку останова в XAResouceRecord.topLevelPrepare и с помощью Sniffy отключил соединение с диспетчером очередей во время подготовки к 2PC.
  3. Я жду больше, чем время HBINT (300) (или выключаю виртуальную машину, чтобы мгновенно отключить соединения TCP)
  4. Я ожидал, что сообщение будет доступно в очереди сообщений X, в очереди возврата или в очереди недоставленных сообщений.

Однако сообщение не откатывается. Я не вижу никаких журналов транзакций (думаю, этого и следовало ожидать, поскольку подготовка еще не завершена). Нет незафиксированных сообщений в очереди или очереди возврата.

Если я поставлю точку останова в AbstractPollingMessageListenerContainer.receiveAndExecute после получения сообщения, но до того, как транзакция будет зафиксирована, я увижу, что сообщение больше не находится в очереди. Так появляется, если session.commit уже произошел. Как я могу убедиться, что происходит откат, когда соединение с диспетчером очередей потеряно во время подготовки 2PC. Я, вероятно, что-то упускаю здесь, но я не могу понять, что.

1 ответ

После некоторого копания я считаю, что нашел проблему. Сообщение теперь откатывается, когда я разрываю соединение с диспетчером очередей во время подготовки 2PC. Надеюсь, это может помочь кому-то еще.

В своем вопросе я упомянул о том, что поставил точку останова в AbstractPollingMessageListenerContainer.receiveAndExecute . В весенней версии, которую я использовал 5.2.20.RELEASE, код выглядел так:

              if (this.transactionManager != null) {
            // Execute receive within transaction.
            TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
            boolean messageReceived;
            try {
                messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
            }
            catch (JMSException | RuntimeException | Error ex) {
                rollbackOnException(this.transactionManager, status, ex);
                throw ex;
            }
            this.transactionManager.commit(status);
            return messageReceived;
        }

Это выглядело немного странно, так как transactionManager.commit не был окружен попыткой перехвата. Так что же происходит, если фиксация не удалась?

Try-catch был добавлен в 5.3.16, см. https://github.com/spring-projects/spring-framework/pull/1807 .

              if (this.transactionManager != null) {
            // Execute receive within transaction.
            TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
            boolean messageReceived;
            try {
                messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
            }
            catch (JMSException | RuntimeException | Error ex) {
                rollbackOnException(this.transactionManager, status, ex);
                throw ex;
            }
            try {
                this.transactionManager.commit(status);
            }
            catch (TransactionException ex) {
                // Propagate transaction system exceptions as infrastructure problems.
                throw ex;
            }
            catch (RuntimeException ex) {
                // Typically a late persistence exception from a listener-used resource
                // -> handle it as listener exception, not as an infrastructure problem.
                // E.g. a database locking failure should not lead to listener shutdown.
                handleListenerException(ex);
            }
            return messageReceived;
        }
Другие вопросы по тегам