Flink ES-соединение не компилируется, как ожидалось

Моя проблема в некотором роде описана здесь. Часть кода (фактически взята с сайта apache) выглядит следующим образом

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)


      return Requests.indexRequest()
              .index("my-index")
              .`type`("my-type")
              .source(json)

Если я добавлю эти три утверждения, я получаю сообщение об ошибке, как показано ниже.

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

Я получаю ошибку

object elasticsearch is not a member of package org.apache.flink.streaming.connectors
object elasticsearch6 is not a member of package org.apache.flink.streaming.connectors

Если я не добавлю эти операторы импорта, я получу ошибку, как показано ниже.

Compiling 1 Scala source to E:\sar\scala\practice\readstbdata\target\scala-2.11\classes ...
[error] E:\sar\scala\practice\readstbdata\src\main\scala\example\readcsv.scala:35:25: not found: value ElasticsearchSink
[error] val esSinkBuilder = new ElasticsearchSink.Builder[String](
[error]                         ^
[error] E:\sar\scala\practice\readstbdata\src\main\scala\example\readcsv.scala:37:7: not found: type ElasticsearchSinkFunction
[error]   new ElasticsearchSinkFunction[String] {
[error]       ^
[error] two errors found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed 10 Feb, 2020 2:15:04 PM

Вопрос о стеке, о котором я упоминал выше, некоторые функции были расширены. Насколько я понимаю, flink.streaming.connectors.elasticsearch необходимо расширить до библиотек REST. 1) Правильно ли я понимаю 2) Если да, могу ли я получить полные расширения 3) Если я не понимаю, дайте мне решение.

Примечание: я добавил следующие операторы в build.sbt

libraryDependencies += "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % "7.5.2" ,
    libraryDependencies += "org.elasticsearch" % "elasticsearch" % "7.5.2",

2 ответа

Коннектор Flink Elasticsearch 7

Пожалуйста, посмотрите рабочий и подробный ответ, который я предоставил здесь, который написан на Scala.

Коннекторы потоковой передачи не являются частью двоичного дистрибутива flink. Вы должны упаковать их в свое приложение.

За elasticsearch6 вам нужно добавить flink-connector-elasticsearch6_2.11, что вы можете сделать как

libraryDependencies += "org.apache.flink" %% "flink-connector-elasticsearch6" % "1.6.0"

Как только этот jar станет частью вашей сборки, компилятор найдет недостающие компоненты. Однако я не знаю, будет ли этот клиент ES6 работать с версией 7.5.2.

Другие вопросы по тегам