Kafka与Spark连接时出错

bnl4lu3b  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(343)

我正在尝试把Kafka和Spark连接起来。我使用的是Kafka 2.11-0.11.0.1和Spark 2.2.0。我引入了以下jar文件:
kafka_2.11-0.11.0.1、kafka-clients-0.11.0.1、spark-streaming-kafka_2.11-2.2.0、spark-streaming-kafka_2.11-2.2.0
这是我的代码:

import org.apache.spark._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Minimal Spark Streaming job that opens a Kafka direct stream and prints each batch.
 *
 * NOTE(review): this uses the Kafka 0.8-style integration (`kafka.serializer.StringDecoder`,
 * `metadata.broker.list`, four-type-parameter `createDirectStream`). The
 * `NoSuchMethodError: kafka.api.TopicMetadata.errorCode()S` shown in the stack trace is
 * presumably a binary mismatch between that integration and the Kafka 0.11 jars on the
 * classpath -- confirm by aligning the Kafka jar version with the one the integration
 * module was built against, or by moving to the 0-10 integration.
 */
object KafkaExample {

  /**
   * Creates a local streaming context with 20-second batches, subscribes to the
   * `logstash_log` topic through the direct-stream API, prints the stream and blocks
   * until the streaming context terminates.
   *
   * @param args command-line arguments; not read by this method
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaExample").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Broker list for the old (0.8-style) consumer configuration.
    val kafkaParams = Map("metadata.broker.list" -> "kafkaIP:9092")
    val topics = Set("logstash_log")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics
    )

    stream.print()

    ssc.checkpoint("C:/checkpoint/")
    ssc.start()
    ssc.awaitTermination()
  }
}

我得到了下面这个异常,但在任何地方都找不到解决方案:

Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.TopicMetadata.errorCode()S
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1$$anonfun$4.apply(KafkaCluster.scala:127)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1$$anonfun$4.apply(KafkaCluster.scala:127)
    at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:127)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at com.defne.KafkaExample$.main(KafkaExample.scala:27)
    at com.defne.KafkaExample.main(KafkaExample.scala)

为什么会发生这种情况?我怎么处理?任何帮助都将不胜感激!
谢谢。

lawou6xi

lawou6xi1#

下面的代码可能会有帮助。您可以根据自己的数据集和IP地址进行修改:

/**
 * Starts a local Spark Streaming job that subscribes to a Kafka topic using the
 * consumer-style configuration (`bootstrap.servers`, deserializer classes,
 * `Subscribe` strategy) and, for every non-empty batch, prints up to ten
 * comma-split records.
 *
 * The broker address, group id and topic name are hard-coded placeholders; edit them
 * for your environment. The method blocks until the streaming context terminates.
 */
def StreamingFromKafkaMain(): Unit = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "192.168.34.216:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Array("myTopicName")

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaTest")
  val streamingContext = new StreamingContext(sparkConf, Seconds(1))

  // Create an input direct stream.
  val kafkaStream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  // Pair each record's key with the individual lines of its value.
  val items = kafkaStream.map(record => (record.key, record.value.split("\n")))

  items.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      println("Test")

      // Split every line into comma-separated fields. The key is ignored here.
      val splitRdd = rdd.flatMap { case (_, lines) => lines.iterator.map(_.split(",")) }

      // Print the field values themselves; printing the array returned by take()
      // directly would only show its JVM reference string.
      splitRdd.take(10).foreach(fields => println(fields.mkString(",")))
    }
  }

  streamingContext.start()
  streamingContext.awaitTermination()
}

相关问题