ElasticSearch Java API Client — отправляйте уже сериализованные данные и избегайте сериализации

У меня есть тема Kafka с данными JSON. Теперь я пытаюсь отправить эти строки JSON в тему ES, используя новый «Java API Client» ( https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/index.html), но я столкнулся с исключением парсера:

      co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/index] failed: [mapper_parsing_exception] failed to parse
at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:281)
at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147)
at co.elastic.clients.elasticsearch.ElasticsearchClient.index(ElasticsearchClient.java:953)

Это исключение возникает в последней строке следующего кода:

      final IndexRequest<String> request =
          new IndexRequest.Builder<String>()
              .index("myIndex")
              .id(String.valueOf(UUID.randomUUID()))
              .document(consumerRecord.value()) //already serialized json data
              .build();
elasticsearchClient.index(request);

Насколько я понимаю, это исключение возникает из-за того, что клиент ES пытается сериализовать данные, которые я предоставляю, которые уже сериализованы, что приводит к искаженной строке JSON.

Есть ли способ обойти это и просто отправить простые строки JSON? Также я считаю, что это было возможно с более ранней «библиотекой Java низкого уровня», верно? И да, я знаю, что есть способы разрешить общение между Kafka и ES без написания Consumer.

Спасибо за любые подсказки.

3 ответа

Если вы используетеJacksonJsonpMapperпри создании своегоElasticsearchTransport, вы можете использовать собственный класс для отправки уже сериализованного JSON.

      ElasticsearchTransport transport = new RestClientTransport(
    createLowLevelRestClient(), // supply your own!
    new JacksonJsonpMapper()
);

ElasticsearchClient client = new ElasticsearchClient(transport);

IndexResponse response = client.index(indexReq -> indexReq
    .index("my-index")
    .id("docId")
    .document(new PreserializedJson("{\"foo\":\"bar\"}"))
);
System.out.println(response);

Вот источник дляPreserializedJson:

      import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static java.util.Objects.requireNonNull;

@JsonSerialize(using = PreserializedJson.Serializer.class)
public class PreserializedJson {
  private final String value;

  public PreserializedJson(String value) {
    this.value = requireNonNull(value);
  }

  public PreserializedJson(byte[] value) {
    this(new String(value, StandardCharsets.UTF_8));
  }

  public static class Serializer extends StdSerializer<PreserializedJson> {
    public Serializer() {
      super(PreserializedJson.class);
    }

    @Override
    public void serialize(PreserializedJson value, JsonGenerator gen, SerializerProvider provider) throws IOException {
      gen.writeRaw(value.value);
    }
  }
}

С новым API-клиентом вы можете вставлять в него необработанный json. Как указано здесь: Использование необработанных данных json

      IndexRequest<JsonData> request = IndexRequest.of(i -> i
    .index("logs")
    .withJson(input)
);

Я решил проблему, заменив "Java API Client" ( https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/introduction.html) на "Java Low Level Rest Client" ( https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/java-rest-low.html).

Эта библиотека позволяет отправлять произвольные строки JSON в ES:

        final Request request = new Request("POST", "/twitter/_doc");
  request.setJsonEntity(record.value());
  restClient.performRequest(request);
Другие вопросы по тегам