Как конвертировать типы при чтении данных из Elasticsearch с использованиемластиком поиска в SPARK
Когда я пытаюсь прочитать данные из asticsearch, используя esRDD("index")
функция в asticsearch-spark, я получаю результаты в виде org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])]
, И когда я проверяю значения, они все типа AnyRef
, Тем не менее, я видел на сайте ES, он говорит:
asticsearch-hadoop автоматически конвертирует встроенные типы Spark в типы Elasticsearch (и обратно)
Мои зависимости:
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.1.0"
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.4.0"
Я что-то пропустил? И как я могу конвертировать типы удобным способом?
1 ответ
ОК, я нашел решение. Если вы используете esRDD
Все типы информации теряются.
Лучше, если мы используем:
val df = sparkSession.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include", "").load("index")
Вы можете настроить в option
Если вы сделали это раньше, option
можно игнорировать
Данные возвращаются в DataFrame
и типы данных сохраняются (преобразуются в sql.DataTypes
) в схеме, если преобразование поддерживается elasticsearch-spark
,
И теперь вы можете делать все, что захотите.