I am trying to create a simple Spark program that reads an MQTT stream using Apache Bahir, but I can't get it to work.
My sbt file looks like this:
name := "spark-practice"
version := "0.1"
scalaVersion := "2.12.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.4.0"
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.4.0"
libraryDependencies += "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.2.1"
When I run the following code, taken from the Bahir example:
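import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

object BahirMqttPractice {
  def main(args: Array[String]): Unit = {
    // Local Spark session using all available cores
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Bahir mqtt example")
      .getOrCreate()
    import spark.implicits._

    val brokerUrl = "tcp://localhost:1883"
    val topic = "test"

    // Read the MQTT topic as a streaming Dataset of (id, topic, payload, timestamp)
    val powerMetrics = spark.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", topic)
      .load(brokerUrl)
      .selectExpr("CAST(id AS INT)", "CAST(topic AS STRING)", "CAST(payload AS STRING)", "timestamp as timestamp")
      .as[(Int, String, String, Timestamp)]

    // Print each micro-batch to the console
    val query = powerMetrics.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}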
I get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/catalog/TableProvider
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:302)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:296)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:394)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:394)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
at BahirMqttPractice$.main(BahirMqttPractice.scala:18)
at BahirMqttPractice.main(BahirMqttPractice.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.connector.catalog.TableProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 35 more
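I believe this is caused by a mismatch between the versions of the libraries in the project. Mosquitto is already installed on my system and works fine. Any idea what is going on?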
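To make the version suspicion concrete: the missing class, `org.apache.spark.sql.connector.catalog.TableProvider`, sits in a package that, as far as I know, only exists from Spark 3.0 onward, so a data source jar built against Spark 3.x may be ending up on a Spark 2.4.0 classpath. Below is a minimal sketch of a build.sbt that pins every Spark and Bahir artifact through a single value each. The names `sparkVersion` and `bahirVersion` are my own, and whether this alone removes the error is an assumption, not something I have verified.

```scala
// build.sbt (sketch): keep all Spark and Bahir artifacts on one release line.
// sparkVersion and bahirVersion are illustrative names, not part of any API.
name := "spark-practice"
version := "0.1"
scalaVersion := "2.12.12"

val sparkVersion = "2.4.0"
val bahirVersion = "2.4.0" // assumed to target the Spark 2.4.x line

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"               % sparkVersion,
  "org.apache.spark" %% "spark-sql"                % sparkVersion,
  "org.apache.spark" %% "spark-streaming"          % sparkVersion,
  "org.apache.bahir" %% "spark-streaming-mqtt"     % bahirVersion,
  "org.apache.bahir" %% "spark-sql-streaming-mqtt" % bahirVersion,
  "org.eclipse.paho" %  "org.eclipse.paho.client.mqttv3" % "1.2.1"
)
```

This resolves to the same versions as my current file, so if the error persists, my next guess is that a jar compiled against Spark 3.x is reaching the runtime classpath from somewhere else, for example an IDE library setting or a leftover artifact from an earlier build.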