Как смоделировать повторную доставку сообщения в сценарии сеанса JMS AUTO_ACKNOWLEDGE?
В следующем тесте я пытаюсь смоделировать следующий сценарий:
- Очередь сообщений запущена.
- Запущен потребитель, предназначенный для сбоя во время обработки сообщения.
- Сообщение производится.
- Потребитель начинает обрабатывать сообщение.
- Во время обработки выдается исключение для имитации ошибки обработки сообщения. Отказавший потребитель остановлен.
- Другой потребитель начинает с намерения забрать доставленное сообщение.
Но мой тест не пройден, и сообщение не доставляется новому потребителю. Я буду признателен за любые намеки на это.
MessageProcessingFailureAndReprocessingTest.java
@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests {
@Autowired
private FailureReprocessTestScenario testScenario;
@Before
public void setUp() {
testScenario.start();
}
@After
public void tearDown() throws Exception {
testScenario.stop();
}
@Test public void
should_reprocess_task_after_processing_failure() {
try {
Thread.sleep(20*1000);
assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
"task-1",
})));
} catch (InterruptedException e) {
fail();
}
}
@Configurable
public static class FailureReprocessTestScenario {
@Autowired
public BrokerService broker;
@Autowired
public MockTaskProducer mockTaskProducer;
@Autowired
public FailingWorker failingWorker;
@Autowired
public SucceedingWorker succeedingWorker;
@Autowired
public TaskScheduler scheduler;
public void start() {
Date now = new Date();
scheduler.schedule(new Runnable() {
public void run() { failingWorker.start(); }
}, now);
Date after1Seconds = new Date(now.getTime() + 1*1000);
scheduler.schedule(new Runnable() {
public void run() { mockTaskProducer.produceTask(); }
}, after1Seconds);
Date after2Seconds = new Date(now.getTime() + 2*1000);
scheduler.schedule(new Runnable() {
public void run() {
failingWorker.stop();
succeedingWorker.start();
}
}, after2Seconds);
}
public void stop() throws Exception {
succeedingWorker.stop();
broker.stop();
}
}
@Configuration
@ImportResource(value={"classpath:applicationContext-jms.xml",
"classpath:applicationContext-task.xml"})
public static class ContextConfig {
@Autowired
private ConnectionFactory jmsFactory;
@Bean
public FailureReprocessTestScenario testScenario() {
return new FailureReprocessTestScenario();
}
@Bean
public MockTaskProducer mockTaskProducer() {
return new MockTaskProducer();
}
@Bean
public FailingWorker failingWorker() {
TaskListener listener = new TaskListener();
FailingWorker worker = new FailingWorker(listenerContainer(listener));
listener.setProcessor(worker);
return worker;
}
@Bean
public SucceedingWorker succeedingWorker() {
TaskListener listener = new TaskListener();
SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
listener.setProcessor(worker);
return worker;
}
private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
listenerContainer.setConnectionFactory(jmsFactory);
listenerContainer.setDestinationName("tasksQueue");
listenerContainer.setMessageListener(listener);
listenerContainer.setAutoStartup(false);
listenerContainer.initialize();
return listenerContainer;
}
}
public static class FailingWorker implements TaskProcessor {
private Logger LOG = Logger.getLogger(FailingWorker.class.getName());
private final DefaultMessageListenerContainer listenerContainer;
public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
this.listenerContainer = listenerContainer;
}
public void start() {
LOG.info("FailingWorker.start()");
listenerContainer.start();
}
public void stop() {
LOG.info("FailingWorker.stop()");
listenerContainer.stop();
}
@Override
public void processTask(Object task) {
LOG.info("FailingWorker.processTask(" + task + ")");
try {
Thread.sleep(1*1000);
throw Throwables.propagate(new Exception("Simulate task processing failure"));
} catch (InterruptedException e) {
LOG.log(Level.SEVERE, "Unexpected interruption exception");
}
}
}
public static class SucceedingWorker implements TaskProcessor {
private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());
private final DefaultMessageListenerContainer listenerContainer;
public final List<String> processedTasks;
public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
this.listenerContainer = listenerContainer;
this.processedTasks = new ArrayList<String>();
}
public void start() {
LOG.info("SucceedingWorker.start()");
listenerContainer.start();
}
public void stop() {
LOG.info("SucceedingWorker.stop()");
listenerContainer.stop();
}
@Override
public void processTask(Object task) {
LOG.info("SucceedingWorker.processTask(" + task + ")");
try {
TextMessage taskText = (TextMessage) task;
processedTasks.add(taskText.getText());
} catch (JMSException e) {
LOG.log(Level.SEVERE, "Unexpected exception during task processing");
}
}
}
}
TaskListener.java
public class TaskListener implements MessageListener {
private TaskProcessor processor;
@Override
public void onMessage(Message message) {
processor.processTask(message);
}
public void setProcessor(TaskProcessor processor) {
this.processor = processor;
}
}
MockTaskProducer.java
@Configurable
public class MockTaskProducer implements ApplicationContextAware {
private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());
@Autowired
private JmsTemplate jmsTemplate;
private Destination destination;
private int taskCounter = 0;
public void produceTask() {
LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");
taskCounter++;
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("task-" + taskCounter);
return message;
}
});
}
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
destination = applicationContext.getBean("tasksQueue", Destination.class);
}
}
1 ответ
Видимо, источник документации, который я искал вчера Создание надежных приложений JMS, вводит меня в заблуждение (или я мог неправильно это понять). Особенно этот отрывок:
Пока сообщение JMS не будет подтверждено, оно не считается успешно использованным. Успешное использование сообщения обычно происходит в три этапа.
- Клиент получает сообщение.
- Клиент обрабатывает сообщение.
- Сообщение подтверждено. Подтверждение инициируется поставщиком JMS или клиентом, в зависимости от режима подтверждения сеанса.
Я предположил, что AUTO_ACKNOWLEDGE делает именно это - подтверждает сообщение после того, как метод слушателя возвращает результат. Но в соответствии со спецификацией JMS это немного отличается, и контейнеры прослушивателя Spring, как и ожидалось, не пытаются изменить поведение из спецификации JMS. Это то, что должен сказать Javadoc AbstractMessageListenerContainer - я подчеркнул важные предложения:
Контейнер слушателя предлагает следующие опции подтверждения сообщения:
- "sessionAcknowledgeMode" установлен в "AUTO_ACKNOWLEDGE" (по умолчанию): автоматическое подтверждение сообщения перед выполнением слушателя; Нет возврата в случае исключения.
- "sessionAcknowledgeMode" установлен в "CLIENT_ACKNOWLEDGE": автоматическое подтверждение сообщения после успешного выполнения слушателя; Нет возврата в случае исключения.
- "sessionAcknowledgeMode" установлен в "DUPS_OK_ACKNOWLEDGE": отложенное подтверждение сообщения во время или после выполнения слушателя; потенциальная повторная доставка в случае исключения.
- "sessionTransacted" установлен в "true": подтверждение транзакции после успешного выполнения слушателя; гарантированная повторная доставка в случае исключения.
Так что ключ к моему решению listenerContainer.setSessionTransacted(true);
Еще одна проблема, с которой я столкнулся, заключалась в том, что провайдер JMS продолжает возвращать сообщение с ошибкой обратно тому же потребителю, который потерпел неудачу при обработке сообщения. Я не знаю, дает ли спецификация JMS рецепт того, что должен делать провайдер в таких ситуациях, но что мне помогло, так это использовать listenerContainer.shutdown();
чтобы отключить отказавшего потребителя и позволить провайдеру повторно доставить сообщение и дать шанс другому потребителю.