dse spark流媒体+kafka nosuchmethoderror

6rvt4ljy  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(427)

我试图提交一个Spark流+Kafka的工作,只是从Kafka的主题阅读字符串行。然而,我得到以下例外
15/07/24 22:39:45错误tasksetmanager:阶段2.0中的任务0失败4次;中止线程“thread-49”org.apache.spark.sparkeexception中的作业异常:由于阶段失败而中止作业:阶段2.0中的任务0失败了4次,最近的失败:阶段2.0中的任务0.3丢失(tid 73,10.11.112.93):java.lang.nosuchmethodexception:kafka.serializer.stringdecoder.(kafka.utils.verifiableproperties)java.lang.class.getconstructor0(类。java:2892)java.lang.class.getconstructor(类。java:1723)org.apache.spark.streaming.kafka.kafkareceiver.onstart(kafkainputdstream。scala:106) org.apache.spark.streaming.receiver.receiversupervisor.startreceiver(receiversupervisor。scala:121)org.apache.spark.streaming.receiver.receiversupervisor.start(receiversupervisor。scala:106)org.apache.spark.streaming.scheduler.receivertracker$receiverlauncher$$anonfun$9.apply(receivertracker。scala:264) org.apache.spark.streaming.scheduler.receivertracker$receiverlauncher$$anonfun$9.apply(receivertracker。scala:257)org.apache.spark.sparkcontext$$anonfun$runjob$4.apply(sparkcontext。scala:1121)org.apache.spark.sparkcontext$$anonfun$runjob$4.apply(sparkcontext。scala:1121)org.apache.spark.scheduler.resulttask.runtask(resulttask。scala:62) org.apache.spark.scheduler.task.run(任务。scala:54)org.apache.spark.executor.executor$taskrunner.run(executor。scala:177)java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1145)java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:615)java.lang.thread.run(线程。java:745)
当我检查dse使用的sparkjar文件时,我看到它使用了kafka_2.10-0.8.0.jar,而kafka_2.10-0.8.0.jar确实有这个构造函数。不确定是什么导致了错误。这是我的消费代码

val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val topicMap = kafkaTopics.split(",").map((_, numThreads.toInt)).toMap
    val accessLogsStream = KafkaUtils.createStream(streamingContext, zooKeeper, "AccessLogsKafkaAnalyzer", topicMap)

    val accessLogs = accessLogsStream.map(_._2).map(log => ApacheAccessLog.parseLogLine(log).cache()

更新此异常似乎只发生在我提交作业时。如果我使用sparkshell通过粘贴代码来运行作业,它就可以正常工作

8iwquhpp

8iwquhpp1#

我的自定义解码器也面临同样的问题。我添加了以下构造函数,解决了这个问题。

public YourDecoder(VerifiableProperties verifiableProperties)
{

}

相关问题