来自Kafka集团的Spark readStream

x4shl7ld  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(132)

我正在使用Spark 3.3.0和Kafka 3.2.0。我想在readStream中使用组ID,但当spark不运行时数据丢失。当spark停止时,我如何停止数据流?

我的代码;

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", x.x.x.x)
  .option("subscribe", "topic-name")
  .option("kafka.group.id","group_name")
  .option("enable.auto.commit",true)
  .load()
1l5u6lss

1l5u6lss1#

您需要更改您的Kafka配置:
log.retention.hours=100
这使得Kafka可以将数据保留100小时,并且在spark不运行时不会丢失数据

相关问题