我是scala/spark流媒体和stackoverflow的新手,请原谅我的格式化。我制作了一个scala应用程序,可以从kafka流中读取日志文件。它在ide中运行得很好,但是如果我能使用 spark-submit
. 它总是失败:
ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
异常中引用的行是此代码段中的load命令:
val records = spark
.readStream
.format("kafka") // <-- use KafkaSource
.option("subscribe", kafkaTopic)
.option("kafka.bootstrap.servers", kafkaBroker) // 192.168.4.86:9092
.load()
.selectExpr("CAST(value AS STRING) AS temp")
.withColumn("record", deSerUDF($"temp"))
ide:智能
Spark:2.2.1
斯卡拉:2.11.8
Kafka:Kafka2.11-0.10.0.0
相关部分 pom.xml
:
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<spark.version>2.2.1</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.github.scala-incubator.io</groupId>
<artifactId>scala-io-file_2.11</artifactId>
<version>0.4.3-1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
<!-- version>2.0.0</version -->
</dependency>
注:我不认为这是相关的,但我必须使用 zip -d BroLogSpark.jar "META-INF/*.SF"
以及 zip -d BroLogSpark.jar "META-INF/*.DSA"
为了弄清清单签名的意义。
我的jar文件不包括 org.apache.kafka
. 我看到过好几篇文章强烈地暗示我的版本不匹配,我尝试过无数次的改变 pom.xml
以及 spark-submit
. 每次更改之后,我确认它仍然在ide中运行,然后继续尝试使用 spark-submit
在同一个系统上,同一个用户。下面是我最近的一次尝试 BroLogSpark.jar
在当前目录中,并且“192.168.4.86:9092 profile”是输入参数。
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.1,org.apache.kafka:kafka-clients:0.10.0.0 BroLogSpark.jar 192.168.4.86:9092 BroFile
1条答案
按热度按时间ljo96ir51#
也添加以下依赖项