Выпуск тестирования весеннего облака SQS Listener
Среда
- Spring Boot: 1.5.13. ВЫПУСК
- Облако: Edgware.SR3
- Облако AWS: 1.2.2. РЕЛИЗ
- Java 8
- OSX 10.13.4
проблема
Я пытаюсь написать интеграционный тест для SQS.
У меня есть локальный работающий контейнер док-станции localstack, на котором запущен SQS TCP/4576
В моем тестовом коде я определяю клиента SQS с конечной точкой, установленной на локальный 4576, и могу успешно подключиться и создать очередь, отправить сообщение и удалить очередь. Я также могу использовать клиент SQS для получения сообщений и получения отправленного мной сообщения.
Моя проблема заключается в том, что если я удаляю код, который получает сообщение вручную, чтобы позволить другому компоненту получить сообщение, то, похоже, ничего не происходит. У меня есть компонент Spring, аннотированный следующим образом:
слушатель
@Component
public class MyListener {
@SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
public void receive(final MyMsg msg) {
System.out.println("GOT THE MESSAGE: "+ msg.toString());
}
}
Тестовое задание
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest {
@Autowired
private AmazonSQSAsync amazonSQS;
@Autowired
private SimpleMessageListenerContainer container;
private String queueUrl;
@Before
public void setUp() {
queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
}
@After
public void tearDown() {
amazonSQS.deleteQueue(queueUrl);
}
@Test
public void name() throws InterruptedException {
amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
System.out.println("isRunning:" + container.isRunning());
System.out.println("isActive:" + container.isActive());
System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
Thread.sleep(30_000);
System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
}
@TestConfiguration
@EnableSqs
public static class SQSConfiguration {
@Primary
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS() {
final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
.withEndpointConfiguration(endpoint)
.build());
}
}
}
В журналах испытаний я вижу:
oscamlistener.QueueMessageHandler: 1 методов обработки сообщений, найденных в классе MyListener: {public void MyListener.receive(MyMsg)=org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a} 2018-0531 39.582 ИНФОРМАЦИЯ 16329 ---
oscamlistener.QueueMessageHandler: сопоставил "org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a" с открытым void MyListener.receive (MyMsg)
С последующим:
isRunning: правда
IsActive: правда
isRunningOnQueue ложь
ПОЛУЧИЛ СООБЩЕНИЕ: 1
Это показывает, что за 30 секундную паузу между отправкой сообщения контейнер не забрал его, и когда я вручную опрашиваю сообщение, оно находится в очереди, и я могу его использовать.
Мой вопрос: почему не вызывается слушатель и почему isRunningOnQueue:false
линия, предполагающая, что она не запускается автоматически для этой очереди?
Обратите внимание, что я также попытался установить свой собственный SimpleMessageListenerContainer
bean-компонент с автоматическим запуском, установленным в true явно (по умолчанию в любом случае), и не заметил никаких изменений в поведении. Я думал, что org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer
это установлено @EnableSqs
должен настроить автозапуск SimpleMessageListenerContainer
это должно быть сообщение для меня.
Я также установил
logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG
в моих свойствах теста я вижу, что HTTP-вызовы создают очередь, отправляют сообщение, удаляют и т. д., но нет HTTP-вызовов для получения (кроме моего ручного в конце теста).
1 ответ
Я понял это после некоторого возни.
Даже если для простой фабрики контейнеров сообщений не установлен автоматический запуск, она все равно выполняет инициализацию, которая включает определение наличия очереди.
В этом случае очередь создается в моем тесте в методе настройки, но, к сожалению, это происходит после настройки контекста пружины, что означает возникновение исключения.
Я исправил это, просто переместив создание очереди в создание контекста клиента SQS (что происходит до создания контейнера сообщений). то есть:
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS() {
final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
.withEndpointConfiguration(endpoint)
.build());
client.createQueue("test-queue");
return client;
}