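I am trying to read JSON strings from Kafka using the Spark Streaming library. The code connects to the Kafka broker, but fails when decoding the messages. The code is inspired by: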
https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/kafkastreamingjson.scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream of (key, value) pairs; keep only the value (the JSON string).
val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kParams, kTopic).map(_._2)
println("Starting to read from kafka topic:" + topicStr)
kStream.foreachRDD { rdd =>
  // Skip empty batches.
  if (!rdd.isEmpty()) {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Infer the schema from the JSON strings and register them as a temp table.
    sqlContext.read.json(rdd).registerTempTable("mytable")
    if (firstTime) {
      sqlContext.sql("SELECT * FROM mytable").printSchema()
    }
    val df = sqlContext.sql(selectStr)
    df.collect.foreach(println)
    df.rdd.saveAsTextFile(fileName)
    mergeFiles(fileName, firstTime)
    firstTime = false
    println(rdd.name)
  }
}
java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
1 Answer
The problem was the version of the Kafka jars in use; switching to 0.9.0.0 fixed it. The class kafka.message.MessageAndMetadata was introduced in 0.8.2.0.
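A NoSuchMethodError like this usually means the class was found but came from a jar with a different constructor signature than the one Spark was compiled against. As a rough diagnostic sketch (the object name KafkaJarCheck and the helper describe are hypothetical, not part of Spark or Kafka), you could print which jar a class is loaded from and which public constructors it exposes, using only standard JVM reflection:

```scala
// Hypothetical diagnostic helper: reports where a class was loaded from
// and lists its public constructors, to spot a mismatched jar on the classpath.
object KafkaJarCheck {
  def describe(className: String): String = {
    try {
      val cls = Class.forName(className)
      // getCodeSource is null for classes loaded by the bootstrap class loader.
      val source = Option(cls.getProtectionDomain.getCodeSource)
        .map(_.getLocation.toString)
        .getOrElse("<bootstrap>")
      val ctors = cls.getConstructors.map(_.toString).mkString("\n  ")
      s"$className loaded from $source\n  $ctors"
    } catch {
      case _: ClassNotFoundException => s"$className not found on classpath"
    }
  }

  def main(args: Array[String]): Unit = {
    // The class named in the stack trace above.
    println(describe("kafka.message.MessageAndMetadata"))
  }
}
```

Running this with the same classpath as the streaming job should show which Kafka jar actually supplies MessageAndMetadata, so you can compare its constructors with the signature in the error message.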