Apache Bahir MQTT streaming: java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/catalog/TableProvider

I am trying to use Apache Bahir to build a simple Spark program that reads an MQTT stream into a DStream, but I can't get it to work.

My sbt file looks like this:

name := "spark-practice"

version := "0.1"

scalaVersion := "2.12.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"

libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.4.0"

libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.4.0"

libraryDependencies += "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.2.1"

When I run the following lines of code, taken from the Bahir example:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Bahir mqtt example")
  .getOrCreate()

import spark.implicits._
val brokerUrl = "tcp://localhost:1883"
val topic = "test"

val powerMetrics = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic)
  .load(brokerUrl)
  .selectExpr("CAST(id AS INT)", "CAST(topic AS STRING)", "CAST(payload AS STRING)", "timestamp as timestamp")
  .as[(Int, String, String, Timestamp)]

val query = powerMetrics.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/catalog/TableProvider
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:302)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:296)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:394)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:394)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
    at BahirMqttPractice$.main(BahirMqttPractice.scala:18)
    at BahirMqttPractice.main(BahirMqttPractice.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.connector.catalog.TableProvider
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 35 more

I believe this is caused by a version mismatch between the projects involved. Mosquitto is installed on my system and works fine. Any idea what might be going on?
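
As far as I can tell, the missing interface org.apache.spark.sql.connector.catalog.TableProvider only exists in Spark 3.x, so as a sanity check I can run something like this in the same project to see what sbt actually resolved (just my own debugging snippet, not from the Bahir example):

println(s"Spark on the classpath: ${org.apache.spark.SPARK_VERSION}")

// The interface from the stack trace was added in Spark 3.x, so this just
// reports whether Spark 3.x classes ended up on the classpath.
val hasTableProvider =
  try { Class.forName("org.apache.spark.sql.connector.catalog.TableProvider"); true }
  catch { case _: ClassNotFoundException => false }
println(s"TableProvider present: $hasTableProvider")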

0 answers
