这个问题在这里已经有答案了:
为什么format(“kafka”)会因为“failed to find data source:kafka.”(即使是uber jar)而失败(6个答案)
两年前关门了。
我尝试将spark结构化流媒体与kafka连接起来,它抛出以下错误:
线程“main”java.lang.classnotfoundexception中出现异常:找不到数据源:kafka。请在…找到包裹。。。
根据文档,我添加了所需的依赖项
我的Kafka和zookeeper服务器正在运行。不知道是什么问题。而且,我是这样用的
import spark.implicits._
val feedback =spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:2181").option("subscribe", "kafka_input_topic")
.load().as[InputMessage].filter(_.lang.equals("en"))
感谢您的帮助。谢谢您
4条答案
按热度按时间uelo1irk1#
问题是运行时(不是构建时)类路径中没有包含必需的jar。
根据链接到的文档,将所需的依赖项添加到生成定义文件中(
pom.xml
或者build.sbt
或者build.gradle
),但是在您尝试运行应用程序时会发生异常,而该应用程序是在生成之后运行的,不是吗?您缺少的是关于部署的文档部分,即部署:
与任何spark应用程序一样,spark submit用于启动应用程序。spark-sql-kafka-0-10_.11及其依赖项可以直接添加到spark submit中,使用--packages,例如,
你必须加上这个
--packages
或者您必须创建一个uberjar,使依赖项成为jar文件的一部分。vd2z7a6w2#
正如你在评论中提到的,问题是:
拆下
provided
SQLKafka的作用域,因为spark安装没有提供它。fwzugrvs3#
如果使用maven,那么下面用依赖关系构建jar的方法可能会解决您的问题。
添加如下spark依赖项:
然后按以下方式配置maven配置文件:
添加以下插件:
然后把你的jar用
mvn clean install -Ponline -DskipTests
. 这应该能解决你的问题ctzwtxfj4#
您可以按完全限定名(而不是别名)使用kafka数据源,如下所示: