Spring AWS SQS переподключается после потери соединения

Я использую Spring Cloud AWS (1.0.1.RELEASE) с Spring Boot для запуска потребителя SQS. Приложение работает нормально, но когда оно теряет сетевое соединение (например, если я выключаю свой WIFI на своем ноутбуке, когда он работает на нем), я вижу ошибки на консоли, и приложение никогда не восстанавливается. Он просто висит там и не восстанавливает соединение после того, как сеть становится доступной. Я должен убить это и поднять это. Как заставить его восстановиться самому?

// Spring Boot entry point: 
public static void main(String[] args) {
    SpringApplication.run(MyConsumerConfiguration.class, args);
}

// Message Listener (A different class)
@MessageMapping(value = "myLogicalQueueName" )
public void receive(MyPOJO object) {

}

Ошибка, которую я вижу на консоли:

Исключение в потоке "simpleMessageListenerContainer-1" com.amazonaws.AmazonClientException: невозможно выполнить HTTP-запрос: sqs.us-east-1.amazonaws.com на com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:473 at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:297) при com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2422) в com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1130) на com.amazonaws.services.sqs.AmazonSQSAsyncClient$23.call(AmazonSQSAsyncClient.java:1678) на com.amazonaws.services.sqs.AmazonSQSAsyncClient $ 23.call (AmazonSQSAsyncClient.javaur.tilus. At. 76).FutureTask.run(FutureTask.java:266) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoava:6g)..Thread.run(Thread.java:745

1 ответ

Решение

Я только что выяснил проблему, почему SQS не может восстановить соединение после потери сетевого подключения.

На самом деле, похоже, проблема в текущей реализации Spring AWS org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.java

private class AsynchronousMessageListener implements Runnable {

    private final QueueAttributes queueAttributes;
    private final String logicalQueueName;

    private AsynchronousMessageListener(String logicalQueueName, QueueAttributes queueAttributes) {
        this.logicalQueueName = logicalQueueName;
        this.queueAttributes = queueAttributes;
    }

    @Override
    public void run() {
        while (isRunning()) {
            ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
            CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
            for (Message message : receiveMessageResult.getMessages()) {
                if (isRunning()) {
                    MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                    getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
                } else {
                    break;
                }
            }
            try {
                messageBatchLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Выше кода раскручивается новый поток, который выполняет опрос в очереди SQS для получения сообщений. После разрыва сетевого подключения getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest()) бросает UnknownHostException, который не обрабатывается в коде и вызывает завершение потока. Поэтому, когда позднее устанавливается сетевое соединение, нет потока, опрашивающего очередь для получения данных.

Я уже поднял проблему с Spring для этого. Вот ссылка: https://github.com/spring-cloud/spring-cloud-aws/issues/82

Надеюсь, это все объясняет.

Другие вопросы по тегам