Facing NoClassDefFoundError in Kafka-Spark Structured Streaming integration through PySpark

gzjq41n4  asked on 2022-11-16  in Apache

I am using:

  • Spark version: 3.0.0-preview2
  • Scala version: 2.12
  • Java version: 1.8
  • Kafka broker version: 2.2.0

I have configured two jars (spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar and kafka-clients-2.2.0.jar) and placed them in the $SPARK_HOME/jars folder. When I try to view the keys and values of the data coming from the Kafka server (the Kafka data arrives as key-value pairs in JSON format), I get the following error:

java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(S
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        ... 30 more
Exception in thread "stream execution thread for [id = 504665ad-c59a-4a85-8c46-4d6c741b0adf, runId = 36bc5028-6b34-4d6c-a265-4c38ce66cfcb]" java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
        ... (same stack trace as above)

Below is the code I use to try to view the key-value pairs of the Kafka data:

from pyspark import *
from pyspark.sql import *
from pyspark.sql.utils import *
from pyspark.streaming import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a local SparkContext, then obtain a SparkSession on top of it
conf = SparkConf().setMaster("local")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

spark = SparkSession \
    .builder \
    .getOrCreate()

# Read from three Kafka topics as a streaming DataFrame
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092") \
  .option("subscribe", "topic1,topic2,topic3") \
  .option("failOnDataLoss", "false") \
  .load()

# Kafka delivers key and value as binary, so cast both to strings
table = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Print each micro-batch to the console
query = table \
        .writeStream \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("console") \
        .start() \
        .awaitTermination()

Can anyone help me resolve this error? Thanks in advance!


j2cgzkjk1#

You need to add these jars to $SPARK_HOME/jars (a Maven-based alternative is sketched after the list):

  1. commons-pool2-2.8.0.jar
  2. kafka-clients-2.5.0.jar
  3. spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar
  4. spark-token-provider-kafka-0-10_2.12-3.0.0-preview2.jar
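If the machine can reach Maven Central, a sketch of an equivalent approach is to let Spark resolve these dependencies itself through spark.jars.packages; the coordinate below matches the versions in this question, and it must be set before the JVM starts, so it has no effect on an already-running session:

from pyspark.sql import SparkSession

# Resolve spark-sql-kafka and its transitive dependencies
# (kafka-clients, commons-pool2, spark-token-provider) from Maven
spark = SparkSession.builder \
    .master("local") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2") \
    .getOrCreate()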

abithluo2#

I solved this issue by adding spark-streaming-kafka-0-10-assembly_2.12-3.0.0-preview2.jar to the $SPARK_HOME/jars folder and adding the following to the spark-defaults.conf file:

spark.driver.extraClassPath       $SPARK_HOME/jars/*.jar

spark.executor.extraClassPath     $SPARK_HOME/jars/*.jar

and then running the spark-submit command as follows:

$SPARK_HOME/bin/spark-submit --master yarn --jars $SPARK_HOME/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar,$SPARK_HOME/jars/kafka-clients-2.2.0.jar,$SPARK_HOME/jars/spark-streaming-kafka-0-10-assembly_2.12-3.0.0-preview2.jar test.py
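To verify the fix took effect, one diagnostic sketch (it relies on PySpark's private _jvm py4j handle, so treat it as a debugging aid only) is to load the previously missing class by name from the driver:

# Raises an error wrapping ClassNotFoundException if the
# token-provider classes are still absent from the driver classpath
spark.sparkContext._jvm.java.lang.Class.forName(
    "org.apache.spark.kafka010.KafkaConfigUpdater")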

ibrsph3r3#

This is how I configured PySpark (the build for Scala 2.12 / Spark 3.2.1) to run Structured Streaming with Kafka in JupyterLab.
First, I downloaded five jar files and put them in a /jars folder under my current project folder (I believe this is only needed for local runs):

  • spark-sql-kafka-0-10_2.12-3.2.1.jar
  • kafka-clients-2.1.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar
  • commons-pool2-2.8.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.1.jar

The jars configuration value is built from those file paths. Here is the actual code:

from pyspark.sql import SparkSession

# Build a comma-separated spark.jars value from the five local jar files
jars = ",".join([
    "./jars/spark-sql-kafka-0-10_2.12-3.2.1.jar",
    "./jars/kafka-clients-2.1.1.jar",
    "./jars/spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar",
    "./jars/commons-pool2-2.8.0.jar",
    "./jars/spark-token-provider-kafka-0-10_2.12-3.2.1.jar",
])

spark = SparkSession.builder \
    .config("spark.jars", jars) \
    .config("spark.sql.shuffle.partitions", 1) \
    .getOrCreate()
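With the session above in hand, a quick sanity check (a small sketch; it just reads the configuration back from the live session) confirms the jars were registered:

# Print the jar list the running session was started with
print(spark.sparkContext.getConf().get("spark.jars"))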
