Сообщение JMS не откатывается после потери соединения во время подготовки 2PC
Я пытаюсь обернуть голову вокруг следующей проблемы:
TL;DR Как я могу убедиться, что происходит откат, когда соединение с диспетчером очередей потеряно во время подготовки 2PC.
- ява: 8
- весенняя загрузка: 2.3.12.RELEASE
- весна-jms: 5.2.20.RELEASE
- mq-jms-spring-boot-starter: 2.6.5 (все клиенты: 9.2.5.0)
- narayana spring boot starter: 2.6.3 (narayana: 5.11.3.Final)
- нюхать: 3.1.12
==========================================================
- Приложение читает постоянное сообщение (без срока действия) из очереди X в транзакции
- Я установил точку останова в XAResouceRecord.topLevelPrepare и с помощью Sniffy отключил соединение с диспетчером очередей во время подготовки к 2PC.
- Я жду больше, чем время HBINT (300) (или выключаю виртуальную машину, чтобы мгновенно отключить соединения TCP)
- Я ожидал, что сообщение будет доступно в очереди сообщений X, в очереди возврата или в очереди недоставленных сообщений.
Однако сообщение не откатывается. Я не вижу никаких журналов транзакций (думаю, этого и следовало ожидать, поскольку подготовка еще не завершена). Нет незафиксированных сообщений в очереди или очереди возврата.
Если я поставлю точку останова в AbstractPollingMessageListenerContainer.receiveAndExecute после получения сообщения, но до того, как транзакция будет зафиксирована, я увижу, что сообщение больше не находится в очереди. Так появляется, если session.commit уже произошел. Как я могу убедиться, что происходит откат, когда соединение с диспетчером очередей потеряно во время подготовки 2PC. Я, вероятно, что-то упускаю здесь, но я не могу понять, что.
1 ответ
После некоторого копания я считаю, что нашел проблему. Сообщение теперь откатывается, когда я разрываю соединение с диспетчером очередей во время подготовки 2PC. Надеюсь, это может помочь кому-то еще.
В своем вопросе я упомянул о том, что поставил точку останова в AbstractPollingMessageListenerContainer.receiveAndExecute . В весенней версии, которую я использовал 5.2.20.RELEASE, код выглядел так:
if (this.transactionManager != null) {
// Execute receive within transaction.
TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
boolean messageReceived;
try {
messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
}
catch (JMSException | RuntimeException | Error ex) {
rollbackOnException(this.transactionManager, status, ex);
throw ex;
}
this.transactionManager.commit(status);
return messageReceived;
}
Это выглядело немного странно, так как transactionManager.commit не был окружен попыткой перехвата. Так что же происходит, если фиксация не удалась?
Try-catch был добавлен в 5.3.16, см. https://github.com/spring-projects/spring-framework/pull/1807 .
if (this.transactionManager != null) {
// Execute receive within transaction.
TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
boolean messageReceived;
try {
messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
}
catch (JMSException | RuntimeException | Error ex) {
rollbackOnException(this.transactionManager, status, ex);
throw ex;
}
try {
this.transactionManager.commit(status);
}
catch (TransactionException ex) {
// Propagate transaction system exceptions as infrastructure problems.
throw ex;
}
catch (RuntimeException ex) {
// Typically a late persistence exception from a listener-used resource
// -> handle it as listener exception, not as an infrastructure problem.
// E.g. a database locking failure should not lead to listener shutdown.
handleListenerException(ex);
}
return messageReceived;
}