Как смоделировать повторную доставку сообщения в сценарии сеанса JMS AUTO_ACKNOWLEDGE?

В следующем тесте я пытаюсь смоделировать следующий сценарий:

  1. Очередь сообщений запущена.
  2. Запущен потребитель, предназначенный для сбоя во время обработки сообщения.
  3. Сообщение производится.
  4. Потребитель начинает обрабатывать сообщение.
  5. Во время обработки выдается исключение для имитации ошибки обработки сообщения. Отказавший потребитель остановлен.
  6. Другой потребитель начинает с намерения забрать доставленное сообщение.

Но мой тест не пройден, и сообщение не доставляется новому потребителю. Я буду признателен за любые намеки на это.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
        loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest  extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
        testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
        testScenario.stop();
    }

    @Test public void 
    should_reprocess_task_after_processing_failure() {
        try {
            Thread.sleep(20*1000);

            assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
                    "task-1",
            })));
        } catch (InterruptedException e) {
            fail();
        }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
        @Autowired
        public BrokerService broker;

        @Autowired
        public MockTaskProducer mockTaskProducer;

        @Autowired
        public FailingWorker failingWorker;

        @Autowired
        public SucceedingWorker succeedingWorker;

        @Autowired
        public TaskScheduler scheduler;

        public void start() {
            Date now = new Date();
            scheduler.schedule(new Runnable() {
                public void run() { failingWorker.start(); }
            }, now);

            Date after1Seconds = new Date(now.getTime() + 1*1000);
            scheduler.schedule(new Runnable() {
                public void run() { mockTaskProducer.produceTask(); }
            }, after1Seconds);

            Date after2Seconds = new Date(now.getTime() + 2*1000);
            scheduler.schedule(new Runnable() {
                public void run() {
                    failingWorker.stop();
                    succeedingWorker.start();
                }
            }, after2Seconds);
        }

        public void stop() throws Exception {
            succeedingWorker.stop();
            broker.stop();
        }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
            "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
        @Autowired
        private ConnectionFactory jmsFactory;

        @Bean
        public FailureReprocessTestScenario testScenario() {
            return new FailureReprocessTestScenario();
        }

        @Bean
        public MockTaskProducer mockTaskProducer() {
            return new MockTaskProducer();
        }

        @Bean
        public FailingWorker failingWorker() {
            TaskListener listener = new TaskListener();
            FailingWorker worker = new FailingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        @Bean
        public SucceedingWorker succeedingWorker() {
            TaskListener listener = new TaskListener();
            SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            listenerContainer.setConnectionFactory(jmsFactory);
            listenerContainer.setDestinationName("tasksQueue");
            listenerContainer.setMessageListener(listener);
            listenerContainer.setAutoStartup(false);
            listenerContainer.initialize();
            return listenerContainer;
        }

    }

    public static class FailingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(FailingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
        }

        public void start() {
            LOG.info("FailingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("FailingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("FailingWorker.processTask(" + task + ")");
            try {
                Thread.sleep(1*1000);
                throw Throwables.propagate(new Exception("Simulate task processing failure"));
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unexpected interruption exception");
            }
        }
    }

    public static class SucceedingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public final List<String> processedTasks;

        public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
            this.processedTasks = new ArrayList<String>();
        }

        public void start() {
            LOG.info("SucceedingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("SucceedingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("SucceedingWorker.processTask(" + task + ")");
            try {
                TextMessage taskText = (TextMessage) task;
                processedTasks.add(taskText.getText());
            } catch (JMSException e) {
                LOG.log(Level.SEVERE, "Unexpected exception during task processing");
            }
        }
    }

}

TaskListener.java

public class TaskListener implements MessageListener {

    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
        processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
        this.processor = processor;
    }

}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    private Destination destination;

    private int taskCounter = 0;

    public void produceTask() {
        LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");

        taskCounter++;

        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("task-" + taskCounter);
                return message;
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
}

1 ответ

Решение

Видимо, источник документации, который я искал вчера Создание надежных приложений JMS, вводит меня в заблуждение (или я мог неправильно это понять). Особенно этот отрывок:

Пока сообщение JMS не будет подтверждено, оно не считается успешно использованным. Успешное использование сообщения обычно происходит в три этапа.

  1. Клиент получает сообщение.
  2. Клиент обрабатывает сообщение.
  3. Сообщение подтверждено. Подтверждение инициируется поставщиком JMS или клиентом, в зависимости от режима подтверждения сеанса.

Я предположил, что AUTO_ACKNOWLEDGE делает именно это - подтверждает сообщение после того, как метод слушателя возвращает результат. Но в соответствии со спецификацией JMS это немного отличается, и контейнеры прослушивателя Spring, как и ожидалось, не пытаются изменить поведение из спецификации JMS. Это то, что должен сказать Javadoc AbstractMessageListenerContainer - я подчеркнул важные предложения:

Контейнер слушателя предлагает следующие опции подтверждения сообщения:

  • "sessionAcknowledgeMode" установлен в "AUTO_ACKNOWLEDGE" (по умолчанию): автоматическое подтверждение сообщения перед выполнением слушателя; Нет возврата в случае исключения.
  • "sessionAcknowledgeMode" установлен в "CLIENT_ACKNOWLEDGE": автоматическое подтверждение сообщения после успешного выполнения слушателя; Нет возврата в случае исключения.
  • "sessionAcknowledgeMode" установлен в "DUPS_OK_ACKNOWLEDGE": отложенное подтверждение сообщения во время или после выполнения слушателя; потенциальная повторная доставка в случае исключения.
  • "sessionTransacted" установлен в "true": подтверждение транзакции после успешного выполнения слушателя; гарантированная повторная доставка в случае исключения.

Так что ключ к моему решению listenerContainer.setSessionTransacted(true);

Еще одна проблема, с которой я столкнулся, заключалась в том, что провайдер JMS продолжает возвращать сообщение с ошибкой обратно тому же потребителю, который потерпел неудачу при обработке сообщения. Я не знаю, дает ли спецификация JMS рецепт того, что должен делать провайдер в таких ситуациях, но что мне помогло, так это использовать listenerContainer.shutdown(); чтобы отключить отказавшего потребителя и позволить провайдеру повторно доставить сообщение и дать шанс другому потребителю.

Другие вопросы по тегам