Integration Testing RabbitMQ listner - периодически происходит сбой, потому что сообщение требует времени для постановки в очередь
Я написал интеграционный тест для следующего потока, используя RabbitMock (нашел его на github, и он кажется действительно классным):
Сообщение добавляется в очередь входящих сообщений -> список входящих сообщений забирает сообщение -> обрабатывает его -> помещает новое исходящее сообщение в новую очередь. Outgoing-message-queue -> (только для тестов). список для этой исходящей очереди в src / test / resources.
Все работает (с одним важным затруднением - прерывистым тайм-аутом), и я делаю утверждения, как показано ниже:
List<OutgoingData> receivedMessages = new ArrayList<>();
assertTimeoutPreemptively(ofMillis(15000L), () -> {
while (receivedMessages.isEmpty()) {
OutgoingData data =
receiver.getOutgoingData();
if(data != null){
receivedMessages.add(data);
}
}
}
);
assertThat(receivedMessages.get(0)).isNotNull();
assertThat
(receivedMessages.get(0).getRecipient())
.isEqualTo("enabled@localhost");
Тайм-аут в этом тесте - реальная проблема, с которой я сталкиваюсь.
- Из-за таймаута тесты становятся медленными.
- Если я удаляю тайм-аут, тесты застряли в Дженкинсе и должны быть насильственно убиты.
- Время от времени этого таймаута в 15000 миллисекунд также недостаточно, и тесты не проходят.
Мне было интересно, есть ли лучший способ справиться с такой ситуацией в интеграционном тесте.
Ждем ваших отзывов.
Большое спасибо, Баньянбат
1 ответ
Когда я немного подумал и поговорил об этом с одним из членов моей команды, мне пришло в голову, что здесь можно эффективно использовать фьючерс на Java 8.
Я реализовал это следующим образом, и это сработало как шарм.
@Test
public void basic_consume_case()
InterruptedException, ExecutionException {
IncomingData incomingData = new IncomingData();
incomingData.setRecipient("enabled@localhost");
incomingData.setSender("unblocked@localhost");
incomingData.setStoreKey("123");
incomingData.setSubject("Hello");
incomingData.setText("Hello from Test 1");
try (AnnotationConfigApplicationContext context = new
AnnotationConfigApplicationContext(
ConnectionFactoryConfiguration.class)) {
sendMessage(incomingData);
Future<OutgoingData> receivedMessageFuture = pollOutgoingQueueForData();
OutgoingData receivedMessage = receivedMessageFuture.get();
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getRecipient()).isEqualTo("enabled@localhost");
assertThat(receivedMessage.getContent())
...
}
}
private void sendMessage(IncomingData incomingData) {
try {
rabbitTemplate.convertAndSend("incoming-data-queue", incomingData, m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
} finally {
}
}
private Future<OutgoingData> pollOutgoingQueueForData() throws InterruptedException {
return executor.submit(() -> {
OutgoingData receivedMessage = null;
while (receivedMessage == null) {
receivedMessage = (OutgoingData)
rabbitTemplate.receiveAndConvert("outgoing-queue");
}
return receivedMessage;
});
}