spark-streaming-kafka-10dsteam没有从kafka那里得到任何东西

pcww981p  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(321)

我正在尝试使用spark-streaming-kafka-0.10从kafka主题(代理版本0.10)中提取消息。我已经检查了消息是否正在生成,并使用kafkaconsumer成功地拉取它们。现在,当我尝试使用spark流api时,我什么都没有得到。如果我只使用kafkautils.createrdd并手动指定一些偏移范围,它就可以工作了。但是,当我尝试使用createdirectstream时,所有rdd都是空的,当我检查分区偏移量时,它只是报告所有分区都是0。以下是我尝试的:

val sparkConf = new SparkConf().setAppName("kafkastream")
 val ssc = new StreamingContext(sparkConf, Seconds(3))
 val topics = Array("my_topic")

 val kafkaParams = Map[String, Object](
   "bootstrap.servers" -> "hostname:6667"
   "key.deserializer" -> classOf[StringDeserializer],
   "value.deserializer" -> classOf[StringDeserializer],
   "group.id" -> "my_group",
   "auto.offset.reset" -> "earliest",
   "enable.auto.commit" -> (true: java.lang.Boolean)
 )

 val stream = KafkaUtils.createDirectStream[String, String](
   ssc,
   PreferConsistent,
   Subscribe[String, String](topics, kafkaParams)
 )

 stream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd.foreachPartition { iter =>
     val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }

   val rddCount = rdd.count()
   println("rdd count: ", rddCount)

   // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
 }

 ssc.start()
 ssc.awaitTermination()

所有分区都显示从0到0的偏移量范围,并且所有RDD都是空的。我希望它从一个分区的开始开始,但同时也要收集生成到它的所有内容。

dgenwo3n

dgenwo3n1#

我发现这是Kafka的客户问题。我使用的是kafka客户端0.10.1.1,它有一个bug,在这里修复了:https://issues.apache.org/jira/browse/kafka-4547
因此,我的补丁更新为0.10.2.1。spark streaming kafka使用的是0.10.0.1的编译依赖项,但当我包含自己的版本时,我正在覆盖它。

相关问题