ElasticSearch и Apache HttpAsyncClient

Я пытаюсь использовать ElasticSearch REST API с Java Apache HttpAsyncClient библиотека. Я хочу использовать постоянное конвейерное соединение. Вот некоторый тестовый код (вывод в комментариях):

@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
    testPost(HttpAsyncClients.createDefault());
    //201: {"_index":"test_index","_type":"test_type","_id":"AVIHYGnqdqqg_TAHm4ix","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}
    testPost(HttpAsyncClients.createPipelining());
    //400: No handler found for uri [http://127.0.0.1:9200/test_index/test_type] and method [POST]
}

private void testPost(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
    client.start();
    HttpPost request = new HttpPost("http://127.0.0.1:9200/test_index/test_type");
    request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
    Future<HttpResponse> responseFuture = client.execute(request, null);
    HttpResponse response = responseFuture.get();
    System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}

Я не могу понять, почему это работает с HttpAsyncClients.createDefault() клиент, но не работает с HttpAsyncClients.createPipelining(), Также я не могу понять разницу между этими двумя методами создания.

Почему я получаю сообщение об ошибке при использовании createPipelining()?

Я пытался увидеть разницу с https://httpbin.org/post но он показал одинаковый результат с обоими вариантами. Я использую настройки ElasticSearch по умолчанию.

Спасибо!


UPD1

Я пробовал с PUT документ (PUT http://127.0.0.1/test_index/test_type/<doc id>) запрос с тем же результатом - он отлично работает с createDefault() но я получил похожую ошибку, когда сделать это с createPipelining() - Обработчик не найден <...>.

Но когда я пытаюсь выполнить запрос на создание индекса (PUT http://127.0.0.1/<index name>) есть еще одна ошибка. Смотрите код ниже:

@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
    testCreateIndex(HttpAsyncClients.createDefault());
    //200: {"acknowledged":true}
    testCreateIndex(HttpAsyncClients.createPipelining());
    //400: {"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"}],"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"},"status":400}
}

private void testCreateIndex(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
    client.start();
    HttpPut request = new HttpPut("http://127.0.0.1:9200/" + RandomStringUtils.randomAlphabetic(8).toLowerCase());
    Future<HttpResponse> responseFuture = client.execute(request, null);
    HttpResponse response = responseFuture.get();
    System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}

Как я вижу на этой странице документации, ElasticSearch по умолчанию поддерживает конвейеризацию HTTP. Может быть, что-нибудь мне нужно изменить в настройках ES?


UPD2

Вот несколько проводных журналов для кода в разделе UPD1 с различными настройками регистрации:

Dorg.apache.commons.logging.simplelog.log.org.apache.http=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=INFO

http://pastebin.com/v29uvgbj

-Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.conn=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=DEBUG

http://pastebin.com/G9ij15d6


UPD3

Я просто попытался заменить createDefault() на createMinimal(), и это вызвало ту же ошибку, что и createPipelining(). Любые идеи, что в MinimalHttpAsyncClient может вызвать эту проблему? Может быть, есть способ, которым я могу вручную создать клиент конвейеризации (с классами компоновщика) без этой проблемы?

2 ответа

Решение

Сервер должен подавиться абсолютным URI запроса в строке запроса.

[DEBUG] wire - http-outgoing-1 >> "PUT http://127.0.0.1:9200/ydiwdsid HTTP/1.1[\r][\n]"

HttpAsyncClient в режиме конвейеризации использует минимальную цепочку обработки протокола. Он не пытается переписать URI запроса объекта запроса.

В вашем конкретном случае конвейерная обработка запросов не имеет особого смысла. Не говоря уже о том, что если вы не отправляете запросы в пакетном режиме, вы даже не используете конвейерное выполнение.

На самом деле, вам просто нужно извлечь хост из URL и создать HttpPost объект только с абсолютным путем. Смотрите изменения во второй, третьей и пятой строках ниже:

client.start();
HttpHost targetHost = new HttpHost("127.0.0.1", 9200);
HttpPost request = new HttpPost("/test_index/test_type");
request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
Future<HttpResponse> responseFuture = client.execute(targetHost, request, null);
HttpResponse response = responseFuture.get();
System.out.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));

Выполнение этих трех изменений и повторный запуск кода приведут к следующему:

201: {"_index":"test_index","_type":"test_type","_id":"AVISSimIZHOoPG8ibOyF","_version":1,"created":true}
201: {"_index":"test_index","_type":"test_type","_id":"AVISSimjZHOoPG8ibOyG","_version":1,"created":true}
Другие вопросы по тегам