Выпуск тестирования весеннего облака SQS Listener

Среда

  • Spring Boot: 1.5.13. ВЫПУСК
  • Облако: Edgware.SR3
  • Облако AWS: 1.2.2. РЕЛИЗ
  • Java 8
  • OSX 10.13.4

проблема

Я пытаюсь написать интеграционный тест для SQS.

У меня есть локальный работающий контейнер док-станции localstack, на котором запущен SQS TCP/4576

В моем тестовом коде я определяю клиента SQS с конечной точкой, установленной на локальный 4576, и могу успешно подключиться и создать очередь, отправить сообщение и удалить очередь. Я также могу использовать клиент SQS для получения сообщений и получения отправленного мной сообщения.

Моя проблема заключается в том, что если я удаляю код, который получает сообщение вручную, чтобы позволить другому компоненту получить сообщение, то, похоже, ничего не происходит. У меня есть компонент Spring, аннотированный следующим образом:

слушатель

@Component
public class MyListener {
@SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
    public void receive(final MyMsg msg) {
        System.out.println("GOT THE MESSAGE: "+ msg.toString());
    }
}

Тестовое задание

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest {

    @Autowired
    private AmazonSQSAsync amazonSQS;

    @Autowired
    private SimpleMessageListenerContainer container;

    private String queueUrl;

    @Before
    public void setUp() {
        queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
    }

    @After
    public void tearDown() {
        amazonSQS.deleteQueue(queueUrl);
    }

    @Test
    public void name() throws InterruptedException {
        amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
        System.out.println("isRunning:" + container.isRunning());
        System.out.println("isActive:" + container.isActive());
        System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
        Thread.sleep(30_000);
        System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
    }

    @TestConfiguration
    @EnableSqs
    public static class SQSConfiguration {

        @Primary
        @Bean(destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQS() {
            final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
            return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                    .standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
                    .withEndpointConfiguration(endpoint)
                    .build());
        }
    }
}

В журналах испытаний я вижу:

oscamlistener.QueueMessageHandler: 1 методов обработки сообщений, найденных в классе MyListener: {public void MyListener.receive(MyMsg)=org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a} 2018-0531 39.582 ИНФОРМАЦИЯ 16329 ---

oscamlistener.QueueMessageHandler: сопоставил "org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a" с открытым void MyListener.receive (MyMsg)

С последующим:

isRunning: правда

IsActive: правда

isRunningOnQueue ложь

ПОЛУЧИЛ СООБЩЕНИЕ: 1

Это показывает, что за 30 секундную паузу между отправкой сообщения контейнер не забрал его, и когда я вручную опрашиваю сообщение, оно находится в очереди, и я могу его использовать.

Мой вопрос: почему не вызывается слушатель и почему isRunningOnQueue:false линия, предполагающая, что она не запускается автоматически для этой очереди?

Обратите внимание, что я также попытался установить свой собственный SimpleMessageListenerContainer bean-компонент с автоматическим запуском, установленным в true явно (по умолчанию в любом случае), и не заметил никаких изменений в поведении. Я думал, что org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer это установлено @EnableSqs должен настроить автозапуск SimpleMessageListenerContainer это должно быть сообщение для меня.

Я также установил

logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG

в моих свойствах теста я вижу, что HTTP-вызовы создают очередь, отправляют сообщение, удаляют и т. д., но нет HTTP-вызовов для получения (кроме моего ручного в конце теста).

1 ответ

Решение

Я понял это после некоторого возни.

Даже если для простой фабрики контейнеров сообщений не установлен автоматический запуск, она все равно выполняет инициализацию, которая включает определение наличия очереди.

В этом случае очередь создается в моем тесте в методе настройки, но, к сожалению, это происходит после настройки контекста пружины, что означает возникновение исключения.

Я исправил это, просто переместив создание очереди в создание контекста клиента SQS (что происходит до создания контейнера сообщений). то есть:

@Bean(destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQS() {
            final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
            final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                    .standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
                    .withEndpointConfiguration(endpoint)
                    .build());
            client.createQueue("test-queue");
            return client;
        }
Другие вопросы по тегам