Integration Testing RabbitMQ listner - периодически происходит сбой, потому что сообщение требует времени для постановки в очередь

Я написал интеграционный тест для следующего потока, используя RabbitMock (нашел его на github, и он кажется действительно классным):

Сообщение добавляется в очередь входящих сообщений -> список входящих сообщений забирает сообщение -> обрабатывает его -> помещает новое исходящее сообщение в новую очередь. Outgoing-message-queue -> (только для тестов). список для этой исходящей очереди в src / test / resources.

Все работает (с одним важным затруднением - прерывистым тайм-аутом), и я делаю утверждения, как показано ниже:

List<OutgoingData> receivedMessages = new ArrayList<>();
            assertTimeoutPreemptively(ofMillis(15000L), () -> {
                    while (receivedMessages.isEmpty()) {
                        OutgoingData data = 
receiver.getOutgoingData();
                        if(data != null){
                            receivedMessages.add(data);
                        }

                    }
                }
            );

            assertThat(receivedMessages.get(0)).isNotNull();

 assertThat
(receivedMessages.get(0).getRecipient())
.isEqualTo("enabled@localhost");

Тайм-аут в этом тесте - реальная проблема, с которой я сталкиваюсь.

  1. Из-за таймаута тесты становятся медленными.
  2. Если я удаляю тайм-аут, тесты застряли в Дженкинсе и должны быть насильственно убиты.
  3. Время от времени этого таймаута в 15000 миллисекунд также недостаточно, и тесты не проходят.

Мне было интересно, есть ли лучший способ справиться с такой ситуацией в интеграционном тесте.

Ждем ваших отзывов.

Большое спасибо, Баньянбат

1 ответ

Решение

Когда я немного подумал и поговорил об этом с одним из членов моей команды, мне пришло в голову, что здесь можно эффективно использовать фьючерс на Java 8.

Я реализовал это следующим образом, и это сработало как шарм.

@Test
public void basic_consume_case()
        InterruptedException, ExecutionException {
    IncomingData incomingData = new IncomingData();
    incomingData.setRecipient("enabled@localhost");
    incomingData.setSender("unblocked@localhost");
    incomingData.setStoreKey("123");
    incomingData.setSubject("Hello");
    incomingData.setText("Hello from Test 1");
    try (AnnotationConfigApplicationContext context = new 
   AnnotationConfigApplicationContext(
            ConnectionFactoryConfiguration.class)) {

        sendMessage(incomingData);

        Future<OutgoingData> receivedMessageFuture = pollOutgoingQueueForData();

        OutgoingData receivedMessage = receivedMessageFuture.get();

        assertThat(receivedMessage).isNotNull();
        assertThat(receivedMessage.getRecipient()).isEqualTo("enabled@localhost");
        assertThat(receivedMessage.getContent())
        ...

    }
}
private void sendMessage(IncomingData incomingData) {
    try {
        rabbitTemplate.convertAndSend("incoming-data-queue", incomingData, m -> {
            m.getMessageProperties().setContentType("application/json");
            return m;
        });

    } finally {
    }
}

private Future<OutgoingData> pollOutgoingQueueForData() throws InterruptedException {

    return executor.submit(() -> {
        OutgoingData receivedMessage = null;
        while (receivedMessage == null) {
            receivedMessage = (OutgoingData) 
rabbitTemplate.receiveAndConvert("outgoing-queue");
        }
        return receivedMessage;
    });

}
Другие вопросы по тегам