Elasticsearch + Spark: написать JSON с пользовательским документом _id

Я пытаюсь написать коллекцию объектов в Elasticsearch от Spark. Я должен соответствовать двум требованиям:

  1. Документ уже сериализован в JSON и должен быть написан как есть
  2. Elasticsearch документ _id должен быть обеспечен

Вот что я попробовал до сих пор.

saveJsonToEs()

Я пытался использовать saveJsonToEs() как это (сериализованный документ содержит поле _id с желаемым идентификатором Elasticsearch):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

Но elasticsearch-hadoop библиотека дает это исключение:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)

Если я удалю es.mapping.exclude но продолжай es.mapping.id и отправить JSON с _id внутри (как {"_id":"blah",...})

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

Я получаю эту ошибку:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
...

Когда я пытаюсь отправить этот идентификатор в другом поле (например, {"superID":"blah",...":

 val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "superID")
)

EsSpark.saveJsonToEs(rdd, cfg)

Не удается извлечь поле:

17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

Когда я удаляю es.mapping.id а также es.mapping.exclude из конфигурации это работает, но идентификатор документа генерируется Elasticsearch (что нарушает требование 2):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
)

EsSpark.saveJsonToEs(rdd, cfg)

saveToEsWithMeta()

Есть еще одна функция для обеспечения _id и другие метаданные для вставки: saveToEsWithMeta() это позволяет решить требование 2, но не выполняется с требованием 1.

val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}

val cfg = Map(
  ("es.resource", "myindex/mytype"),
)

EsSpark.saveToEsWithMeta(rdd, cfg)

На самом деле Elasticsearch даже не в состоянии разобрать, что elasticsearch-hadoop посылает:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)

Вопрос

Можно ли написать сборник (documentID, serializedDocument) из Spark в Elasticsearch (используя elasticsearch-hadoop)?

PS Я использую Elasticsearch 5.6.3 и Spark 2.1.1.

2 ответа

Наконец, я нашел проблему: это была опечатка в конфиге.

[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]

Искал поле superID но был только superID (обратите внимание на случай). В вопросе это также немного вводит в заблуждение, поскольку в коде это выглядит как "es.mapping.id", "superID" (что было не правильно).

Фактическое решение, как предложил Леви Рэмси:

val json = """{"foo":"bar","superID":"deadbeef"}"""

val rdd = spark.makeRDD(Seq(json))
val cfg = Map(
  ("es.mapping.id", "superID"),
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)

Разница в том, что es.mapping.id не может быть _id (как было указано в оригинальном сообщении, _id это метаданные и Elasticsearch не принимает их).

Естественно, это означает, что новое поле superID должны быть добавлены к отображению (если отображение не является динамическим). Если хранение дополнительного поля в индексе является бременем, следует также:

  • исключить его из сопоставления
  • и отключить его индексацию

Большое спасибо Алексу Савицкому за указание в правильном направлении.

Вы пробовали что-то вроде:

val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
  ("es.mapping.id", "_id")
)
rdd.saveJsonToEs("myindex/mytype", cfg)

Я тестировал (с asticsearch-hadoop (версия разъема 2.4.5) против ES 1.7), и он работает.

Это можно сделать, передав ES_INPUT_JSON возможность cfg сопоставление параметров и возврат кортежа, содержащего идентификатор документа в качестве первого элемента и документ, сериализованный в JSON, в качестве второго элемента из функции карты.

Я проверил это с "org.elasticsearch" %% "elasticsearch-spark-20" % "[6.0,7.0[" против Elasticsearch 6.4

import org.elasticsearch.hadoop.cfg.ConfigurationOptions.{ES_INPUT_JSON, ES_NODES}
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

job
  .map{ r => (r._id, r.toJson()) }
  .saveToEsWithMeta(
    "myindex/mytype",
    Map(
      ES_NODES -> "https://localhost:9200",
      ES_INPUT_JSON -> true.toString
    )
  )

Я провел дни, стуча головой о стену, пытаясь понять, почему saveToEsWithMeta не будет работать, когда я использовал строку для идентификатора, например, так:

rdd.map(caseClassContainingJson =>
  (caseClassContainingJson._idWhichIsAString, caseClassContainingJson.jsonString)
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
  ES_INPUT_JSON -> true.toString
))

Это приведет к ошибкам, связанным с синтаксическим анализом JSON, что обманчиво заставляет вас думать, что проблема связана с вашим JSON, но затем вы регистрируете каждый из ваших JSON и видите, что все они действительны.

Оказывается, по какой-то причине ES_INPUT_JSON -> true делает левую часть кортежа, т. е. идентификатор, также анализируется как JSON!

решение, JSON, строковый идентификатор (обернуть идентификатор в дополнительные двойные кавычки), так что синтаксический анализ как JSON работает:

rdd.map(caseClassContainingJson =>
  (
    Json.stringify(JsString(caseClassContainingJson._idWhichIsAString)), 
    caseClassContainingJson.jsonString
  )
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
  ES_INPUT_JSON -> true.toString
))
  1. ты можешь использовать saveToEs чтобы определить customer_id и не сохранять customer_id
  2. обратите внимание, что rdd RDD[Map] тип
val rdd:RDD[Map[String, Any]]=...
val cfg = Map(
  ("es.mapping.id", your_customer_id),
  ("es.mapping.exclude", your_customer_id)
)
EsSpark.saveToEs(rdd, your_es_index, cfg)
Другие вопросы по тегам