Spring AWS SQS переподключается после потери соединения
Я использую Spring Cloud AWS (1.0.1.RELEASE) с Spring Boot для запуска потребителя SQS. Приложение работает нормально, но когда оно теряет сетевое соединение (например, если я выключаю свой WIFI на своем ноутбуке, когда он работает на нем), я вижу ошибки на консоли, и приложение никогда не восстанавливается. Он просто висит там и не восстанавливает соединение после того, как сеть становится доступной. Я должен убить это и поднять это. Как заставить его восстановиться самому?
// Spring Boot entry point:
public static void main(String[] args) {
SpringApplication.run(MyConsumerConfiguration.class, args);
}
// Message Listener (A different class)
@MessageMapping(value = "myLogicalQueueName" )
public void receive(MyPOJO object) {
}
Ошибка, которую я вижу на консоли:
Исключение в потоке "simpleMessageListenerContainer-1" com.amazonaws.AmazonClientException: невозможно выполнить HTTP-запрос: sqs.us-east-1.amazonaws.com на com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:473 at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:297) при com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2422) в com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1130) на com.amazonaws.services.sqs.AmazonSQSAsyncClient$23.call(AmazonSQSAsyncClient.java:1678) на com.amazonaws.services.sqs.AmazonSQSAsyncClient $ 23.call (AmazonSQSAsyncClient.javaur.tilus. At. 76).FutureTask.run(FutureTask.java:266) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoava:6g)..Thread.run(Thread.java:745
1 ответ
Я только что выяснил проблему, почему SQS не может восстановить соединение после потери сетевого подключения.
На самом деле, похоже, проблема в текущей реализации Spring AWS org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.java
private class AsynchronousMessageListener implements Runnable {
private final QueueAttributes queueAttributes;
private final String logicalQueueName;
private AsynchronousMessageListener(String logicalQueueName, QueueAttributes queueAttributes) {
this.logicalQueueName = logicalQueueName;
this.queueAttributes = queueAttributes;
}
@Override
public void run() {
while (isRunning()) {
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
break;
}
}
try {
messageBatchLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Выше кода раскручивается новый поток, который выполняет опрос в очереди SQS для получения сообщений. После разрыва сетевого подключения getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest())
бросает UnknownHostException
, который не обрабатывается в коде и вызывает завершение потока. Поэтому, когда позднее устанавливается сетевое соединение, нет потока, опрашивающего очередь для получения данных.
Я уже поднял проблему с Spring для этого. Вот ссылка: https://github.com/spring-cloud/spring-cloud-aws/issues/82
Надеюсь, это все объясняет.