version_conflict_engine_exception с _update_by_query
Я использую обновление ElasticSearch по API запросов во flink, параллелизм flink равен 1. Но у меня есть version_conflict_engine_exception, это мой код во flink RichSinkFunction, например:
UpdateByQueryRequestBuilder builder = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
builder.abortOnVersionConflict(true);
builder.source(indexName);
builder.filter(filter);
builder.setMaxRetries(MAX_RETRIES);
builder.refresh(true);
String updateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
.format(ELASTIC_SEARCH_DATE_TIME_FORMATTER);
Map<String, Object> params = Maps.newHashMap();
params.put("fieldName", fieldName);
params.put("updateTime", updateTime);
params.put("model", this.transformMap(JacksonUtils.convertValue(model, new TypeReference<Map<String, Object>>() {
})));
builder.script(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, UPDATE_BY_MODEL_PAINLESS_CODE, params));
BulkByScrollResponse response = builder.get();
Я могу быть уверен, что только это приложение имеет доступ к Elasticsearch, параллелизм flink равен 1, как в однопоточном вызове обновления с помощью API запроса? Почему у меня есть version_conflict_engine_exception? и как сделать ровно один раз?
1 ответ
Я вижу две возможности:
- Что-то еще работает, что может обновить документ.
- Приемник elasticsearch в Flink предоставляет гарантии как минимум один раз, что означает, что в случае сбоя приемник иногда будет выполнять дублирующие записи во время восстановления. Возможно, это может привести к попыткам обновить документ с использованием устаревшего номера версии.