我正在尝试使用spark-streaming-kafka-0.10从kafka主题(代理版本0.10)中提取消息。我已经检查了消息是否正在生成,并使用kafkaconsumer成功地拉取它们。现在,当我尝试使用spark流api时,我什么都没有得到。如果我只使用kafkautils.createrdd并手动指定一些偏移范围,它就可以工作了。但是,当我尝试使用createdirectstream时,所有rdd都是空的,当我检查分区偏移量时,它只是报告所有分区都是0。以下是我尝试的:
val sparkConf = new SparkConf().setAppName("kafkastream")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val topics = Array("my_topic")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hostname:6667"
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "my_group",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (true: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
val rddCount = rdd.count()
println("rdd count: ", rddCount)
// stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
所有分区都显示从0到0的偏移量范围,并且所有RDD都是空的。我希望它从一个分区的开始开始,但同时也要收集生成到它的所有内容。
1条答案
按热度按时间dgenwo3n1#
我发现这是Kafka的客户问题。我使用的是kafka客户端0.10.1.1,它有一个bug,在这里修复了:https://issues.apache.org/jira/browse/kafka-4547
因此,我的补丁更新为0.10.2.1。spark streaming kafka使用的是0.10.0.1的编译依赖项,但当我包含自己的版本时,我正在覆盖它。