无流时触发结构化流窗口

fkvaft9z  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(344)

我想记录从spark结构化流传入流读取到数据库的记录数。我使用foreachbatch来转换传入的流批处理,并将其写入所需的位置。如果某个小时内没有记录,我想记录读取的0条记录。但是foreach批处理在没有流时不执行。有人能帮我吗?我的代码如下: val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load() ```
val query=incomingStream.writeStream.foreachBatch{
(batchDF: DataFrame, batchId: Long)=> writeStreamToDataLake(batchDF,batchId,partitionColumn,fileLocation,errorFilePath,eventHubName,configMeta)
}
.option("checkpointLocation",fileLocation+checkpointFolder+"/"+eventHubName)
.trigger(Trigger.ProcessingTime(triggerTime.toLong))
.start().awaitTermination()

cyej8jka

cyej8jka1#

这就是它的工作原理,即使是mods,streamingquerylistener的扩展也只有在有东西需要处理时才被调用,因此流的状态发生了变化。
可能还有另一种方法,但我会说“跳出框框思考”和每个时间段0的pre-popualte这样的数据库,当查询聚合时,您将得到正确的答案。
https://medium.com/@johankok/structured-streaming-in-a-flash-576cdb17bbee可以提供一些见解和Spark:最终指南。

相关问题