Elasticsearch: обработка ошибок массового запроса
Я использую Elasticsearch
Массовый API для отправки большого количества документов для индексации и удаления сразу. Если для одного документа произошла ошибка, другие документы будут успешно проиндексированы или удалены. И это приводит к неправильному состоянию данных в elasticstore, потому что в моем случае документы как бы связаны друг с другом. Я имею в виду, если поле одного документа имеет какое-то значение, то есть другие документы, которые также должны иметь такое же значение для этого поля. Я не уверен, как я могу обработать такие ошибки из массовых запросов. Возможно ли откатить запрос любым способом? Я читал похожие вопросы, но не смог найти решения по таким случаям. Или вместо отката, есть ли способ отправить данные, только если нет ошибки? или что-то вроде пробного запуска запроса возможно?
2 ответа
Приведенное выше решение для использования вывода BulkResponse в основном предназначено для обработки следующих пакетных запросов. Что, если я хочу прервать пакетную обработку в том месте, где не удалось выполнить какой-либо запрос в пакете. Мы отправляем массовые события, которые связаны друг с другом. Пример моей проблемы: партия (E1-E10), если партия завершается сбоем на E5. Я не хочу, чтобы E6-E10 обрабатывались, потому что они связаны. В таком случае мне нужен немедленный ответ.
Я опаздываю с вопросом, но отвечу тем, кто столкнется с подобным сценарием в будущем.
После выполнения массового API Elasticsearch (ES), также известного как BulkRequest
вы получите BulkResponse
взамен который состоит из одного или нескольких BulkItemResponse
. BulkItemResponse
имеет метод isFailed()
который сообщит вам, не удалось ли это действие. В вашем случае вы можете просмотреть все элементы в ответе, если есть сбои, и обработать неудавшиеся ответы в соответствии с вашими требованиями.
Код будет выглядеть примерно так для Synchronous
исполнение:
val bulkResponse: BulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
bulkResponse.iterator.asScala
.filter(_.isFailed)
.foreach(item => { // your logic to handle failures })
За Asynchronous
исполнение, вы можете предоставить listener
который будет вызываться после завершения выполнения. Вы должны преодолетьonResponse()
а также onFailure()
в этом случае. Вы можете узнать больше об этом на https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html.
HTH.