我有一个结构化的流媒体作业,其中有Kafka作为源和三角洲作为接收器。每批将在一个foreachBatch
内处理。
我面临的问题是我需要将这个结构化流配置为只触发一次,但在初始运行中Kafka总是不返回任何记录。
以下是我配置结构化流处理的方式:
var kafka_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_config)
.option("subscribe", kafka_topic)
.option("startingOffsets", "latest")
.option("groupid", my_group_id)
.option("minOffsetsPerTrigger", "20")
.load()
val kafka_stream_payload = kafka_stream.selectExpr("cast (value as string) as msg ")
kafka_stream_payload
.writeStream
.format( "console" )
.queryName( "my_query" )
.outputMode( "append" )
.foreachBatch { (batchDF: DataFrame, batchId: Long) => process_micro_batch( batchDF ) }
.trigger(Trigger.AvailableNow())
.start()
.awaitTermination()
我试图通过使用"minOffsetsPerTrigger", "20"
来配置Kafka readStream
,使其最少选择20条新消息,但是,每次第一次迭代都返回0条新消息。
如果我删除了.trigger(Trigger.AvailableNow())
选项,在第二次(及以后)迭代过程中,该进程将平均阅读200条新Kafka消息。
我在第一次迭代中得到0条记录是不是有什么原因?我如何配置sourceStream
来强制执行最小数量的新消息?
1条答案
按热度按时间wkyowqbh1#
由于您配置了(.option(“startingOffsets”,“latest”)),因此如果在Kafka主题中无法及时获得消息,则您可能会在第一次迭代中获得0条消息。请尝试检查(.option(“startingOffsets”,“, earliest*”))。
或加(“自动偏移复位”,“最早”)
或者确保数据持续发布到Kafka主题,然后启动消费者