关于Spark Streaming的快速问题。
我将KafkaUtils中的DirectStream初始化为stream,并将其保存为spark-streaming中的InputDStream,如下所示。
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
context,
PreferConsistent,
Subscribe[String, String](topicList, KafkaParameters)
)
stream
.foreachRDD { rdd =>
println("Executing within rdd")
val rddSize = rdd.count()
if (rddSize > 0) { println(s"Received data $rddSize") }
else {println("Not received any data")}
}
context.start()
我可以看到近50分钟的生产数据的输出。50分钟后,我可以在日志中看到以下消息,
Seeking to LATEST offset of partition topic_name-partition_number
Resetting offset for partition topic_name-partition_number to offset 12908.
但是没有更多的日志说“在rdd内执行”或“接收到的数据$rddSize”或“没有接收到任何数据”
当我启动消费者时,整个逻辑工作得很好,但过了一段时间,它就停止工作了。你知道这是怎么回事吗
1条答案
按热度按时间eoigrqb61#
您没有显示
KafkaParameters
,但任何Kafka消费者都默认设置auto.offset.reset=latest
,这意味着没有任何可消费的偏移量。正如你的日志所说,消费者正在重新设置并再次寻找主题的结尾。您需要一个正在运行的生产者来查看该配置的任何数据此外,Spark Streaming(这不是Kafka Steams)已被弃用,Spark结构化流的所有功能都类似地工作,至少对于Kafka来说是这样。