Elasticsearch: обработка ошибок массового запроса

Я использую Elasticsearch Массовый API для отправки большого количества документов для индексации и удаления сразу. Если для одного документа произошла ошибка, другие документы будут успешно проиндексированы или удалены. И это приводит к неправильному состоянию данных в elasticstore, потому что в моем случае документы как бы связаны друг с другом. Я имею в виду, если поле одного документа имеет какое-то значение, то есть другие документы, которые также должны иметь такое же значение для этого поля. Я не уверен, как я могу обработать такие ошибки из массовых запросов. Возможно ли откатить запрос любым способом? Я читал похожие вопросы, но не смог найти решения по таким случаям. Или вместо отката, есть ли способ отправить данные, только если нет ошибки? или что-то вроде пробного запуска запроса возможно?

2 ответа

Приведенное выше решение для использования вывода BulkResponse в основном предназначено для обработки следующих пакетных запросов. Что, если я хочу прервать пакетную обработку в том месте, где не удалось выполнить какой-либо запрос в пакете. Мы отправляем массовые события, которые связаны друг с другом. Пример моей проблемы: партия (E1-E10), если партия завершается сбоем на E5. Я не хочу, чтобы E6-E10 обрабатывались, потому что они связаны. В таком случае мне нужен немедленный ответ.

Я опаздываю с вопросом, но отвечу тем, кто столкнется с подобным сценарием в будущем.

После выполнения массового API Elasticsearch (ES), также известного как BulkRequestвы получите BulkResponse взамен который состоит из одного или нескольких BulkItemResponse. BulkItemResponse имеет метод isFailed()который сообщит вам, не удалось ли это действие. В вашем случае вы можете просмотреть все элементы в ответе, если есть сбои, и обработать неудавшиеся ответы в соответствии с вашими требованиями.

Код будет выглядеть примерно так для Synchronous исполнение:

val bulkResponse: BulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
bulkResponse.iterator.asScala
.filter(_.isFailed)
.foreach(item => { // your logic to handle failures })

За Asynchronous исполнение, вы можете предоставить listenerкоторый будет вызываться после завершения выполнения. Вы должны преодолетьonResponse() а также onFailure()в этом случае. Вы можете узнать больше об этом на https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html.

HTH.

Другие вопросы по тегам