Не удалось найти источник данных: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider

Версия spark - 2.3.3. Я хочу подключиться к mqtt, используя аргумент --packages в spark-shell.

bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.2

scala> import java.sql.Timestamp
scala> import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder.appName("MQTTStreamWordCount").getOrCreate()
scala> val lines = spark.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider").option("topic", "mytopic").load("tcp://localhost:1883")

Тем не менее, ошибка все еще происходит.

java.lang.ClassNotFoundException: Failed to find data source: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:622)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:622)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:622)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:622)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:622)
  ... 51 more

Я не знаю, в чем проблема. Я хочу использовать потоковую передачу с использованием mqtt, есть ли другой способ сделать это с помощью bahir?

Спасибо,

0 ответов

Другие вопросы по тегам