SocketTimeoutException при получении или вставке данных в Elastic Search с помощью Rest High Level Client

Я сталкиваюсь SocketTimeoutException во время извлечения / вставки данных из / в эластичный. Это происходит, когда вокруг 10-30 request/second, Эти запросы являются комбинацией get/put.

Вот моя упругая конфигурация:

  • 3 master nodes каждый из 4GB RAM
  • 2 data nodes каждый из 8GM RAM
  • Балансировщик нагрузки Azure, который подключается к указанному выше узлу данных (кажется, на нем открыт только порт 9200). И клиент Java подключается к этому балансировщику нагрузки, поскольку он только выставлен.
  • Эластичная версия: 7.2.0
  • Отдых клиент высокого уровня:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.2.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.2.0</version>
    </dependency>
    

Индекс Информация:

  • Индекс осколков: 2
  • Реплика индекса: 1
  • Индекс всего полей: 10000
  • Размер индекса из кибана: Всего27.2 MB & Primaries: 12.2MB
  • Структура индекса:
    {
      "dev-index": {
        "mappings": {
          "properties": {
            "dataObj": {
              "type": "object",
              "enabled": false
            },
            "generatedID": {
              "type": "keyword"
            },
            "transNames": { //it's array of string
              "type": "keyword"
            }
          }
        }
      }
    }
    
  • Динамическое отображение отключено.

Следующее мое elastic Config файл. Здесь у меня есть два компонента соединения, один для чтения и другой для записи в эластичный.

ElasticConfig.java:

@Configuration
public class ElasticConfig {

    @Value("${elastic.host}")
    private String elasticHost;

    @Value("${elastic.port}")
    private int elasticPort;

    @Value("${elastic.user}")
    private String elasticUser;

    @Value("${elastic.pass}")
    private String elasticPass;

    @Value("${elastic-timeout:20}")
    private int timeout;

    @Bean(destroyMethod = "close")
    @Qualifier("readClient")
    public RestHighLevelClient readClient(){

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));

        RestClientBuilder builder = RestClient
                .builder(new HttpHost(elasticHost, elasticPort))
                .setHttpClientConfigCallback(httpClientBuilder -> 
                        httpClientBuilder
                                .setDefaultCredentialsProvider(credentialsProvider)
                                .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
                );

        builder.setRequestConfigCallback(requestConfigBuilder -> 
                requestConfigBuilder
                        .setConnectTimeout(10000)
                        .setSocketTimeout(60000)
                        .setConnectionRequestTimeout(0)
        );

        RestHighLevelClient restClient = new RestHighLevelClient(builder);
        return restClient;
    }

    @Bean(destroyMethod = "close")
    @Qualifier("writeClient")
    public RestHighLevelClient writeClient(){

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));

        RestClientBuilder builder = RestClient
                .builder(new HttpHost(elasticHost, elasticPort))
                .setHttpClientConfigCallback(httpClientBuilder -> 
                        httpClientBuilder
                                .setDefaultCredentialsProvider(credentialsProvider)
                                .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
                );

        builder.setRequestConfigCallback(requestConfigBuilder -> 
                requestConfigBuilder
                        .setConnectTimeout(10000)
                        .setSocketTimeout(60000)
                        .setConnectionRequestTimeout(0)
        );

        RestHighLevelClient restClient = new RestHighLevelClient(builder);
        return restClient;
    }

}

Вот функция, которая делает вызов эластика, если данные доступны в эластичном, он возьмет его, иначе он будет генерировать данные и помещать в эластичный.

public Object getData(Request request) {

    DataObj elasticResult = elasticService.getData(request);
    if(elasticResult!=null){
        return elasticResult;
    }
    else{
        //code to generate data
        DataObj generatedData = getData();//some function which will generated data
        //put above data into elastic by Async call.
        elasticAsync.putData(generatedData);
        return generatedData;
    }
}

ElasticService.java getData Функция:

@Service
public class ElasticService {

    @Value("${elastic.index}")
    private String elasticIndex;

    @Autowired
    @Qualifier("readClient")
    private RestHighLevelClient readClient;

    public DataObj getData(Request request){
        String generatedId = request.getGeneratedID();

        GetRequest getRequest = new GetRequest()
                .index(elasticIndex)   //elastic index name
                .id(generatedId);   //retrieving by index id from elastic _id field (as key-value)

        DataObj result = null;
        try {
            GetResponse response = readClient.get(getRequest, RequestOptions.DEFAULT);
            if(response.isExists()) {
                ObjectMapper objectMapper = new ObjectMapper();
                result = objectMapper.readValue(response.getSourceAsString(), DataObj.class);
            }
        }  catch (Exception e) {
            LOGGER.error("Exception occurred during  fetch from elastic !!!! " + ,e);
        }
        return result;
    }

}

Функция ElasticAsync.java Async Put данных:

@Service
public class ElasticAsync {

    private static final Logger LOGGER = Logger.getLogger(ElasticAsync.class.getName());

    @Value("${elastic.index}")
    private String elasticIndex;

    @Autowired
    @Qualifier("writeClient")
    private RestHighLevelClient writeClient;

    @Async
    public void putData(DataObj generatedData){
     ElasticVO updatedRequest = toElasticVO(generatedData);//ElasticVO matches to the structure of index given above.

        try {
            ObjectMapper objectMapper = new ObjectMapper();
            String jsonString = objectMapper.writeValueAsString(updatedRequest);

            IndexRequest request = new IndexRequest(elasticIndex);
            request.id(generatedData.getGeneratedID());
            request.source(jsonString, XContentType.JSON);
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
            request.timeout(TimeValue.timeValueSeconds(5));
            IndexResponse indexResponse = writeClient.index(request, RequestOptions.DEFAULT);
            LOGGER.info("response id: " + indexResponse.getId());

            }

        } catch (Exception e) {
            LOGGER.error("Exception occurred during saving into elastic !!!!",e);
        }


    }

}

Вот некоторая часть трассировки стека, когда происходит исключение во время сохранения данных в эластичный:

2019-07-19 07:32:19.997 ERROR [data-retrieval,341e6ecc5b10f3be,1eeb0722983062b2,true] 1 --- [askExecutor-894] a.c.s.a.service.impl.ElasticAsync        : Exception occurred during saving into elastic !!!!

java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
    at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]


Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    ... 1 common frames omitted

Вот некоторая часть трассировки стека, когда происходит исключение при извлечении данных в эластичный объект:

2019-07-19 07:22:37.844 ERROR [data-retrieval,104cf6b2ab5b3349,b302d3d3cd7ebc84,true] 1 --- [o-8080-exec-346] a.c.s.a.service.impl.ElasticService      : Exception occurred during  fetch from elastic !!!! 

java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1433) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1403) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1373) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
    at org.elasticsearch.client.RestHighLevelClient.get(RestHighLevelClient.java:699) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]



Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
    ... 1 common frames omitted

Я прошел через пару stackru & elastic связанные блоги, где они упоминали эту проблему, может быть связано с RAM & cluster конфигурация упругая. Затем я изменил свои шарды с 5 на 2, так как было только два узла данных. Также увеличен объем узлов данных с 4 ГБ до 8 ГБ, так как я узнал, что эластичный будет использовать только 50% всего RAM, Количество исключений уменьшилось, но проблема все еще сохраняется.

Какие могут быть возможные способы решения этой проблемы? Чего мне не хватает с точки зрения конфигурации Java / упругости, которая часто бросает такого рода SocketTimeoutException? Дайте мне знать, если вам требуется более подробная информация о конфигурации.

0 ответов

У нас была та же проблема, и после некоторого поиска я нашел основную причину: несоответствие конфигурации брандмауэра между клиентом и конфигурацией ядра эластичных серверов для tcp keep alive.

Брандмауэр разрывает незанятые соединения через 3600 секунд. Проблема заключалась в том, что параметр ядра для tcp keep alive был установлен на 7200 секунд (по умолчанию в RedHat 6.x/7.x):

sysctl -n net.ipv4.tcp_keepalive_time
7200

Таким образом, соединения сбрасываются до того, как будет отправлен зонд активности. AsyncHttpClient в эластичном http-клиенте, похоже, не очень хорошо обрабатывает сброшенные соединения, он просто ждет, пока не истечет время ожидания сокета.

Поэтому проверьте, есть ли у вас какое-либо сетевое устройство (балансировщик нагрузки, брандмауэр, прокси и т. Д.) Между вашим клиентом и сервером, у которого есть тайм-аут сеанса или аналогичный, и либо увеличьте этот тайм-аут, либо уменьшите параметр ядра tcp_keep_alive.

Другие вопросы по тегам