apache bahir for mqtt streaming java.lang.noclassdeffounderror:org/apache/spark/sql/connector/catalog/tableprovider

ia2d9nvy  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(982)

我正试图利用apachebahir创建一个简单的spark程序来读取数据流中的mqtt流。然而,我似乎无法完成。
我的sbt文件如下所示:

name := "spark-practice"

version := "0.1"

scalaVersion := "2.12.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"

libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.4.0"

libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.4.0"

libraryDependencies += "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.2.1"

当我执行bahir示例中的以下代码行时:

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Bahir mqtt example")
  .getOrCreate()

import spark.implicits._
val brokerUrl = "tcp://localhost:1883"
val topic = "test"

val powerMetrics = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic)
  .load(brokerUrl)
  .selectExpr("CAST(id AS INT)", "CAST(topic AS STRING)", "CAST(payload AS STRING)", "timestamp as timestamp")
  .as[(Int, String, String, Timestamp)]

val query = powerMetrics.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination();

我得到以下错误:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/catalog/TableProvider
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:302)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:296)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:394)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:394)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
    at BahirMqttPractice$.main(BahirMqttPractice.scala:18)
    at BahirMqttPractice.main(BahirMqttPractice.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.connector.catalog.TableProvider
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 35 more

我相信这是每个项目的版本造成的问题。mosquitto已经安装在我的系统上,运行良好。知道会发生什么吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题