Kafka作为读流源在第一次迭代中总是返回0条消息

pb3s4cty  于 2023-01-12  发布在  Apache
关注(0)|答案(1)|浏览(129)

我有一个结构化的流媒体作业,其中有Kafka作为源和三角洲作为接收器。每批将在一个foreachBatch内处理。
我面临的问题是我需要将这个结构化流配置为只触发一次,但在初始运行中Kafka总是不返回任何记录。
以下是我配置结构化流处理的方式:

var kafka_stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafka_bootstrap_config)
      .option("subscribe", kafka_topic)
      .option("startingOffsets", "latest")
      .option("groupid", my_group_id)
      .option("minOffsetsPerTrigger", "20")
      .load()

val kafka_stream_payload = kafka_stream.selectExpr("cast (value as string) as msg ")

kafka_stream_payload
    .writeStream
    .format( "console" )
    .queryName( "my_query" )
    .outputMode( "append" )
    .foreachBatch { (batchDF: DataFrame, batchId: Long) => process_micro_batch( batchDF ) }
    .trigger(Trigger.AvailableNow())
    .start()
    .awaitTermination()

我试图通过使用"minOffsetsPerTrigger", "20"来配置Kafka readStream,使其最少选择20条新消息,但是,每次第一次迭代都返回0条新消息。
如果我删除了.trigger(Trigger.AvailableNow())选项,在第二次(及以后)迭代过程中,该进程将平均阅读200条新Kafka消息。
我在第一次迭代中得到0条记录是不是有什么原因?我如何配置sourceStream来强制执行最小数量的新消息?

wkyowqbh

wkyowqbh1#

由于您配置了(.option(“startingOffsets”,“latest”)),因此如果在Kafka主题中无法及时获得消息,则您可能会在第一次迭代中获得0条消息。请尝试检查(.option(“startingOffsets”,“, earliest*”))。
或加(“自动偏移复位”,“最早”)
或者确保数据持续发布到Kafka主题,然后启动消费者

相关问题