Elasticsearch + Spark: написать JSON с пользовательским документом _id
Я пытаюсь написать коллекцию объектов в Elasticsearch от Spark. Я должен соответствовать двум требованиям:
- Документ уже сериализован в JSON и должен быть написан как есть
- Elasticsearch документ
_id
должен быть обеспечен
Вот что я попробовал до сих пор.
saveJsonToEs()
Я пытался использовать saveJsonToEs()
как это (сериализованный документ содержит поле _id
с желаемым идентификатором Elasticsearch):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id"),
("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
Но elasticsearch-hadoop
библиотека дает это исключение:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
Если я удалю es.mapping.exclude
но продолжай es.mapping.id
и отправить JSON с _id
внутри (как {"_id":"blah",...}
)
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
Я получаю эту ошибку:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
...
Когда я пытаюсь отправить этот идентификатор в другом поле (например, {"superID":"blah",..."
:
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "superID")
)
EsSpark.saveJsonToEs(rdd, cfg)
Не удается извлечь поле:
17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
Когда я удаляю es.mapping.id
а также es.mapping.exclude
из конфигурации это работает, но идентификатор документа генерируется Elasticsearch (что нарушает требование 2):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveJsonToEs(rdd, cfg)
saveToEsWithMeta()
Есть еще одна функция для обеспечения _id
и другие метаданные для вставки: saveToEsWithMeta()
это позволяет решить требование 2, но не выполняется с требованием 1.
val rdd: RDD[(String, String)] = job.map{
r => r._id -> r.toJson()
}
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveToEsWithMeta(rdd, cfg)
На самом деле Elasticsearch даже не в состоянии разобрать, что elasticsearch-hadoop
посылает:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
Вопрос
Можно ли написать сборник (documentID, serializedDocument)
из Spark в Elasticsearch (используя elasticsearch-hadoop
)?
PS Я использую Elasticsearch 5.6.3 и Spark 2.1.1.
2 ответа
Наконец, я нашел проблему: это была опечатка в конфиге.
[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
Искал поле superID
но был только superID
(обратите внимание на случай). В вопросе это также немного вводит в заблуждение, поскольку в коде это выглядит как "es.mapping.id", "superID"
(что было не правильно).
Фактическое решение, как предложил Леви Рэмси:
val json = """{"foo":"bar","superID":"deadbeef"}"""
val rdd = spark.makeRDD(Seq(json))
val cfg = Map(
("es.mapping.id", "superID"),
("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)
Разница в том, что es.mapping.id
не может быть _id
(как было указано в оригинальном сообщении, _id
это метаданные и Elasticsearch не принимает их).
Естественно, это означает, что новое поле superID
должны быть добавлены к отображению (если отображение не является динамическим). Если хранение дополнительного поля в индексе является бременем, следует также:
- исключить его из сопоставления
- и отключить его индексацию
Большое спасибо Алексу Савицкому за указание в правильном направлении.
Вы пробовали что-то вроде:
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.mapping.id", "_id")
)
rdd.saveJsonToEs("myindex/mytype", cfg)
Я тестировал (с asticsearch-hadoop (версия разъема 2.4.5) против ES 1.7), и он работает.
Это можно сделать, передав ES_INPUT_JSON
возможность cfg
сопоставление параметров и возврат кортежа, содержащего идентификатор документа в качестве первого элемента и документ, сериализованный в JSON, в качестве второго элемента из функции карты.
Я проверил это с "org.elasticsearch" %% "elasticsearch-spark-20" % "[6.0,7.0["
против Elasticsearch 6.4
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.{ES_INPUT_JSON, ES_NODES}
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._
job
.map{ r => (r._id, r.toJson()) }
.saveToEsWithMeta(
"myindex/mytype",
Map(
ES_NODES -> "https://localhost:9200",
ES_INPUT_JSON -> true.toString
)
)
Я провел дни, стуча головой о стену, пытаясь понять, почему saveToEsWithMeta
не будет работать, когда я использовал строку для идентификатора, например, так:
rdd.map(caseClassContainingJson =>
(caseClassContainingJson._idWhichIsAString, caseClassContainingJson.jsonString)
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
ES_INPUT_JSON -> true.toString
))
Это приведет к ошибкам, связанным с синтаксическим анализом JSON, что обманчиво заставляет вас думать, что проблема связана с вашим JSON, но затем вы регистрируете каждый из ваших JSON и видите, что все они действительны.
Оказывается, по какой-то причине ES_INPUT_JSON -> true
делает левую часть кортежа, т. е. идентификатор, также анализируется как JSON!
решение, JSON, строковый идентификатор (обернуть идентификатор в дополнительные двойные кавычки), так что синтаксический анализ как JSON работает:
rdd.map(caseClassContainingJson =>
(
Json.stringify(JsString(caseClassContainingJson._idWhichIsAString)),
caseClassContainingJson.jsonString
)
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
ES_INPUT_JSON -> true.toString
))
- ты можешь использовать
saveToEs
чтобы определить customer_id и не сохранять customer_id - обратите внимание, что rdd
RDD[Map]
тип
val rdd:RDD[Map[String, Any]]=...
val cfg = Map(
("es.mapping.id", your_customer_id),
("es.mapping.exclude", your_customer_id)
)
EsSpark.saveToEs(rdd, your_es_index, cfg)