我在用
- Spark版本:3.0.0-预览2
- Scala版本:二月一二日
- JAVA版本:1.8
- Kafka经纪人版本:第2.2.0版
我已经配置了两个JARS(spark-sql-Kafka-0-10_2.12-3.0.0-preview2.jar和kafka-clients-2.2.0.jar),并将JARS保存在$SPARK_HOME/jars文件夹中。当我尝试查看来自Kafka服务器的数据的键、值时(因为Kafka的数据以JSON格式的K-V对出现),我遇到了以下错误
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(S
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 30 more
Exception in thread "stream execution thread for [id = 504665ad-c59a-4a85-8c46-4d6c741b0adf, runId = 36bc5028-6b34-4d6c-a265-4c38ce66cfcbError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(S
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 30 more
下面是我尝试查看Kafka数据的Key-Value对的代码
from pyspark import *
from pyspark.sql import *
from pyspark.sql.utils import *
from pyspark.streaming import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
conf = SparkConf().setMaster("local")
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
spark = SparkSession \
.builder \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092") \
.option("subscribe", "topic1,topic2,topic3") \
.option("failOnDataLoss", "false") \
.load()
table = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query = table \
.writeStream \
.outputMode("append") \
.option("truncate","false") \
.format("console") \
.start() \
.awaitTermination()
有人能帮我解决这个错误吗?提前感谢!
3条答案
按热度按时间j2cgzkjk1#
您需要在$SPARK_HOME/JARS中添加此jar
abithluo2#
我已经解决了这个问题,方法是在**$SPARK_HOME/JARS文件夹和 spark-defaults.conf 文件中添加spark-streaming-Kafka-0-10-assembly_2.12-3.0.0-preview2.jar**作为
并运行spark-submit命令,如下所示:
ibrsph3r3#
这是我如何配置运行PySpark(版本与scala 2.12 Spark 3.2.1)结构流与Kafka在jupyter实验室
首先,我下载了5个jar文件,并将它们放在我当前项目文件夹下的文件夹/jars中(我认为只是用于本地运行):
jars的配置值如下所示:”
下面是实际代码:
“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“”“
在这个例子中,我们使用了一个简单的方法来创建一个新的文件夹,这个文件夹包含了一个新的文件夹。
“/jars/spark-令牌提供程序-kafka-0-10_2.12-3.2.1.jar”))的方法来实现的。
在这个例子中,我们使用了一个新的方法来创建一个新的脚本。
设置(“Spark.sql.shuffle.分区”,1