我正在使用Spark 3.3.0和Kafka 3.2.0。我想在readStream中使用组ID,但当spark不运行时数据丢失。当spark停止时,我如何停止数据流?
我的代码;
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", x.x.x.x)
.option("subscribe", "topic-name")
.option("kafka.group.id","group_name")
.option("enable.auto.commit",true)
.load()
1条答案
按热度按时间1l5u6lss1#
您需要更改您的Kafka配置:
log.retention.hours=100
这使得Kafka可以将数据保留100小时,并且在spark不运行时不会丢失数据