Java exception while parsing a JSON RDD from a Kafka stream

Posted by unftdfkk on 2021-06-07 in Kafka

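I'm trying to read JSON strings from Kafka using the Spark Streaming library. The code is able to connect to the Kafka broker, but it fails when decoding the messages. The code is inspired by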
https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/kafkastreamingjson.scala

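import kafka.serializer.StringDecoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc, sc, kParams, kTopic, topicStr, selectStr, fileName, firstTime and
// mergeFiles are defined elsewhere in the program.

// Direct stream from the Kafka 0.8 connector; keep only the message value (the JSON string).
val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kParams, kTopic).map(_._2)
println("Starting to read from kafka topic:" + topicStr)

kStream.foreachRDD { rdd =>
  // Skip empty micro-batches.
  if (!rdd.isEmpty()) {
    val sqlContext = new SQLContext(sc)
    // Infer a schema from the JSON strings and expose them as a temporary table.
    sqlContext.read.json(rdd).registerTempTable("mytable")
    if (firstTime) {
      sqlContext.sql("SELECT * FROM mytable").printSchema()
    }
    val df = sqlContext.sql(selectStr)
    df.collect.foreach(println)
    df.rdd.saveAsTextFile(fileName)
    mergeFiles(fileName, firstTime)
    firstTime = false
    println(rdd.name)
  }
}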

java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
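The descriptor in the error is the JVM's encoded signature of the `MessageAndMetadata` constructor (which the JVM names `<init>`) that Spark's `KafkaRDD` tried to call: `L...;` is a class type, `I` is `int`, `J` is `long`, and the trailing `V` means `void`. So Spark expected a constructor taking `(String, int, Message, long, Decoder, Decoder)`. The minimal Scala sketch below decodes such a descriptor into readable type names; the object and method names are hypothetical, written only for illustration, and it handles only the standard descriptor codes.

```scala
object DescriptorDecoder {
  // JVM primitive type codes; 'V' (void) is only valid as a return type.
  private val primitives = Map(
    'B' -> "byte", 'C' -> "char", 'D' -> "double", 'F' -> "float",
    'I' -> "int", 'J' -> "long", 'S' -> "short", 'Z' -> "boolean", 'V' -> "void"
  )

  // Parse one type starting at index i; return (type name, index after it).
  private def parseType(desc: String, i: Int): (String, Int) = desc.charAt(i) match {
    case '[' =>
      // Array: parse the element type, then append "[]".
      val (elem, next) = parseType(desc, i + 1)
      (elem + "[]", next)
    case 'L' =>
      // Class type: "Lpkg/Name;" becomes "pkg.Name".
      val end = desc.indexOf(';', i)
      (desc.substring(i + 1, end).replace('/', '.'), end + 1)
    case c if primitives.contains(c) =>
      (primitives(c), i + 1)
    case c =>
      throw new IllegalArgumentException(s"Unexpected type code '$c' at index $i")
  }

  /** Decode a method descriptor "(params)return" into (parameter types, return type). */
  def decode(desc: String): (List[String], String) = {
    require(desc.startsWith("("), "method descriptor must start with '('")
    val params = List.newBuilder[String]
    var i = 1
    while (desc.charAt(i) != ')') {
      val (t, next) = parseType(desc, i)
      params += t
      i = next
    }
    val (ret, _) = parseType(desc, i + 1)
    (params.result(), ret)
  }

  def main(args: Array[String]): Unit = {
    val desc = "(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V"
    val (params, ret) = decode(desc)
    println(params.mkString("(", ", ", s"): $ret"))
  }
}
```

Comparing this decoded list with the constructors of the `MessageAndMetadata` class actually on the classpath shows whether the Kafka jar matches what the Spark connector was compiled against.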

Answer #1, by k10s72fa

The problem was the version of the Kafka jars; using 0.9.0.0 fixed it. The class kafka.message.MessageAndMetadata was introduced in 0.8.2.0.
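A `NoSuchMethodError` generally means the calling code was compiled against one version of a class and a different version was loaded at runtime, so before and after changing the Kafka jar version it can help to ask the JVM which jar actually supplied `kafka.message.MessageAndMetadata` and which public constructors it exposes. The sketch below uses only standard Java reflection; the `ClasspathCheck` object is a hypothetical helper, not part of Spark or Kafka. It reports the classpath of whichever JVM runs it (for example `spark-shell` via `ClasspathCheck.main(Array())`), and since the error above was thrown inside `KafkaRDD`'s iterator, the executors' classpath may still differ from the driver's.

```scala
object ClasspathCheck {
  /** Returns (location the class was loaded from, public constructor signatures),
    * or None if the class cannot be loaded. */
  def inspect(className: String): Option[(String, Seq[String])] =
    try {
      // Load without running static initializers.
      val cls = Class.forName(className, false, getClass.getClassLoader)
      // CodeSource is null for classes loaded by the JDK's bootstrap loader.
      val location = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("<bootstrap/JDK>")
      // Render each constructor as "(param.Type, other.Type)".
      val ctors = cls.getConstructors.toSeq.map { c =>
        c.getParameterTypes.map(_.getName).mkString("(", ", ", ")")
      }
      Some((location, ctors))
    } catch {
      case _: ClassNotFoundException | _: LinkageError => None
    }

  def main(args: Array[String]): Unit =
    inspect("kafka.message.MessageAndMetadata") match {
      case Some((loc, ctors)) =>
        println(s"Loaded from: $loc")
        ctors.foreach(c => println(s"  constructor $c"))
      case None =>
        println("kafka.message.MessageAndMetadata is not on the classpath")
    }
}
```

If the printed jar path shows an unexpected Kafka version, or none of the listed constructors matches the signature in the error, the jar on the classpath is the one to replace.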
