spark流kafka:classnotfoundexception在使用spark submit运行bytearraydeserializer时

jjjwad0x  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(377)

我是scala/spark流媒体和stackoverflow的新手,请原谅我的格式化。我制作了一个scala应用程序,可以从kafka流中读取日志文件。它在ide中运行得很好,但是如果我能使用 spark-submit . 它总是失败:

ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer

异常中引用的行是此代码段中的load命令:

val records = spark
  .readStream
  .format("kafka") // <-- use KafkaSource
  .option("subscribe", kafkaTopic)
  .option("kafka.bootstrap.servers", kafkaBroker) // 192.168.4.86:9092
  .load()
  .selectExpr("CAST(value AS STRING) AS temp")
  .withColumn("record", deSerUDF($"temp"))

ide:智能
Spark:2.2.1
斯卡拉:2.11.8
Kafka:Kafka2.11-0.10.0.0
相关部分 pom.xml :

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.2.1</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.github.scala-incubator.io</groupId>
        <artifactId>scala-io-file_2.11</artifactId>
        <version>0.4.3-1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
        <!-- version>2.0.0</version -->
    </dependency>

注:我不认为这是相关的,但我必须使用 zip -d BroLogSpark.jar "META-INF/*.SF" 以及 zip -d BroLogSpark.jar "META-INF/*.DSA" 为了弄清清单签名的意义。
我的jar文件不包括 org.apache.kafka . 我看到过好几篇文章强烈地暗示我的版本不匹配,我尝试过无数次的改变 pom.xml 以及 spark-submit . 每次更改之后,我确认它仍然在ide中运行,然后继续尝试使用 spark-submit 在同一个系统上,同一个用户。下面是我最近的一次尝试 BroLogSpark.jar 在当前目录中,并且“192.168.4.86:9092 profile”是输入参数。

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.1,org.apache.kafka:kafka-clients:0.10.0.0 BroLogSpark.jar 192.168.4.86:9092 BroFile
ljo96ir5

ljo96ir51#

也添加以下依赖项

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>

相关问题