为什么执行结构化流应用程序失败,并出现“failed to find data source:kafka”?

anhgbhbe  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(1170)

这个问题在这里已经有答案了

为什么format(“kafka”)会因为“failed to find data source:kafka.”(即使是uber jar)而失败(6个答案)
两年前关门了。
我尝试将spark结构化流媒体与kafka连接起来,它抛出以下错误:
线程“main”java.lang.classnotfoundexception中出现异常:找不到数据源:kafka。请在…找到包裹。。。

根据文档,我添加了所需的依赖项
我的Kafka和zookeeper服务器正在运行。不知道是什么问题。而且,我是这样用的

import spark.implicits._
val feedback =spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:2181").option("subscribe", "kafka_input_topic")
      .load().as[InputMessage].filter(_.lang.equals("en"))

感谢您的帮助。谢谢您

uelo1irk

uelo1irk1#

问题是运行时(不是构建时)类路径中没有包含必需的jar。
根据链接到的文档,将所需的依赖项添加到生成定义文件中( pom.xml 或者 build.sbt 或者 build.gradle ),但是在您尝试运行应用程序时会发生异常,而该应用程序是在生成之后运行的,不是吗?
您缺少的是关于部署的文档部分,即部署:
与任何spark应用程序一样,spark submit用于启动应用程序。spark-sql-kafka-0-10_.11及其依赖项可以直接添加到spark submit中,使用--packages,例如,

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ..

你必须加上这个 --packages 或者您必须创建一个uberjar,使依赖项成为jar文件的一部分。

vd2z7a6w

vd2z7a6w2#

正如你在评论中提到的,问题是:

<scope>provided</scope>

拆下 provided SQLKafka的作用域,因为spark安装没有提供它。

fwzugrvs

fwzugrvs3#

如果使用maven,那么下面用依赖关系构建jar的方法可能会解决您的问题。
添加如下spark依赖项:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.1</version>
        <scope>${spark.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>

然后按以下方式配置maven配置文件:

<profiles>
    <profile>
        <id>default</id>
        <properties>
            <profile.id>dev</profile.id>
            <spark.scope>compile</spark.scope>
        </properties>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
    </profile>
    <profile>
        <id>test</id>
        <properties>
            <profile.id>test</profile.id>
            <spark.scope>provided</spark.scope>
        </properties>
    </profile>
    <profile>
        <id>online</id>
        <properties>
            <profile.id>online</profile.id>
            <spark.scope>provided</spark.scope>
        </properties>
    </profile>
</profiles>

添加以下插件:

<plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id> <!-- this is used for inheritance merges -->
                    <phase>package</phase> <!-- bind to the packaging phase -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

然后把你的jar用 mvn clean install -Ponline -DskipTests . 这应该能解决你的问题

ctzwtxfj

ctzwtxfj4#

您可以按完全限定名(而不是别名)使用kafka数据源,如下所示:

spark.readStream.format("org.apache.spark.sql.kafka010.KafkaSourceProvider").load

相关问题